如何在不断开连接的情况下修改Solana Yellowstone gRPC订阅请求

  • Shyft_to
  • 发布于 6小时前
  • 阅读 36

本文介绍了如何在Solana上使用Yellowstone gRPC动态修改订阅请求,而无需断开流或丢失数据。通过stream.write()方法,可以在连接中发送新的订阅请求,从而实现实时更新订阅,同时保持流的活动状态,避免断开和重新连接的开销,适用于需要根据用户行为或外部事件进行调整的应用场景,例如追踪新上线的Token、监控钱包活动或响应治理提议。

学习如何在 Solana 上修改你的 Yellowstone gRPC 订阅请求,而无需停止流或丢失数据

重连 gRPC 流 Solana 封面

Solana Yellowstone gRPCs 提供了一种强大的方式来使用基于 gRPC 的订阅流式传输实时区块链事件,如交易、账户更新和区块。从流接收到的数据类型通过订阅请求指定,这允许开发者筛选特定的更新,如交易、账户或 slot。

虽然 Yellowstone gRPCs 是依赖连续、实时数据流的生产应用中的可靠技术,但实际用例通常需要动态适应性。在一些用例中,当你开始获取数据时,你最初请求的信息并不完全是你后来需要的,尤其是当你的应用或其用户发生变化时。动态修改订阅请求——无需断开和重新连接流——对于几个用例来说至关重要。

在本文中,我们将探讨如何在不中断流的情况下修改订阅请求

开始前的准备

要开始,我们需要一些东西。

身份验证:gRPC 端点和 gRPC Token Shyft 的 gRPC 节点遍布欧盟和美国地区。要访问,我们需要特定区域的 gRPC 端点和访问Token,这些Token可以在你的 Shyft 仪表板上购买。

一个服务器端后端(如 NodeJS)来接收 gRPC 数据 由于 web 浏览器不支持 gRPC 服务,因此你需要一个后端应用程序如 C#、Go、Java、Python 等来接收 gRPC 数据。

本文的完整代码可在 我们的文档 Replit 中找到——可以随意探索和测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。

代码示例:订阅然后更新 Yellowstone gRPC 流

这是一个示例,它使用 client.subscribe() 请求发送订阅,然后使用另一个 client.write() 来写入相同的订阅请求。

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  ping?: SubscribeRequestPing;
}

const subscribedWalletsA: string[] = [\
  "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",\
  "5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",\
  "4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",\
  "GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",\
  "oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",\
];

const subscribedWalletsB: string[] = [\
  "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",\
];

const subscribeRequest1: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    modifying_A: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: subscribedWalletsA,
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

// Subscribes to account changes for program-owned accounts of subscribedWalletsB
// 订阅 subscribedWalletsB 的程序拥有的账户的账户更改
const subscribeRequest2: SubscribeRequest = {
  accounts: {
    modifying_B: {
      account: [],
      filters: [],
      owner: subscribedWalletsB,
    },
  },
  slots: {},
  transactions: {},
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {
    block: [],
  },
  entry: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

/**
 * Dynamically updates the current stream subscription with new request parameters.
 * 使用新的请求参数动态更新当前流订阅。
 */
async function updateSubscription(stream: any, args: SubscribeRequest) {
  try {
    stream.write(args);
  } catch (error) {
    console.error("Failed to send updated subscription request:", error);
  }
}

/**
 * Handles a single streaming session.
 * 处理单个流会话。
 * Automatically switches to a second subscription request after a timeout.
 * 超时后自动切换到第二个订阅请求。
 */
async function handleStream(client: Client, args: SubscribeRequest) {
  const stream = await client.subscribe();

  // Waits for the stream to close or error out
  // 等待流关闭或出错
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream Error:", error);
      reject(error);
      stream.end();
    });
    stream.on("end", resolve);
    stream.on("close", resolve);
  });

  // Automatically switch subscription after 10 seconds
  // 10 秒后自动切换订阅
  setTimeout(async () => {
    console.log("🔁 Switching to second subscription request...");
    await updateSubscription(stream, subscribeRequest2);
  }, 10000);

  // Handle incoming data
  // 处理传入数据
  stream.on("data", async (data) => {
    try {
      console.log("📦 Streamed Data:", data);
      // You can add more processing logic here
      // 你可以在此处添加更多处理逻辑
    } catch (error) {
      console.error("Error processing stream data:", error);
    }
  });

  // Send initial subscription request
  // 发送初始订阅请求
  await new Promise<void>((resolve, reject) => {
    stream.write(args, (err: any) => (err ? reject(err) : resolve()));
  }).catch((reason) => {
    console.error("Initial stream write failed:", reason);
    throw reason;
  });

  await streamClosed;
}

/**
 * Starts the stream and continuously attempts to reconnect on errors.
 * 启动流并不断尝试在错误时重新连接。
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args);
    } catch (error) {
      console.error("Stream error. Retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

const client = new Client("YOUR-GRPC-URL", "YOUR-GRPC-TOKEN", undefined);

// Start streaming with the first subscription
// 从第一个订阅开始流式传输
subscribeCommand(client, subscribeRequest1);

在上面演示的示例中,我们可以看到我们有两个不同的订阅请求。流以 subscribeRequest1 开始,它侦听涉及预定义的一组钱包地址(subscribedWalletsA)的交易。

动态更新 10 秒后,代码使用 subscribeRequest2 调用 updateSubscription(),这将流切换到监视由另一个钱包集(subscribedWalletsB)拥有的帐户,并开始侦听 blocksMeta

这是在不结束或重新启动流的情况下完成的,这要归功于 stream.write() 方法,该方法在连接中发送新的 SubscribeRequest

强大的错误处理 handleStream() 函数侦听 errorcloseend 事件。如果流中断,subscribeCommand() 包装器会在 1 秒延迟后重试连接,从而确保在不稳定的网络条件下或后端中断期间的弹性。

结论:为什么动态订阅更新很重要

在 Solana Yellowstone gRPC 中间流修改 SubscribeRequest 是构建依赖于实时区块链数据的响应式、可用于生产的应用程序的关键功能。以下是这种方法如此强大的原因:

  • 实时更新,无需中断流: 通过在流保持打开状态时动态更新订阅,应用程序避免了断开连接然后重新连接的开销。这意味着没有丢弃的连接,没有重新同步的延迟,以及更流畅的开发者体验。
  • 灵活适应实时需求: 区块链应用程序通常需要根据用户行为或外部事件进行调整——例如跟踪新上市的代币、关注钱包活动的激增或对治理提案做出反应。动态订阅更新使你能够立即做出反应,而无需重新初始化数据基础设施。
  • 更低的延迟和零数据丢失: 当流保持活动状态时,数据传递中没有间隙。事件继续不间断地流动,这对于时间敏感型用例(如交易机器人、监控仪表板或自动化分析管道)至关重要。

如果你错过了,本文的完整代码可在 我们的文档 Replit 中找到——可以随意探索和测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。

你可以浏览我们的其他相关文章: 在 Solana 上流式传输实时数据 , 使用 gRPC 进行实时数据流式传输:账户、交易、区块 , 如何在 Solana 上流式传输实时 Pump.fun 更新 , 和 跟踪 Raydium 上的新池 .

资源

  • 原文链接: blogs.shyft.to/how-to-mo...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Shyft_to
Shyft_to
在 Solana上更快更智能地构建,使用Shyft的SuperIndexer、gRPC、RPC、API和SDK