如何使用 Solana Yellowstone gRPC 重新连接和重放插槽

  • Shyft_to
  • 发布于 6小时前
  • 阅读 26

本文介绍了如何为 Solana gRPC 流实现重新连接逻辑,并具有重放功能,以确保不会错过任何插槽。通过跟踪最后接收的插槽并在断开连接后从该插槽恢复,可以避免错过更新或重复数据,从而提高应用程序的可靠性和实时性。

在本文中,你将学习如何为你的 Solana gRPC 流实现重连逻辑,并具有重放功能,以确保你不会错过任何 slot。

重连机制封面

Yellowstone gRPC 是一个功能强大、可用于生产环境且经过实战检验的工具,用于流式传输实时的 Solana 数据。但在实际条件下,网络中断或服务器重启可能导致连接中断。如果没有适当的重连策略,你的应用程序可能会错过区块链的关键更新。 为了防止这种情况,重要的是构建一个系统,该系统不仅自动重连,而且还从特定的 slot 恢复数据流,从而确保一致性、可靠性和零遗漏事件。

开始之前

要开始,我们需要准备一些东西。

认证:gRPC 端点和 gRPC Token Shyft 的 gRPC 节点遍布欧盟和美国地区的各个位置。要访问,我们需要一个特定于区域的 gRPC 端点和一个访问Token,你可以在 Shyft 仪表板 上购买。

服务器端后端(如 NodeJS)用于接收 gRPC 数据 由于 Web 浏览器不支持 gRPC 服务,因此你需要一个后端应用程序,例如 C#、Go、Java、Python 等,来接收 gRPC 数据。

代码示例:实现重连机制

为了确保流从临时断开连接中自动恢复,我们实现了一个简单的重连循环。如果连接因错误而断开,应用程序会等待一段短暂的延迟,然后使用相同的订阅请求重新启动流。这确保了连续的数据流,无需手动干预,即使在不稳定的网络条件下也是如此。

/**
 * 重连机制在 handle stream 函数中实现
 * 如果发生任何错误,流将等待 1000 毫秒,然后调用
 * handleStream 函数,该函数反过来将重新启动流
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args); // 订阅并处理流
    } catch (error) {
      console.error("Stream error, retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
      // 可以在这里更改超时时间
    }
  }
}

该代码演示了一个 while 循环,其中调用了 handleStream() 函数。handleStream() 函数负责订阅和接收流。一旦流中断,将从 handle stream 函数抛出一个错误,该错误在循环内处理。然后,循环等待给定的超时时间并迭代,重新发送订阅请求。

你可以查看 我们的文档 ,或者直接运行 Repl 此处的代码 以获取上面示例的完整代码。

代码示例:从特定 Slot 重放更新

为了避免在断开连接期间丢失任何数据,流会跟踪从每个交易更新收到的最后一个 slot。当流遇到错误时,它会尝试重新连接,并使用 SubscribeRequest 中的 fromSlot 字段从该确切 slot 恢复。此逻辑确保在重新连接时不会跳过任何交易更新。使用重试计数器来防止无限次尝试 — 达到限制后,系统会回退到从最新的可用 slot 进行流式传输。

require("dotenv").config();
import Client, { CommitmentLevel } from "@triton-one/yellowstone-grpc";
import { SubscribeRequest } from "@triton-one/yellowstone-grpc/dist/types/grpc/geyser";
import * as bs58 from "bs58";

const MAX_RETRY_WITH_LAST_SLOT = 30;
const RETRY_DELAY_MS = 1000;
const ADDRESS_TO_STREAM_FROM = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";

type StreamResult = {
  lastSlot?: string;
  hasRcvdMSg: boolean;
};

async function handleStream(
  client: Client,
  args: SubscribeRequest,
  lastSlot?: string
): Promise<StreamResult> {
  const stream = await client.subscribe();
  let hasRcvdMSg = false;

  return new Promise((resolve, reject) => {
    stream.on("data", (data) => {
      const tx = data.transaction?.transaction?.transaction;
      if (tx?.signatures?.[0]) {
        const sig = bs58.encode(tx.signatures[0]);
        console.log("Got tx:", sig);
        lastSlot = data.transaction.slot;
        hasRcvdMSg = true;
      }
    });

    stream.on("error", (err) => {
      stream.end();
      reject({ error: err, lastSlot, hasRcvdMSg });
    });

    const finalize = () => resolve({ lastSlot, hasRcvdMSg });
    stream.on("end", finalize);
    stream.on("close", finalize);

    stream.write(args, (err: any) => {
      if (err) reject({ error: err, lastSlot, hasRcvdMSg });
    });
  });
}

async function subscribeCommand(client: Client, args: SubscribeRequest) {
  let lastSlot: string | undefined;
  let retryCount = 0;

  while (true) {
    try {
      if (args.fromSlot) {
        console.log("Starting stream from slot", args.fromSlot);
      }

      const result = await handleStream(client, args, lastSlot);
      lastSlot = result.lastSlot;
      if (result.hasRcvdMSg) retryCount = 0;
    } catch (err: any) {
      console.error(
        `Stream error, retrying in ${RETRY_DELAY_MS / 1000} second...`
      );
      await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));

      lastSlot = err.lastSlot;
      if (err.hasRcvdMSg) retryCount = 0;

      if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {
        console.log(
          `#${retryCount} retrying with last slot ${lastSlot}, remaining retries ${
            MAX_RETRY_WITH_LAST_SLOT - retryCount
          }`
        );
        args.fromSlot = lastSlot;
        retryCount++;
      } else {
        console.log("Retrying from latest slot (no last slot available)");
        delete args.fromSlot;
        retryCount = 0;
        lastSlot = undefined;
      }
    }
  }
}

const client = new Client(process.env.GRPC_URL!, process.env.X_TOKEN!, {
  "grpc.keepalive_permit_without_calls": 1,
  "grpc.keepalive_time_ms": 10000,
  "grpc.keepalive_timeout_ms": 1000,
  "grpc.default_compression_algorithm": 2,
});

const req: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    pumpFun: {
      vote: false,
      failed: false,
      accountInclude: [ADDRESS_TO_STREAM_FROM],
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {},
  entry: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.CONFIRMED,
};

subscribeCommand(client, req);

与上一种方法类似,无限循环确保流在每次断开连接时都保持重新连接。我们初始化两个变量,一个用于存储 lastSlot,即从流收到的最新 slot,另一个用于 retryCount,它限制了从先前 slot 重试的次数,以避免卡在错误数据或间隙上。

if (args.fromSlot) {
    console.log("Starting stream from slot", args.fromSlot);
}

在启动流之前,代码检查是否设置了 fromSlot。如果是,流将从该特定 slot 恢复,而不是从最新的区块开始。handleStream 函数打开流,侦听传入的交易数据,并跟踪收到的最新 slot。如果收到任何数据,它会将流标记为成功(hasRcvdMSg = true)并重置重试计数器,以便系统可以在需要时继续从上次已知的 slot 重试。

const result = await handleStream(client, args, lastSlot);
lastSlot = result.lastSlot;
if (result.hasRcvdMSg) retryCount = 0;
  • 如果在发生错误之前成功记录了 lastSlot,则将在下一次尝试中重复使用它。
  • 如果之前的流确实传递了数据,我们将重置 retryCount

智能回退

if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {
  args.fromSlot = lastSlot;
  retryCount++;
} else {
  delete args.fromSlot;
  retryCount = 0;
  lastSlot = undefined;
}

这是核心的弹性逻辑

  • 如果我们仍然有有效的 lastSlot 并且没有超过重试限制,我们将尝试从它恢复。
  • 如果我们重试的次数过多或者没有有效的 slot,我们将清除 fromSlot 并让流从区块链的顶端开始。

本文的完整代码可在 GitHub 上获取 — 随意克隆并进行测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。

结论

构建具有 基于 slot 的重放 的重连策略,可确保你的 Solana 应用程序 保持可靠和实时 — 即使在网络中断的情况下也是如此。通过跟踪上次收到的 slot 并智能地重试,你可以从中断的地方恢复流式传输,避免错过更新或重复数据。这种方法 增加了弹性,并保证了 任何生产级区块链应用程序的 更顺畅的 用户体验。

你可以浏览我们的其他相关文章: 在 Solana 上流式传输实时数据 使用 gRPC 进行实时数据流:账户、交易、区块 如何在 Solana 上流式传输实时的 Pump.fun 更新 ,以及 追踪 Raydium 上的新资金池

资源

  • 原文链接: blogs.shyft.to/how-to-re...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Shyft_to
Shyft_to
在 Solana上更快更智能地构建,使用Shyft的SuperIndexer、gRPC、RPC、API和SDK