本文介绍了如何使用QuickNode的Yellowstone gRPC接口,通过Go语言实时监控Solana链上DEX的流动性池交易。文章详细说明了如何设置Go环境、创建gRPC客户端,以及如何监控Raydium流动性池的交易活动和分析交易吞吐量,为DeFi应用和交易系统提供实时数据访问。
黄石龙之口 (Yellowstone) 是一个强大的 gRPC 接口,构建于 Solana 的 Geyser 插件系统之上,可以实时传输区块链数据。通过连接到这项服务,你可以同时跟踪多个 DEX 池的活动,分析交易模式,并构建需要精确到毫秒级数据的响应式应用程序。
在本指南中,你将:
Geyser 是 Solana 的插件系统,它使验证者能够将实时的区块链数据流传输到外部系统,而不会造成沉重的 RPC 负载。Geyser 不是通过 RPC 调用重复轮询区块链,而是在数据可用时推送数据,从而显著降低了延迟和资源使用。
黄石龙之口是一个构建于 Solana 的 Geyser 插件之上的开源 gRPC 接口。它提供高性能、类型安全的流传输:
对于 DeFi 应用程序和交易系统,这种实时数据访问可以提供关键的竞争优势。
首先,让我们创建一个新的 Go 项目并安装必要的依赖项。
mkdir solana-dex-monitor && cd solana-dex-monitor
go mod init solana-dex-monitor
go get google.golang.org/grpc
go get github.com/joho/godotenv
go get github.com/mr-tron/base58
go get github.com/rpcpool/yellowstone-grpc/examples/golang/proto
.env
文件来存储你的 QuickNode 凭据:qn_grpc_url=your-quicknode-yellowstone-endpoint.grpc.solana.quiknode.pro:443
qn_grpc_token=your-quicknode-token
你可以在我们的文档中找到有关配置端点的信息,这里。
现在,让我们创建我们的主应用程序来监控流动性池的交易。对于此示例,我们将利用 Raydium 上一个非常活跃的 SOL/USDC 池,3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv,但是你可以轻松添加其他池或将其适应其他池或 DEX。
创建一个名为 main.go
的文件,并添加以下代码:
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"os"
"strings"
"sync"
"time"
pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
"github.com/joho/godotenv"
"github.com/mr-tron/base58"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
)
var (
endpoint string
token string
)
var SolUsdcPoolAddresses = []string{
"3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv", // Raydium SOL/USDC 池的例子
// 根据需要添加更多池地址
}
// 加载环境变量
func init() {
err := godotenv.Load()
if err != nil {
log.Fatalf("Error loading .env file: %v", err)
}
endpoint = getEnv("qn_grpc_url", "example.com:10000") // 默认值作为回退
token = getEnv("qn_grpc_token", "token")
}
// 辅助函数,用于获取具有默认值的环境变量
func getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
// PoolTxStats 跟踪交易的统计信息
type PoolTxStats struct {
txCount int
firstTxTime time.Time
lastTxTime time.Time
mutex sync.Mutex
}
// 用于跟踪统计信息的全局变量
var (
poolStats = make(map[string]*PoolTxStats) // 键是字符串形式的插槽
statsMutex sync.RWMutex
)
type tokenAuth struct {
token string
}
func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": t.token,
}, nil
}
func (t tokenAuth) RequireTransportSecurity() bool {
return true
}
// 用于从交易中安全提取签名的函数
func extractSignature(tx *pb.SubscribeUpdateTransaction) string {
if tx == nil {
return "No transaction"
}
if sig := tx.GetTransaction().GetSignature(); len(sig) > 0 {
// 将二进制签名转换为 base58
return base58.Encode(sig)
}
return "No signature found"
}
func boolPtr(b bool) *bool {
return &b
}
func main() {
// 设置连接参数
kacp := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
grpc.WithKeepaliveParams(kacp),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024), grpc.UseCompressor(gzip.Name)),
grpc.WithPerRPCCredentials(tokenAuth{token: token}),
}
// 建立连接
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewGeyserClient(conn)
// 为 SOL/USDC 流动性池交易创建订阅请求
transactions := make(map[string]*pb.SubscribeRequestFilterTransactions)
transactions["sol_usdc_pool_txs"] = &pb.SubscribeRequestFilterTransactions{
Vote: boolPtr(false),
Failed: boolPtr(false),
AccountInclude: SolUsdcPoolAddresses,
AccountExclude: []string{},
AccountRequired: []string{},
}
commitment := pb.CommitmentLevel_CONFIRMED
subReq := &pb.SubscribeRequest{
Transactions: transactions,
Commitment: &commitment,
}
fmt.Println("Connecting to Yellowstone gRPC...")
stream, err := client.Subscribe(context.Background())
if err != nil {
fmt.Printf("Failed to subscribe to yellowstone: %v\n", err)
return
}
fmt.Println("Sending subscription request for SOL/USDC pool transactions...")
if err = stream.Send(subReq); err != nil {
fmt.Printf("Failed to send subscription request: %v\n", err)
return
}
// 打印池统计信息的标题
fmt.Printf("\n%-12s %-12s %-15s %-20s\n",
"Slot", "TX Count", "TX/sec", "Total Time (ms)")
fmt.Println(strings.Repeat("-", 65))
// 每 5 秒打印一次统计信息
lastPrintTime := time.Now()
printInterval := time.Second * 5
fmt.Println("Monitoring SOL/USDC liquidity pool transactions...")
for {
m, err := stream.Recv()
if err != nil {
fmt.Printf("Failed to receive yellowstone message: %v\n", err)
return
}
if tx := m.GetTransaction(); tx != nil {
// 提取交易签名
signature := extractSignature(tx)
// 使用插槽处理交易以进行跟踪
slot := tx.GetSlot()
now := time.Now()
// 维护基于插槽的简化跟踪方法
statsMutex.Lock()
slotStr := fmt.Sprintf("%d", slot)
if _, exists := poolStats[slotStr]; !exists {
poolStats[slotStr] = &PoolTxStats{
firstTxTime: now,
lastTxTime: now,
}
}
stats := poolStats[slotStr]
stats.mutex.Lock()
stats.txCount++
stats.lastTxTime = now
stats.mutex.Unlock()
statsMutex.Unlock()
// 打印带有签名的交易信息
fmt.Printf("Pool transaction detected at Slot=%d\nSignature=%s\n", slot, signature)
// 定期打印统计信息
if now.Sub(lastPrintTime) >= printInterval {
printPoolStats()
lastPrintTime = now
// 清理旧的统计信息
cleanupOldStats()
}
}
}
}
func printPoolStats() {
statsMutex.RLock()
defer statsMutex.RUnlock()
fmt.Println("\nTransaction Statistics:")
fmt.Printf("\n%-12s %-12s %-15s %-20s\n",
"Slot", "TX Count", "TX/sec", "Total Time (ms)")
fmt.Println(strings.Repeat("-", 65))
for slotStr, stats := range poolStats {
stats.mutex.Lock()
duration := stats.lastTxTime.Sub(stats.firstTxTime).Milliseconds()
var txPerSec float64
if duration > 0 {
txPerSec = float64(stats.txCount) / (float64(duration) / 1000.0)
}
fmt.Printf("%-12s %-12d %-15.2f %-20d\n",
slotStr,
stats.txCount,
txPerSec,
duration,
)
stats.mutex.Unlock()
}
fmt.Println()
}
func cleanupOldStats() {
statsMutex.Lock()
defer statsMutex.Unlock()
now := time.Now()
for slotStr, stats := range poolStats {
stats.mutex.Lock()
// 删除超过 1 分钟的统计信息
if now.Sub(stats.lastTxTime) > time.Minute {
delete(poolStats, slotStr)
}
stats.mutex.Unlock()
}
}
让我们分解应用程序的关键组件:
池地址:我们定义了一个示例 SOL/USDC 池地址列表——在本例中,我们使用一个活跃的 Raydium 池。你可以通过将地址附加到 SolUsdcPoolAddresses
切片来添加更多池。
身份验证:tokenAuth
结构处理使用你的 QuickNode Token进行授权。
连接设置:我们使用 TLS、压缩和 keepalive 参数配置 gRPC 连接设置。
我们监控系统的核心是订阅请求:
transactions["sol_usdc_pool_txs"] = &pb.SubscribeRequestFilterTransactions{
Vote: boolPtr(false),
Failed: boolPtr(false),
AccountInclude: SolUsdcPoolAddresses,
AccountExclude: []string{},
AccountRequired: []string{},
}
此过滤器:
对于每个传入的交易:
extractSignature
函数提取签名要运行监控器:
go run main.go
你应该看到类似于以下的输出:
Connecting to Yellowstone gRPC...
Sending subscription request for SOL/USDC pool transactions...
Slot TX Count TX/sec Total Time (ms)
-----------------------------------------------------------------
Monitoring SOL/USDC liquidity pool transactions...
Pool transaction detected at Slot=212439883
Signature=4ZV7JsQTwQfLWtN9YMu2EJkTKjyAC9Yjjd1TGY8X5qvqYhfpfTKk3PUK5NZ2P9HFfxXUE2mRJsW2LcUfTF1oTBcP
Transaction Statistics:
Slot TX Count TX/sec Total Time (ms)
-----------------------------------------------------------------
212439883 1 0.20 5000
想要继续构建吗?以下是一些增强你的监控器的想法:
你可以通过在 SolUsdcPoolAddresses
切片中包含它们的地址来轻松添加更多要监控的池:
var SolUsdcPoolAddresses = []string{
"3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv", // Raydium SOL/USDC 池的例子
// 根据需要添加更多池地址
}
你可以解码交易数据以提取交换金额、价格影响和其他详细信息,而不仅仅是计数交易:
// 添加此函数以解码交易数据
func decodeTransaction(tx *pb.SubscribeUpdateTransaction) {
// 基于程序 ID 解析交易数据
// 不同的 DEX(Orca、Raydium、Jupiter)具有不同的交易结构
}
将监控器连接到数据库以存储交易数据以供以后分析:
// 添加数据库集成
func storeTransactionData(slot uint64, signature string, details map[string]interface{}) {
// 插入数据库(PostgreSQL、InfluxDB 等)
}
解析帐户数据后,可以添加逻辑来检测显着的价格波动或异常活动:
// 添加价格监控
func detectPriceAnomaly(currentPrice, previousPrice float64) bool {
// 实施异常检测逻辑
percentChange := (currentPrice - previousPrice) / previousPrice * 100
return math.Abs(percentChange) > 1.0 // 警报超过 1% 的价格变化
}
将 Yellowstone gRPC 与 Go 结合使用提供了一种强大的方式来以极低的延迟监控 Solana 流动性池。这种方法可以实时跟踪多个程序中的 DEX 活动,为你提供交易、分析或监控应用程序所需的数据。
对于毫秒至关重要的交易机器人或套利系统,Yellowstone 的流式传输方法比传统的 RPC 方法具有显着优势。通过直接进入 Solana 区块链数据源,你可以随时了解市场动向。
如果你对新主题有任何反馈或要求,请告诉我们。我们很乐意收到你的来信。
如果你有任何问题或者需要帮助去实现你的 Solana dApp,加入我们的 Discord 社区 或者联系我们的支持团队!
- 原文链接: quicknode.com/guides/sol...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!