本文介绍了如何结合QuickNode的CoinPaprika价格与市场数据API以及Yellowstone Geyser gRPC插件,构建一个强大的Solana流动性池监控系统。该系统能够获取特定Token的最高交易量池,并订阅这些池的实时交易更新进行分析,从而帮助交易者发现套利、做市或趋势跟踪的机会。文章提供搭建该系统的Rust代码示例和详细步骤。
跟踪高交易量流动性池的活动对于 Solana 上的自动化交易策略至关重要。通过识别具有显著交易量的池并实时监控其交易流,交易者可以发现套利、做市或趋势跟踪的机会。本指南演示了如何将 QuickNode 的 CoinPaprika 价格和市场数据 API 与 Yellowstone Geyser gRPC 插件结合起来,构建一个强大的池监控系统。
我们将构建一个 Rust 应用程序,它可以:
依赖 | 版本 |
---|---|
rustc | 1.85.0 |
cargo | 1.85.0 |
在深入研究代码之前,让我们了解这些服务是如何协同工作的:
CoinPaprika API 提供全面的市场数据,包括:
Yellowstone Geyser gRPC 提供:
通过组合这些服务,我们可以识别重要的池,并以最小的延迟监控它们的活动。
让我们开始构建吧!
首先,创建一个新的 Rust 项目并添加所需的依赖项:
cargo new pool-monitor && cd pool-monitor
使用以下依赖项更新你的 Cargo.toml
:
[package]
name = "pool-monitor"
version = "0.1.0"
edition = "2021"
[dependencies]
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
dotenv = "0.15"
bs58 = "0.5.1"
futures = "0.3"
log = "0.4"
env_logger = "0.11"
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
yellowstone-grpc-client = "6.1.0"
yellowstone-grpc-proto = "6.1.0"
在你的项目根目录中创建一个 .env
文件:
## QuickNode endpoint with CoinPaprika add-on enabled
QUICKNODE_URL=https://your-endpoint-name.solana-mainnet.quiknode.pro/your-token
## Yellowstone Geyser credentials
GEYSER_ENDPOINT=https://your-endpoint-name.solana-mainnet.quiknode.pro:10000
GEYSER_AUTH_TOKEN=your-auth-token
## Configuration
TARGET_TOKEN_ADDRESS=DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
MONITORED_POOL_COUNT=3
将占位符值替换为你实际的 QuickNode Yellowstone gRPC 节点和身份验证 Token。你可以在我们的文档中找到有关配置你的节点的信息,这里。
让我们将实现分解为可管理的部分。
首先,我们需要在我们的 main.rs
文件中导入必要的 crate 和模块:
use {
anyhow::{Context, Result},
bs58,
dotenv::dotenv,
futures::{sink::SinkExt, stream::StreamExt},
log::{error, info, warn},
reqwest,
serde::{Deserialize, Serialize},
std::{collections::HashMap, env},
tokio,
tonic::{service::Interceptor, transport::ClientTlsConfig, Status},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
geyser::SubscribeUpdate,
prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterTransactions,
},
},
};
为 BONK Token 地址和默认池计数添加常量:
const RUST_LOG_LEVEL: &str = "info";
const BONK_TOKEN_ADDRESS: &str = "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263";
接下来,我们需要设置日志以捕获重要事件和错误:
fn setup_logging() {
env::set_var("RUST_LOG", RUST_LOG_LEVEL);
env_logger::init();
}
此函数使用默认日志级别初始化日志记录器,默认日志级别可以被 RUST_LOG
环境变量覆盖。
接下来,我们将定义数据结构来表示来自 CoinPaprika API 的池信息:
##[derive(Debug, Deserialize)]
pub struct PoolsResponse {
pub pools: Vec<Pool>,
}
##[derive(Debug, Deserialize, Clone)]
pub struct Pool {
pub id: String,
pub dex_name: String,
pub volume_usd: f64,
pub tokens: Vec<Token>,
}
##[derive(Debug, Deserialize, Clone)]
pub struct Token {
pub symbol: String,
}
##[derive(Debug, Serialize)]
pub struct PoolsQuery {
pub limit: u32,
pub sort: String,
pub order_by: String,
}
这些结构直接映射到 CoinPaprika API 响应格式。Pool
结构包含:
id
: 池的链上地址dex_name
: 哪个 DEX 托管了这个池 (Raydium, Orca, 等等)volume_usd
: 24 小时交易量,以美元计tokens
: 此池中的 Token 对接下来,让我们为 Pool
结构定义方法,以帮助我们格式化 Token 对并管理池元数据:
impl Pool {
pub fn token_pair(&self) -> String {
if self.tokens.len() >= 2 {
format!("{}/{}", self.tokens[0].symbol, self.tokens[1].symbol)
} else {
"Unknown pair".to_string()
}
}
}
##[derive(Debug, Clone)]
pub struct PoolMetadata {
pools: Vec<Pool>,
}
impl PoolMetadata {
pub fn new(pools: Vec<Pool>) -> Self {
Self { pools }
}
pub fn get_pool_ids(&self) -> Vec<String> {
self.pools.iter().map(|p| p.id.clone()).collect()
}
}
接下来,我们将创建一个客户端来与 CoinPaprika API 交互:
##[derive(Debug, Clone)]
pub struct QuickNodeClient {
client: reqwest::Client,
base_url: String,
}
impl QuickNodeClient {
pub fn from_env() -> Result<Self> {
let base_url = env::var("QUICKNODE_URL")
.context("Missing QUICKNODE_URL")?;
Ok(Self {
client: reqwest::Client::new(),
base_url,
})
}
pub async fn get_top_pools_by_volume(
&self,
token_address: &str,
limit: u32
) -> Result<Vec<Pool>> {
let query = PoolsQuery {
limit,
sort: "desc".to_string(),
order_by: "volume_usd".to_string(),
};
let url = format!(
"{}/addon/912/networks/solana/tokens/{}/pools",
self.base_url,
token_address
);
for attempt in 1..=3 {
let response = self.client
.get(&url)
.query(&query)
.send()
.await;
match response {
Ok(resp) if resp.status().is_success() => {
let json_text = resp.text().await.context("Failed to read response body")?;
let pools_response: PoolsResponse = serde_json::from_str(&json_text).context("Failed to parse JSON response")?;
return Ok(pools_response.pools);
}
Ok(resp) if attempt < 3 => {
let status = resp.status().as_u16();
warn!("Request failed with status {}, retrying in {}s... (attempt {}/3)", status, attempt, attempt);
tokio::time::sleep(tokio::time::Duration::from_secs(attempt)).await;
continue;
}
Ok(resp) => {
anyhow::bail!("API error: {}", resp.status());
}
Err(e) if attempt < 3 => {
warn!("Network error, retrying: {}", e);
tokio::time::sleep(
tokio::time::Duration::from_secs(attempt)
).await;
continue;
}
Err(e) => return Err(e).context("Request failed"),
}
}
unreachable!()
}
}
此客户端:
现在让我们实现获取和显示热门池的逻辑:
async fn fetch_pools() -> Result<PoolMetadata> {
let target_address = env::var("TARGET_TOKEN_ADDRESS").unwrap_or_else(|_| BONK_TOKEN_ADDRESS.to_string());
let pool_count: u32 = env::var("MONITORED_POOL_COUNT")
.unwrap_or_else(|_| "3".to_string())
.parse()
.unwrap_or(3);
info!("Fetching top {} target pools by volume...", pool_count);
let client = QuickNodeClient::from_env().context("Failed to create QuickNode client")?;
let pools = client.get_top_pools_by_volume(&target_address, pool_count)
.await
.context("Failed to fetch BONK pools")?;
if pools.is_empty() {
anyhow::bail!("No BONK pools found");
}
for (i, pool) in pools.iter().enumerate() {
info!("{}. {} - {} (${:.0})", i + 1, pool.dex_name, pool.token_pair(), pool.volume_usd);
}
Ok(PoolMetadata::new(pools))
}
此函数:
在确定了我们的池之后,我们需要设置 Geyser 客户端进行实时监控:
async fn create_geyser_client() -> Result<GeyserGrpcClient<impl Interceptor>> {
let endpoint = env::var("GEYSER_ENDPOINT").context("Missing GEYSER_ENDPOINT")?;
let auth_token = env::var("GEYSER_AUTH_TOKEN").context("Missing GEYSER_AUTH_TOKEN")?;
info!("Connecting to gRPC endpoint...");
let client = GeyserGrpcClient::build_from_shared(endpoint)?
.x_token(Some(auth_token))?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await?;
Ok(client)
}
Geyser 客户端设置:
如果你不熟悉 Yellowstone,请查看我们的 Yellowstone Geyser 文档 或 Yellowstone gRPC (Rust) 指南 了解更多关于如何配置和使用它的细节。
现在我们将订阅涉及我们的目标池的交易:
async fn subscribe_to_pools(
client: &mut GeyserGrpcClient<impl Interceptor>,
pool_ids: Vec<String>,
) -> Result<impl StreamExt<Item = Result<SubscribeUpdate, Status>>> {
let (mut tx, rx) = client.subscribe().await?;
info!("Setting up filters for {} pools", pool_ids.len());
let mut accounts_filter = HashMap::new();
accounts_filter.insert(
"bonk_monitor".to_string(),
SubscribeRequestFilterTransactions {
account_include: pool_ids,
account_exclude: vec![],
account_required: vec![],
vote: Some(false),
failed: Some(false),
signature: None,
},
);
tx.send(SubscribeRequest {
transactions: accounts_filter,
commitment: Some(CommitmentLevel::Processed as i32),
..Default::default()
}).await?;
Ok(rx)
}
此订阅:
最后,让我们处理传入的交易流:
async fn process_transaction_stream(
mut stream: impl StreamExt<Item = Result<SubscribeUpdate, Status>> + Unpin,
) -> Result<()> {
while let Some(message) = stream.next().await {
match message {
Ok(msg) => handle_update(msg),
Err(e) => {
error!("Stream error: {:?}", e);
break;
}
}
}
Ok(())
}
fn handle_update(update: SubscribeUpdate) {
if let Some(UpdateOneof::Transaction(transaction_update)) = update.update_oneof {
if let Some(tx_info) = &transaction_update.transaction {
let tx_id = bs58::encode(&tx_info.signature).into_string();
info!(" Pool transaction: {}", tx_id);
// 在这里你将实现你的交易逻辑:
// - 解析指令数据
// - 计算交易金额
// - 检查套利机会
// - 执行反向交易
}
}
}
流处理器:
这是编排一切的 main
函数:
##[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
setup_logging();
info!("🚀 Starting Pool Monitor");
// 步骤 1:获取高交易量池
let pool_metadata = fetch_pools().await?;
// 步骤 2:连接到 Geyser
let mut client = create_geyser_client().await?;
// 步骤 3:订阅池交易
let pool_ids = pool_metadata.get_pool_ids();
let stream = subscribe_to_pools(&mut client, pool_ids).await?;
info!("👂 Listening for transactions...");
// 步骤 4:处理交易
process_transaction_stream(stream).await?;
Ok(())
}
要运行你的池监控器,请在你的终端中执行以下命令:
cargo run
确保你的 .env
文件已使用你的 QuickNode 节点和 Yellowstone Geyser 凭据正确配置。监控器将获取热门的 BONK 池,订阅它们的交易流,并在发生更新时记录更新。如果一切设置正确,你应该看到类似于以下的输出:
[2025-05-29T17:32:11Z INFO pool-monitor] 🚀 Starting BONK Pool Monitor
[2025-05-29T17:32:11Z INFO pool-monitor] Fetching top 3 BONK pools by volume...
[2025-05-29T17:32:13Z INFO pool-monitor] 1. Orca - SOL/Bonk ($1675994)
[2025-05-29T17:32:13Z INFO pool-monitor] 2. Meteora - Bonk/SOL ($1316065)
[2025-05-29T17:32:13Z INFO pool-monitor] 3. Raydium CLMM - SOL/Bonk ($1373222)
[2025-05-29T17:32:13Z INFO pool-monitor] Connecting to gRPC endpoint...
[2025-05-29T17:32:13Z INFO pool-monitor] Setting up filters for 3 pools
[2025-05-29T17:32:13Z INFO pool-monitor] 👂 Listening for transactions...
[2025-05-29T17:32:22Z INFO pool-monitor] BONK Pool transaction: 4ZMQyEM...
[2025-05-29T17:32:23Z INFO pool-monitor] BONK Pool transaction: 3ztQfCE...
[2025-05-29T17:32:23Z INFO pool-monitor] BONK Pool transaction: rKkM21m...
干得漂亮!
这个基本的监控器为更复杂的交易策略提供了基础。以下是你可能实施的一些增强功能的想法:
解析指令数据以确定:
查看我们的指南,让解析指令更容易:
比较不同池的价格以识别:
实施以下策略:
存储交易数据以进行:
在构建生产交易系统时:
通过结合 CoinPaprika 的 Solana Token 价格和流动性池 API 以及 QuickNode 的 Yellowstone Geyser 插件,你可以构建强大的池监控系统,而无需额外的基础设施开销。这种方法提供:
无论你是在构建套利机器人、做市商还是分析工具,这个基础都为你提供了在 Solana 上进行复杂交易策略所需的数据管道。
如果你有任何反馈或对新主题的请求,请 告诉我们。我们很乐意听到你的声音。
- 原文链接: quicknode.com/guides/sol...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!