如何验证传入的流Webhook消息

  • QuickNode
  • 发布于 2024-09-27 11:59
  • 阅读 43

本文介绍了如何使用QuickNode的Streams服务来验证区块链数据的HMAC签名,以确保数据的完整性和真实性。文章详细描述了如何在Node.js、Python和Go中实现签名验证,并提供了最佳安全实践。

Streams

Streams 从免费计划开始即可使用。对于有独特需求的团队,我们提供定制的数据集、专属支持和自定义集成。联系我们的团队以获取更多信息。

概述

由于基础设施设置的复杂性和持续的管理需求,实时流式传输区块链数据通常令人望而生畏。QuickNode 通过 Streams 解决了这一挑战,这是一个区块链数据流式传输解决方案,允许你轻松地将历史和实时数据直接流式传输到你的应用程序或服务中。

本指南将带你了解如何验证来自 Streams webhook 服务的传入消息的 HMAC 签名。确保这些消息的完整性和真实性对于保护你的应用程序免受篡改和伪造至关重要。通过本指南,你将掌握使用 Python 有效实现签名验证的知识。

你将做什么

  • 在 Node.js、Python 或 Go 中实现 webhook 签名验证
  • 设置服务器以处理传入的 webhook 消息
  • 使用 HMAC SHA-256 验证签名
  • 处理压缩和未压缩的有效载荷
  • 测试你的验证实现
  • 讨论最佳安全实践以增强验证过程的安全性

你需要什么

  • 你首选的语言环境(Node.js、Python 或 Go)

  • 代码编辑器(例如 VSCode)

  • 安装 ngrok

  • 你的 Stream's 安全Token,来自 QuickNode 仪表板

  • 特定语言的需求(见下面的依赖表)

  • Node.js

  • Python

  • Go

依赖项 版本
node.js >=16.x
express ^4.18.2
body-parser ^1.20.2
ngrok ^3.0.0
依赖项 版本
python >=3.7
flask ^2.0.0
ngrok ^3.0.0
依赖项 版本
go >=1.16
ngrok ^3.0.0

理解 Webhook 签名

当你的服务器从 Streams 接收到 webhook 时,它会包含三个用于验证的关键标头:

  • X-QN-Nonce:一个唯一的字符串,用于防止重放攻击
  • X-QN-Signature:你需要验证的 HMAC 签名
  • X-QN-Timestamp:消息签名时的时间戳

签名是通过将这些元素与有效载荷以特定方式组合在一起,然后使用你的 Stream 的安全Token作为密钥应用 HMAC-SHA256 哈希来创建的。现在,我们将进入指南的编码部分,向你展示如何验证来自 Streams 的传入 webhook 消息。

Stream 设置和安全Token

首先,我们需要一个用于签名验证的安全Token。你可以:

  • 如果你已有 Stream:转到你的 Stream's 设置选项卡并妥善保管你的安全Token(我们稍后会回到这里)
  • 如果你需要创建 Stream
  1. 访问 TypedWebhook.tools 并复制提供的 webhook URL
  2. 转到 QuickNode 仪表板上的 Streams 部分
  3. 点击 创建 Stream
  4. 配置你的 Stream:
    • 选择 Ethereum 作为区块链,选择 Mainnet 作为你的网络
    • 选择 Blocks 作为你的数据集
    • 在 webhook 目标设置中粘贴 TypedWebhook.tools URL
  5. 创建 Stream 并通过设置选项卡复制你的安全Token

注意:这个 TypedWebhook.tools URL 是临时的,直到我们在指南的后面部分向你展示如何运行你自己的本地服务器和 ngrok 来验证传入消息。

设置你的开发环境

首先,创建一个新的项目目录,并根据你选择的语言设置你的环境:

  • Node.js
  • Python
  • Go
mkdir webhook-verification
cd webhook-verification
npm init -y
npm install express body-parser
mkdir webhook-verification
cd webhook-verification
pip install flask
mkdir webhook-verification
cd webhook-verification
go mod init webhook-verification

实现验证服务器

现在,让我们实现验证服务器。我们将创建一个服务器,它:

  • /webhook 端点上监听 POST 请求
  • 提取必要的标头(例如 nonce、signature、timestamp)
  • 验证签名
  • 返回 HTTP 响应

选择你首选的语言,并在适当的目录中创建你的服务器文件(基于你选择的语言)。记住your_security_token_here 占位符字符串替换为你的实际安全Token。

  • Node.js
  • Python
  • Go
const express = require('express');
const crypto = require('crypto');
const bodyParser = require('body-parser');

const app = express();
const PORT = 9999;

app.use(bodyParser.raw({
    type: '*/*',
    limit: '50mb'
}));

app.use(bodyParser.json());

function verifySignature(secretKey, payload, nonce, timestamp, givenSignature) {
    // 首先将字符串连接起来
    const signatureData = nonce + timestamp + payload;

    // 转换为字节
    const signatureBytes = Buffer.from(signatureData);

    // 使用转换为字节的密钥创建 HMAC
    const hmac = crypto.createHmac('sha256', Buffer.from(secretKey));
    hmac.update(signatureBytes);
    const computedSignature = hmac.digest('hex');

    console.log('\nSignature Debug:');
    console.log('Message components:');
    console.log('- Nonce:', nonce);
    console.log('- Timestamp:', timestamp);
    console.log('- Payload first 100 chars:', payload.substring(0, 100));
    console.log('\nSignatures:');
    console.log('- Computed:', computedSignature);
    console.log('- Given:', givenSignature);

    return crypto.timingSafeEqual(
        Buffer.from(computedSignature, 'hex'),
        Buffer.from(givenSignature, 'hex')
    );
}

app.post('/webhook', async (req, res) => {
    const secretKey = 'your_security_token_here';
    const nonce = req.headers['x-qn-nonce'];
    const timestamp = req.headers['x-qn-timestamp'];
    const givenSignature = req.headers['x-qn-signature'];

    if (!nonce || !timestamp || !givenSignature) {
        console.error('Missing required headers');
        return res.status(400).send('Missing required headers');
    }

    try {
        const payloadString = req.body.toString('utf8');
        const isValid = verifySignature(
            secretKey,
            payloadString,
            nonce,
            timestamp,
            givenSignature
        );

        if (isValid) {
            console.log('\n✅ Signature verified successfully');
            return res.status(200).send('Webhook received and verified');
        } else {
            console.log('\n❌ Signature verification failed');
            return res.status(401).send('Invalid signature');
        }
    } catch (error) {
        console.error('Error processing webhook:', error);
        return res.status(500).send('Error processing webhook');
    }
});

app.listen(PORT, () => {
    console.log(`Server running on port ${PORT}`);
});
import hmac
import hashlib
import gzip
from flask import Flask, request, jsonify
import logging

app = Flask(__name__)

## 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## 将此替换为你的实际安全Token
SECRET_KEY = "your_security_token_here"

def verify_signature(secret_key, payload, nonce, timestamp, given_signature):
    message = nonce + timestamp + payload
    computed_signature = hmac.new(
        secret_key.encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(computed_signature, given_signature)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    nonce = request.headers.get('X-QN-Nonce')
    timestamp = request.headers.get('X-QN-Timestamp')
    given_signature = request.headers.get('X-QN-Signature')

    if not all([nonce, timestamp, given_signature]):
        logger.error("Missing required headers")
        return jsonify({"error": "Missing required headers"}), 400

    # 获取原始有效载荷
    raw_payload = request.get_data()

    # 检查有效载荷是否经过 gzip 压缩
    if request.headers.get('Content-Encoding') == 'gzip':
        try:
            payload = gzip.decompress(raw_payload).decode('utf-8')
        except Exception as e:
            logger.error(f"Error decompressing payload: {str(e)}")
            return jsonify({"error": "Failed to decompress payload"}), 400
    else:
        payload = raw_payload.decode('utf-8')

    try:
        is_valid = verify_signature(SECRET_KEY, payload, nonce, timestamp, given_signature)
    except Exception as e:
        logger.error(f"Error verifying signature: {str(e)}")
        return jsonify({"error": "Failed to verify signature"}), 500

    if is_valid:
        logger.info("Received valid webhook")
        # 在此处理 webhook 有效载荷
        # 目前,我们只返回成功消息
        return jsonify({"message": "Webhook received and verified"}), 200
    else:
        logger.warning("Received invalid webhook")
        return jsonify({"error": "Invalid signature"}), 401

if __name__ == '__main__':
    app.run(debug=True, port=5000)
package main

import (
    "bytes"
    "compress/gzip"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "errors"
    "fmt"
    "io"
    "log"
    "net/http"
)

// 处理 POST 请求的处理程序
func postHandler(w http.ResponseWriter, r *http.Request) {
    // 确保方法是 POST
    if r.Method != http.MethodPost {
        http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
        return
    }

    signature := r.Header.Get("X-QN-Signature")
    nonce := r.Header.Get("X-QN-Nonce")
    timestamp := r.Header.Get("X-QN-Timestamp")
    secretKey := "your_security_token_here"

    // 读取请求体
    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "Failed to read body", http.StatusInternalServerError)
        return
    }

    defer r.Body.Close()

    // 检查请求体是否经过 gzip 编码
    if r.Header.Get("Content-Encoding") == "gzip" {
        gzipReader, err := gzip.NewReader(bytes.NewReader(body))
        if err != nil {
            http.Error(w, "Failed to create gzip reader", http.StatusInternalServerError)
            return
        }
        defer gzipReader.Close()

        decodedBody, err := io.ReadAll(gzipReader)
        if err != nil {
            http.Error(w, "Failed to read gzip body", http.StatusInternalServerError)
            return
        }
        body = decodedBody
    }

    err = VerifyHMAC(secretKey, nonce, timestamp, string(body), signature)
    if err != nil {
        fmt.Println(err)
        http.Error(w, "Invalid HMAC", http.StatusUnauthorized)
        return
    }

    fmt.Println("HMAC is valid")
    // 响应客户端
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write([]byte("Received POST request"))
}

func main() {
    http.HandleFunc("/", postHandler)

    // 在端口 8080 上启动服务器
    port := ":8080"
    fmt.Printf("Server is listening on port %s\n", port)
    log.Fatal(http.ListenAndServe(port, nil))
}

func VerifyHMAC(secretKey, nonce, timestamp, message, receivedHMAC string) error {
    // 将 nonce、timestamp 和 message 组合起来
    data := nonce + timestamp + message

    // 使用 SHA-256 和密钥创建新的 HMAC
    h := hmac.New(sha256.New, []byte(secretKey))
    h.Write([]byte(data))

    // 计算预期的 HMAC
    expectedHMAC = hex.EncodeToString(h.Sum(nil))

    // 比较接收到的 HMAC 和预期的 HMAC
    if !hmac.Equal([]byte(expectedHMAC), []byte(receivedHMAC)) {
        return errors.New("invalid HMAC: message integrity or authenticity check failed")
    }

    // HMAC 有效
    return nil
}

启动你的服务器

根据你的实现启动你的验证服务器:

  • Node.js
  • Python
  • Go
node server.js
python verify_signature.py
go run main.go

设置 ngrok

然后,创建一个到你的本地服务器的隧道(根据你的实现调整端口):

  • Node.js
  • Python
  • Go
ngrok http 9999
ngrok http 5000
ngrok http 8080

保持终端窗口方便,因为我们在更新 Stream 的 webhook 目标时需要 URL。

使用传入的 Stream 数据验证消息

  1. 首先,复制在你的终端中运行的 ngrok URL(例如 https://abc123.ngrok.io)
  2. 然后,转到 QuickNode 仪表板中的 Stream 设置。
  3. 暂停你的 Stream 并将 webhook URL 更新为你的 ngrok URL + /webhook(例如 https://abc123.ngrok.io/webhook)
  4. 恢复你的 Stream
  5. 观察你的服务器日志。对于每个区块,你应该看到类似的输出:
  • Node.js
  • Python
  • Go

Signature Debug:
Message components:
- Nonce: 02c6d2644296c8b830970891410825a3
- Timestamp: 1735613672
- Payload first 100 chars: {"data":[{"baseFeePerGas":"0xd557bf66","blobGasUsed":"0x60000","difficulty":"0x0","excessBlobGas":"0\
\
Signatures:\
- Computed: 049827ff010a1c24cd21594d72e490fd43a48d9e69a5c18628788063665134cc\
- Given: 049827ff010a1c24cd21594d72e490fd43a48d9e69a5c18628788063665134cc\
\
✅ Signature verified successfully\
\
```\
\
```\
INFO:__main__:Received valid webhook\
INFO:werkzeug:127.0.0.1 - - [30/Dec/2024 21:53:01] "POST /webhook HTTP/1.1" 200 -\
\
```\
\
```\
Server running on port :8080\
HMAC is valid\
\
```\
\
如果你看到成功的验证消息,恭喜!你的服务器正在正确验证 webhook 签名。\
\
### 最佳安全实践 
\
在实现 HMAC 签名验证时,确保以下做法以保护你的应用程序:\
\
- 始终**保持安全Token的机密性**并安全存储,不要将其硬编码在代码的公开可访问区域中。\
- **记录所有验证尝试**,无论是成功还是失败,以帮助审计和故障排除。\
- 实施**额外的检查**,如时间戳验证,以防止重放攻击。\
\
### 结论 
\
恭喜!你已成功学习了如何通过验证 HMAC 签名来验证传入的 Streams webhook 消息。这种做法对于保护你的应用程序免受外部威胁并确保数据完整性至关重要。\
\
如果你有任何疑问,请直接[联系我们](https://www.quicknode.com/contact-us)。如果你有任何想法或建议,例如新的目的地、功能、指标或数据集,你希望我们支持。\
\
此外,通过关注我们的 [Twitter](https://twitter.com/QuickNode) 并加入我们的 [Discord](https://discord.gg/quicknode) 和 [Telegram 公告频道](https://t.me/quicknodehq),随时了解最新动态。\
\
##### 我们 ❤️ 反馈!\
\
[让我们知道](https://airtable.com/shrKKKP7O1Uw3ZcUB?prefill_Guide+Name=How%20to%20Validate%20Incoming%20Streams%20Webhook%20Messages) 如果你有任何反馈或新主题的请求。我们很乐意听取你的意见。

>- 原文链接: [quicknode.com/guides/qui...](https://www.quicknode.com/guides/quicknode-products/streams/validating-incoming-streams-webhook-messages)
>- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。