本文介绍了Reth Execution Extensions (ExEx)框架,旨在简化以太坊节点的复杂离链基础设施构建,支持高性能的实时数据处理。文章详细探讨了ExEx的工作原理,包括如何创建和使用ExEx来构建索引器和Rollup,强调了其在性能和开发便捷性方面的优势。
Reth 是一个用于构建高性能和可定制节点的全功能工具包。我们最近发布了改进 Reth 性能超过 100 倍的 性能路线图,以及我们的测试网络 Rollup - Reth AlphaNet,用于推动 Reth 的模块化和可扩展性到极限。
今天,我们很高兴地宣布 Reth 执行扩展(ExEx)。ExEx 是一个构建高性能和复杂离链基础设施的框架,作为后执行Hook。Reth 的 ExEx 可以用来实现 Rollup、索引器、MEV 机器人等,代码量比现有方法少超过 10 倍。通过此发布,我们展示了一个生产就绪的重组追踪器,代码行数少于 20,一种索引器,代码行数少于 250,以及一种 Rollup,代码行数少于 1000。
ExEx 是与研究集体 init4 共同设计的,init4 正在构建下一代以太坊基础设施。我们期待继续与 init4 团队合作,使 Reth 成为构建加密基础设施的首选平台!
区块链是一个以固定时间间隔确认区块及其交易数据的时钟。离链基础设施订阅这些定期的区块更新,并作为响应更新其自身的内部状态。
例如,考虑以太坊索引器是如何工作的:
eth_subscribe 或通过 eth_getFilterChanges 进行轮询。这是大型数据管道中常见的提取、转换和加载(ETL)模式,像 Fivetran 这样的公司负责数据提取,Snowflake 处理数据装载到数据仓库,而客户则专注于编写转换的业务逻辑。
我们观察到同样的模式也适用于其他加密基础设施元素,例如 Rollup、MEV 搜索器,或更复杂的数据基础设施,如 Shadow Logs。
以此为动力,我们识别出在为以太坊节点构建 ETL 管道时的关键挑战:
迫切需要一个更好的 API 来构建依赖于节点状态变化的离链基础设施。该 API 必须是高性能的、集成全面的,并且能感知重组。我们需要建立以太坊 ETL 基础设施和作业编排的 Airflow 时刻。
执行扩展 (ExEx) 是构建实时、高性能以及零操作离链基础设施的后执行Hook,基于 Reth。
执行扩展是一个从 Reth 状态衍生其状态的任务。此类状态衍生的示例包括 Rollup、索引器、MEV 提取器等。我们预计,开发者将构建可重用的 ExEx,这些 ExEx 以标准化方式进行组合,类似于 Cosmos SDK 模块或 Substrate 小部件的工作方式。

我们与 init4 团队 协同设计了执行扩展(请关注他们!),这是一个新兴的研究集体,正在构建下一代以太坊基础设施。我们期待继续与他们团队合作,将 ExEx 推向生产,并使 Reth 成为构建加密基础设施的首选平台!
我们仍处于构建 ExEx 的最佳实践早期阶段,欢迎开发者加入我们,探索构建离链加密基础设施的新前沿。如果有合作想法,请随时联系。
在 Rust 术语中,ExEx 是一个与 Reth 一起无限运行的 Future。ExEx 是使用一个解构为 ExEx 的异步闭包初始化的。以下是预期的端到端流程:
ExExNotification,其中 包含 已提交的区块和所有相关的交易和收据、状态变化以及与之相关的 Merkle 树更新列表。async 函数来消费该流,以获取状态,例如 Rollup 区块。该流在 追加 ExEx 状态 时暴露 ChainCommitted 变体,并提供 撤销任何变更 的 ChainReverted/Reorged 变体。这使 ExEx 可以在原生区块时间运行,同时提供安全处理重组的合理 API,而非不处理重组而引入延迟。ExExManager 调度,该管理者负责将 Reth 的通知路由到 ExEx,并将 ExEx 事件返回到 Reth,同时 Reth 的任务执行器推动 ExEx 完成。install_exex API 在节点上安装。以下是从节点开发者的角度看大致的样子:
上面的 <50 行代码片段封装了定义和安装 ExEx 的过程。它极其强大,允许在没有额外基础设施的情况下扩展以太坊节点的功能。
现在让我们逐步了解一些例子。
执行扩展的 “Hello World” 是一个重组追踪器。下图所示的 ExEx 说明了记录是否出现新链或重组。人们只需解析以下 ExEx 发出的日志信息,便可以轻松地在其 Reth 节点上构建重组追踪器。
在这个例子中,old 和 new 链在该区块范围内完全访问每一个状态变化,以及有关 Chain 结构的 Merkle 树更新和其他有用信息。
既然我们已经了解了如何Hook节点事件,现在让我们构建一个更复杂的示例,例如 一个用于 OP 堆栈链中存款和取款的索引器,使用 SQlite 作为后端。
在这个案例中:
sol! 宏加载了 OP 堆栈的桥接合约,以生成类型安全的 ABI 解码器(这是一个非常强大的宏,我们鼓励开发者 深入了解)。ExExNotification 时,我们会读取每个已提交区块的日志,解码它,然后将其插入到 SQLite 中。ExExNotification 是关于链重组的,则从 SQLite 表中删除相应的条目。就这样!超级简单,可能是在 30 分钟内构建的本地托管实时索引器中性能最高的。请查看下面的代码,并查看 完整示例。
现在让我们做一些更有趣的事情,构建一个最小的 Rollup 作为 ExEx,使用 EVM 运行时和 SQLite 作为后端!
如果你 拉远一点,甚至 Rollup 也是 ETL 式的管道:
在此示例中,我们演示了一个简化的 Rollup,将其状态派生自发布到 Zenith 的 RLP 编码 EVM 交易(一个 Holesky 智能合约,用于发布我们 Rollup 的块承诺)通过一个 简单的区块构建器,这两个项目都是由 init4 团队构建的。
该示例特别:
revm 数据库特征,以使用 SQLite 作为 EVM 后端。同样,超级简单。它还可以与 blobs 一起工作!
ExEx Rollups 是极其强大的,因为我们现在可以在 Reth 上运行任意数量的 Rollups,而无需额外的基础设施,只需将其作为 ExEx 安装。
我们正在努力扩展示例以支持 blobs,并提供内置排序器,以便于完成的端到端演示。如果这是你想构建的内容,请随时联系,因为我们认为这有可能引入 L2 PBS、去中心化/共享排序器或甚至基于 SGX 的排序器等功能。
下面是 示例 代码片段。
这个问题可以重新表述为“什么可以建模为后执行Hook?”结果是,很多事情都可以!
我们看到了几种应该构建的有价值 ExEx:
目前,ExEx 需要在节点上通过在主函数中进行自定义构建来安装。我们的目标是使 ExEx 动态加载为插件,并暴露 Docker Hub 风格的 reth pull API,以便开发者可以轻松地通过空中分发 ExEx 给节点操作员。
我们希望使 Reth 成为一个平台,在核心节点操作上提供稳定性和性能,同时也成为创新的发射台。
Reth 项目有望改变人们对构建高性能离链基础设施的方法的看法,而 ExEx 只是一个开始。我们期待继续在 Reth 上构建基础设施并对其进行投资。
如果你想构建 ExEx 项目,请联系 georgios@paradigm.xyz,或直接在 Github 上 贡献。
use futures::Future;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
// `ExExContext` 可在每个 ExEx 中使用,以与节点的其余部分接口。
//
// pub struct ExExContext<Node: FullNodeComponents> {
//     /// 用于与区块链交互的配置提供者。
//     pub provider: Node::Provider,
//     /// 节点的任务执行器。
//     pub task_executor: TaskExecutor,
//     /// 节点的交易池。
//     pub pool: Node::Pool,
//     /// 接收 [`ExExNotification`] 的通道。
//     pub notifications: Receiver<ExExNotification>,
//     // .. 其他有用的上下文字段
// }
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
    while let Some(notification) = ctx.notifications.recv().await {
        match ¬ification {
            ExExNotification::ChainCommitted { new } => {
                // 做一些事情
            }
            ExExNotification::ChainReorged { old, new } => {
                // 做一些事情
            }
            ExExNotification::ChainReverted { old } => {
                // 做一些事情
            }
        };
    }
    Ok(())
}
fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("Minimal", |ctx| async move { exex(ctx) } )
            .launch()
            .await?;
        handle.wait_for_node_exit().await
    })
}async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
    while let Some(notification) = ctx.notifications.recv().await {
        match ¬ification {
            ExExNotification::ChainCommitted { new } => {
                info!(committed_chain = ?new.range(), "Received commit");
            }
            ExExNotification::ChainReorged { old, new } => {
                info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
            }
            ExExNotification::ChainReverted { old } => {
                info!(reverted_chain = ?old.range(), "Received revert");
            }
        };
        if let Some(committed_chain) = notification.committed_chain() {
            ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
        }
    }
    Ok(())
}use alloy_sol_types::{sol, SolEventInterface};
use futures::Future;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{Log, SealedBlockWithSenders, TransactionSigned};
use reth_provider::Chain;
use reth_tracing::tracing::info;
use rusqlite::Connection;
sol!(L1StandardBridge, "l1_standard_bridge_abi.json");
use crate::L1StandardBridge::{ETHBridgeFinalized, ETHBridgeInitiated, L1StandardBridgeEvents};
fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {
    connection.execute(
        r#"
            CREATE TABLE IF NOT EXISTS deposits (
                id               INTEGER PRIMARY KEY,
                block_number     INTEGER NOT NULL,
                tx_hash          TEXT NOT NULL UNIQUE,
                contract_address TEXT NOT NULL,
                "from"           TEXT NOT NULL,
                "to"             TEXT NOT NULL,
                amount           TEXT NOT NULL
            );
            "#,
        (),
    )?;
    // .. 其余的数据库初始化
    Ok(())
}
/// 一个 ExEx 示例,监听来自 OP 堆栈链的 ETH 桥接事件
/// 并在 SQLite 数据库中存储存款和取款。
async fn op_bridge_exex<Node: FullNodeComponents>(
    mut ctx: ExExContext<Node>,
    connection: Connection,
) -> eyre::Result<()> {
    // 处理所有新的链状态通知
    while let Some(notification) = ctx.notifications.recv().await {
        // 撤销所有存款和取款
        if let Some(reverted_chain) = notification.reverted_chain() {
            // ..
        }
        // 插入所有新的存款和取款
        if let Some(committed_chain) = notification.committed_chain() {
            // ..
        }
    }
    Ok(())
}
/// 将区块链解码为扁平的收据日志列表,并仅过滤
/// [L1StandardBridgeEvents]。
fn decode_chain_into_events(
    chain: &Chain,
) -> impl Iterator<Item = (&SealedBlockWithSenders, &TransactionSigned, &Log, L1StandardBridgeEvents)>
{
    chain
        // 获取所有区块和收据
        .blocks_and_receipts()
        // .. 继续解码
}
fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("OPBridge", |ctx| async move {
                let connection = Connection::open("op_bridge.db")?;
                create_tables(&mut connection)?;
                Ok(op_bridge_exex(ctx, connection))
            })
            .launch()
            .await?;
        handle.wait_for_node_exit().await
    })
}
use alloy_rlp::Decodable;
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use eyre::OptionExt;
use once_cell::sync::Lazy;
use reth_exex::{ExExContext, ExExEvent};
use reth_interfaces::executor::BlockValidationError;
use reth_node_api::{ConfigureEvm, ConfigureEvmEnv, FullNodeComponents};
use reth_node_ethereum::{EthEvmConfig, EthereumNode};
use reth_primitives::{
    address, constants,
    revm::env::fill_tx_env,
    revm_primitives::{CfgEnvWithHandlerCfg, EVMError, ExecutionResult, ResultAndState},
    Address, Block, BlockWithSenders, Bytes, ChainSpec, ChainSpecBuilder, Genesis, Hardfork,
    Header, Receipt, SealedBlockWithSenders, TransactionSigned, U256,
};
use reth_provider::Chain;
use reth_revm::{
    db::{states::bundle_state::BundleRetention, BundleState},
    DatabaseCommit, StateBuilder,
};
use reth_tracing::tracing::{debug, error, info};
use rusqlite::Connection;
use std::sync::Arc;
mod db;
sol!(RollupContract, "rollup_abi.json");
use RollupContrac:{RollupContractCalls, RollupContractEvents};
const DATABASE_PATH: &str = "rollup.db";
const ROLLUP_CONTRACT_ADDRESS: Address = address!("74ae65DF20cB0e3BF8c022051d0Cdd79cc60890C");
const ROLLUP_SUBMITTER_ADDRESS: Address = address!("B01042Db06b04d3677564222010DF5Bd09C5A947");
const CHAIN_ID: u64 = 17001;
static CHAIN_SPEC: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
    Arc::new(
        ChainSpecBuilder::default()
            .chain(CHAIN_ID.into())
            .genesis(Genesis::clique_genesis(CHAIN_ID, ROLLUP_SUBMITTER_ADDRESS))
            .shanghai_activated()
            .build(),
    )
});
struct Rollup<Node: FullNodeComponents> {
    ctx: ExExContext<Node>,
    db: Database,
}
impl<Node: FullNodeComponents> Rollup<Node> {
    fn new(ctx: ExExContext<Node>, connection: Connection) -> eyre::Result<Self> {
        let db = Database::new(connection)?;
        Ok(Self { ctx, db })
    }
    async fn start(mut self) -> eyre::Result<()> {
        // 处理所有新的链状态通知
        while let Some(notification) = self.ctx.notifications.recv().await {
            if let Some(reverted_chain) = notification.reverted_chain() {
                self.revert(&reverted_chain)?;
            }
            if let Some(committed_chain) = notification.committed_chain() {
                self.commit(&committed_chain)?;
                self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
            }
        }
        Ok(())
    }
    /// 处理新的链提交。
    ///
    /// 此函数将所有交易解码到 Rollup 合约中为事件,执行相应的动作,并将结果插入数据库。
    fn commit(&mut self, chain: &Chain) -> eyre::Result<()> {
        let events = decode_chain_into_rollup_events(chain);
        for (_, tx, event) in events {
            match event {
                // 一个新的区块提交到 Rollup 合约。
                // 此区块在现有 Rollup 状态之上执行,并提交到数据库中。
                RollupContractEvents::BlockSubmitted(_) => {
                    // ..
                }
                // 对 Rollup 合约的 ETH 存款。此存款将添加到接收者的余额中并提交到数据库中。
                RollupContractEvents::Enter(RollupContract::Enter {
                    token,
                    rollupRecipient,
                    amount,
                }) => {
                    // ..
                }
                _ => (),
            }
        }
        Ok(())
    }
    /// 处理链撤销。
    ///
    /// 此函数将所有交易解码到 Rollup 合约中为事件,撤销相应的动作并更新数据库。
    fn revert(&mut self, chain: &Chain) -> eyre::Result<()> {
        let mut events = decode_chain_into_rollup_events(chain);
        // 逆转事件顺序以从提示开始撤销
        events.reverse();
        for (_, tx, event) in events {
            match event {
                // 从数据库中撤销区块。
                RollupContractEvents::BlockSubmitted(_) => {
                    // ..
                }
                // 从接收者的余额中减去存款。
                RollupContractEvents::Enter(RollupContract::Enter {
                    token,
                    rollupRecipient,
                    amount,
                }) => {
                    // ..
                }
                _ => (),
            }
        }
        Ok(())
    }
}
fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("Rollup", move |ctx| async {
                let connection = Connection::open(DATABASE_PATH)?;
                Ok(Rollup::new(ctx, connection)?.start())
            })
            .launch()
            .await?;
        handle.wait_for_node_exit().await
    })
}
- 原文链接: paradigm.xyz/2024/05/ret...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
 
                如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!