如何使用函数将区块链数据流式传输到Kafka

  • Loom__
  • 发布于 2025-02-24 21:14
  • 阅读 66

本文介绍了如何使用QuickNode Functions将区块链数据流式传输到本地Kafka集群,包括Kafka集群的搭建、ngrok的配置、Function的创建及数据流的监控。

概述

实时区块链数据处理通常需要强大的消息队列来处理高吞吐量并确保数据的可靠传递。 Apache Kafka 是这一用例中最流行的解决方案之一。

在本指南中,我们将向你展示如何设置本地 Kafka 集群,并使用 QuickNode Functions 将区块链数据流式传输到其中。

你可以在 QuickNode 的 awesome-functions GitHub 仓库 中找到本仓库中的代码。

快速演示

观看这个使用 QuickNode Functions 设置和使用 Kafka 的简短演示:

通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群

通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群

7 分钟

20 次观看

0

在 Loom 中打开视频

1.2×

7 分钟⚡️8 分钟 36 秒6 分钟 53 秒5 分钟 44 秒4 分钟 35 秒4 分钟 3 秒3 分钟 26 秒2 分钟 45 秒

介绍

你的用户代理不支持 HTML5 视频元素。

通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群

7 分钟

20 次观看

0

在 Loom 中打开视频

1.2×

7 分钟⚡️8 分钟 36 秒6 分钟 53 秒5 分钟 44 秒4 分钟 35 秒4 分钟 3 秒3 分钟 26 秒2 分钟 45 秒

介绍

在这个演示中,你将看到:

  • 使用 Docker 设置本地 Kafka 环境
  • 通过 ngrok 暴露 Kafka
  • 创建一个 Function 来处理区块链数据
  • 配置 Stream 将数据发送到 Kafka
  • 在 Kafka UI 中监控数据流

现在,让我们逐步进行设置。

你需要什么

你将做什么

  • 使用 Docker 设置本地 Kafka 集群
  • 使用 ngrok 将 Kafka 暴露到互联网
  • 创建一个 Function 将区块链数据流式传输到 Kafka
  • 配置 Streams 将数据传递到你的 Function
  • 使用 Kafka UI 监控数据流

本地设置

首先,让我们设置本地 Kafka 环境。为你的项目创建一个新目录,并添加以下 docker-compose.yml 文件:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://${NGROK_URL:-localhost:9092}
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: PLAINTEXT
      DYNAMIC_CONFIG_ENABLED: 'true'

此配置设置了:

  • 一个用于 Kafka 集群管理的 Zookeeper 实例
  • 一个配置为内部和外部访问的 Kafka 代理
  • 一个用于监控 Kafka 的基于 Web 的 UI

暴露 Kafka

为了使我们的本地 Kafka 能够被 QuickNode Functions 访问,我们将使用 ngrok。打开终端并运行:

ngrok tcp 9092

你将收到一个转发 URL,例如 tcp://2.tcp.ngrok.io:18139。复制此 URL - 我们将在 Docker 和我们的 Function 中用到它。

现在,使用 ngrok URL 启动 Kafka。不要忘记将转发 URL 替换为你自己的 URL,并去掉 tcp:// 前缀:

NGROK_URL=2.tcp.ngrok.io:18139 docker-compose up -d

创建 Function

QuickNode 仪表板 中,创建一个新的 Function 并添加 kafkajs 作为依赖项。以下是我们的 Function 代码:

const { Kafka } = require('kafkajs');

// 使用你的 ngrok URL 配置 Kafka 代理,去掉 `tcp://` 前缀
const KAFKA_BROKER = process.env.KAFKA_BROKER || '2.tcp.ngrok.io:18139';  // 👈 替换为你自己的 ngrok URL

async function initializeKafka() {
    try {
        const kafka = new Kafka({
            clientId: 'quicknode-stream-producer',
            brokers: [KAFKA_BROKER],
            retry: {
                initialRetryTime: 100,
                retries: 5
            }
        });

        const producer = kafka.producer();
        await producer.connect();
        console.log('成功连接到 Kafka 代理');
        return producer;
    } catch (error) {
        console.error('初始化 Kafka 失败:', error);
        throw error;
    }
}

async function main(params) {
    let producer = null;
    try {
        producer = await initializeKafka();

        const {
            metadata: { dataset, network },
            data,
            user_data
        } = params;

        // 创建主题名称
        const sanitizedDataset = dataset.toLowerCase().replace(/[^a-z0-9-]/g, '-');
        const topic = `${network.toLowerCase()}-${sanitizedDataset}`;

        // 准备消息负载
        const messagePayload = {
            dataset,
            network,
            timestamp: new Date().toISOString(),
            data,
            user_data
        };

        // 发送到 Kafka
        const result = await producer.send({
            topic,
            messages: [\
                {\
                    key: `${network}-${dataset}-${Date.now()}`,\
                    value: JSON.stringify(messagePayload),\
                    headers: {\
                        network,\
                        dataset,\
                        timestamp: new Date().toISOString()\
                    }\
                }\
            ]
        });

        console.log(`成功将数据发送到 Kafka 主题 ${topic}`);

        // 返回前始终断开连接
        await producer.disconnect();
        console.log('Kafka 生产者已断开连接');

        return {
            status: 'success',
            message: `数据已发送到 Kafka 主题 ${topic}`,
            metadata: {
                dataset,
                network,
                kafka_result: result
            }
        };

    } catch (error) {
        if (producer) {
            await producer.disconnect();
            console.log('发生错误后 Kafka 生产者已断开连接');
        }

        return {
            status: 'error',
            message: error.message,
            metadata: {
                dataset: params.metadata?.dataset,
                network: params.metadata?.network
            }
        };
    }
}

module.exports = { main };

将测试参数保留为默认值,然后点击“保存并关闭”。

此 Function 的关键特性:

  • 为每次调用创建一个 Kafka 生产者
  • 根据网络和数据集自动创建主题
  • 在消息头中包含元数据
  • 正确处理连接和断开连接
  • 包括错误处理和日志记录

设置 Stream

  1. 转到 QuickNode 仪表板 中的 Streams 部分
  2. 点击“创建 Stream”
  3. 选择你想要的链、网络和数据集
  4. 保留其他设置为默认值
  5. 在“选择一个现有 Function”后添加你的 Kafka Function 作为目的地
  6. 启动 Stream

监控数据流

访问 http://localhost:8080 的 Kafka UI 以监控:

  • 正在创建的主题
  • 实时到达的消息
  • 消费者组和分区
  • 代理健康状况

Kafka UI

你还可以在 QuickNode 仪表板中检查你的 Function 日志,以获取详细的执行信息。

常见问题及解决方案

连接被拒绝

如果你看到连接错误:

  • 验证 ngrok 是否正在运行
  • 检查 Function 代码中的 ngrok URL
  • 确保 Kafka 容器正在运行

缺少主题

如果主题没有出现:

  • 主题在第一条消息时自动创建
  • 检查 Function 日志以确保消息成功传递
  • 验证代码中的主题命名约定

消息传递问题

如果消息没有出现:

  • 检查 Function 日志以确保成功传递
  • 在 Kafka UI 中验证主题名称
  • 确保正确的 Kafka 连接配置

生产环境注意事项

虽然此设置非常适合开发,但在生产环境中,你应该考虑:

  1. 使用托管 Kafka 服务而不是本地设置
  2. 实施适当的安全措施
  3. 设置监控和警报
  4. 处理连接重试和回退
  5. 实施消息传递保证

其他资源

最后的话

订阅我们的 新闻通讯 以获取更多关于区块链开发的文章和指南。无论你在构建什么,我们都希望听到你的声音。在 DiscordTwitter 上给我们留言,告诉我们你在做什么!

我们 ❤️ 反馈!

让我们知道 如果你有任何反馈或新主题的请求。我们非常乐意听取你的意见。

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Loom__
Loom__
江湖只有他的大名,没有他的介绍。