本文介绍了如何使用QuickNode的Streams服务来验证区块链数据的HMAC签名,以确保数据的完整性和真实性。文章详细描述了如何在Node.js、Python和Go中实现签名验证,并提供了最佳安全实践。
Streams
Streams 从免费计划开始即可使用。对于有独特需求的团队,我们提供定制的数据集、专属支持和自定义集成。联系我们的团队以获取更多信息。
由于基础设施设置的复杂性和持续的管理需求,实时流式传输区块链数据通常令人望而生畏。QuickNode 通过 Streams 解决了这一挑战,这是一个区块链数据流式传输解决方案,允许你轻松地将历史和实时数据直接流式传输到你的应用程序或服务中。
本指南将带你了解如何验证来自 Streams webhook 服务的传入消息的 HMAC 签名。确保这些消息的完整性和真实性对于保护你的应用程序免受篡改和伪造至关重要。通过本指南,你将掌握使用 Python 有效实现签名验证的知识。
你首选的语言环境(Node.js、Python 或 Go)
代码编辑器(例如 VSCode)
安装 ngrok
你的 Stream's 安全Token,来自 QuickNode 仪表板
特定语言的需求(见下面的依赖表)
Node.js
Python
Go
依赖项 | 版本 |
---|---|
node.js | >=16.x |
express | ^4.18.2 |
body-parser | ^1.20.2 |
ngrok | ^3.0.0 |
依赖项 | 版本 |
---|---|
python | >=3.7 |
flask | ^2.0.0 |
ngrok | ^3.0.0 |
依赖项 | 版本 |
---|---|
go | >=1.16 |
ngrok | ^3.0.0 |
当你的服务器从 Streams 接收到 webhook 时,它会包含三个用于验证的关键标头:
签名是通过将这些元素与有效载荷以特定方式组合在一起,然后使用你的 Stream 的安全Token作为密钥应用 HMAC-SHA256 哈希来创建的。现在,我们将进入指南的编码部分,向你展示如何验证来自 Streams 的传入 webhook 消息。
首先,我们需要一个用于签名验证的安全Token。你可以:
注意:这个 TypedWebhook.tools URL 是临时的,直到我们在指南的后面部分向你展示如何运行你自己的本地服务器和 ngrok 来验证传入消息。
首先,创建一个新的项目目录,并根据你选择的语言设置你的环境:
mkdir webhook-verification
cd webhook-verification
npm init -y
npm install express body-parser
mkdir webhook-verification
cd webhook-verification
pip install flask
mkdir webhook-verification
cd webhook-verification
go mod init webhook-verification
现在,让我们实现验证服务器。我们将创建一个服务器,它:
/webhook
端点上监听 POST 请求选择你首选的语言,并在适当的目录中创建你的服务器文件(基于你选择的语言)。记住将 your_security_token_here
占位符字符串替换为你的实际安全Token。
const express = require('express');
const crypto = require('crypto');
const bodyParser = require('body-parser');
const app = express();
const PORT = 9999;
app.use(bodyParser.raw({
type: '*/*',
limit: '50mb'
}));
app.use(bodyParser.json());
function verifySignature(secretKey, payload, nonce, timestamp, givenSignature) {
// 首先将字符串连接起来
const signatureData = nonce + timestamp + payload;
// 转换为字节
const signatureBytes = Buffer.from(signatureData);
// 使用转换为字节的密钥创建 HMAC
const hmac = crypto.createHmac('sha256', Buffer.from(secretKey));
hmac.update(signatureBytes);
const computedSignature = hmac.digest('hex');
console.log('\nSignature Debug:');
console.log('Message components:');
console.log('- Nonce:', nonce);
console.log('- Timestamp:', timestamp);
console.log('- Payload first 100 chars:', payload.substring(0, 100));
console.log('\nSignatures:');
console.log('- Computed:', computedSignature);
console.log('- Given:', givenSignature);
return crypto.timingSafeEqual(
Buffer.from(computedSignature, 'hex'),
Buffer.from(givenSignature, 'hex')
);
}
app.post('/webhook', async (req, res) => {
const secretKey = 'your_security_token_here';
const nonce = req.headers['x-qn-nonce'];
const timestamp = req.headers['x-qn-timestamp'];
const givenSignature = req.headers['x-qn-signature'];
if (!nonce || !timestamp || !givenSignature) {
console.error('Missing required headers');
return res.status(400).send('Missing required headers');
}
try {
const payloadString = req.body.toString('utf8');
const isValid = verifySignature(
secretKey,
payloadString,
nonce,
timestamp,
givenSignature
);
if (isValid) {
console.log('\n✅ Signature verified successfully');
return res.status(200).send('Webhook received and verified');
} else {
console.log('\n❌ Signature verification failed');
return res.status(401).send('Invalid signature');
}
} catch (error) {
console.error('Error processing webhook:', error);
return res.status(500).send('Error processing webhook');
}
});
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
import hmac
import hashlib
import gzip
from flask import Flask, request, jsonify
import logging
app = Flask(__name__)
## 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
## 将此替换为你的实际安全Token
SECRET_KEY = "your_security_token_here"
def verify_signature(secret_key, payload, nonce, timestamp, given_signature):
message = nonce + timestamp + payload
computed_signature = hmac.new(
secret_key.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed_signature, given_signature)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
nonce = request.headers.get('X-QN-Nonce')
timestamp = request.headers.get('X-QN-Timestamp')
given_signature = request.headers.get('X-QN-Signature')
if not all([nonce, timestamp, given_signature]):
logger.error("Missing required headers")
return jsonify({"error": "Missing required headers"}), 400
# 获取原始有效载荷
raw_payload = request.get_data()
# 检查有效载荷是否经过 gzip 压缩
if request.headers.get('Content-Encoding') == 'gzip':
try:
payload = gzip.decompress(raw_payload).decode('utf-8')
except Exception as e:
logger.error(f"Error decompressing payload: {str(e)}")
return jsonify({"error": "Failed to decompress payload"}), 400
else:
payload = raw_payload.decode('utf-8')
try:
is_valid = verify_signature(SECRET_KEY, payload, nonce, timestamp, given_signature)
except Exception as e:
logger.error(f"Error verifying signature: {str(e)}")
return jsonify({"error": "Failed to verify signature"}), 500
if is_valid:
logger.info("Received valid webhook")
# 在此处理 webhook 有效载荷
# 目前,我们只返回成功消息
return jsonify({"message": "Webhook received and verified"}), 200
else:
logger.warning("Received invalid webhook")
return jsonify({"error": "Invalid signature"}), 401
if __name__ == '__main__':
app.run(debug=True, port=5000)
package main
import (
"bytes"
"compress/gzip"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"net/http"
)
// 处理 POST 请求的处理程序
func postHandler(w http.ResponseWriter, r *http.Request) {
// 确保方法是 POST
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
signature := r.Header.Get("X-QN-Signature")
nonce := r.Header.Get("X-QN-Nonce")
timestamp := r.Header.Get("X-QN-Timestamp")
secretKey := "your_security_token_here"
// 读取请求体
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusInternalServerError)
return
}
defer r.Body.Close()
// 检查请求体是否经过 gzip 编码
if r.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(bytes.NewReader(body))
if err != nil {
http.Error(w, "Failed to create gzip reader", http.StatusInternalServerError)
return
}
defer gzipReader.Close()
decodedBody, err := io.ReadAll(gzipReader)
if err != nil {
http.Error(w, "Failed to read gzip body", http.StatusInternalServerError)
return
}
body = decodedBody
}
err = VerifyHMAC(secretKey, nonce, timestamp, string(body), signature)
if err != nil {
fmt.Println(err)
http.Error(w, "Invalid HMAC", http.StatusUnauthorized)
return
}
fmt.Println("HMAC is valid")
// 响应客户端
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("Received POST request"))
}
func main() {
http.HandleFunc("/", postHandler)
// 在端口 8080 上启动服务器
port := ":8080"
fmt.Printf("Server is listening on port %s\n", port)
log.Fatal(http.ListenAndServe(port, nil))
}
func VerifyHMAC(secretKey, nonce, timestamp, message, receivedHMAC string) error {
// 将 nonce、timestamp 和 message 组合起来
data := nonce + timestamp + message
// 使用 SHA-256 和密钥创建新的 HMAC
h := hmac.New(sha256.New, []byte(secretKey))
h.Write([]byte(data))
// 计算预期的 HMAC
expectedHMAC = hex.EncodeToString(h.Sum(nil))
// 比较接收到的 HMAC 和预期的 HMAC
if !hmac.Equal([]byte(expectedHMAC), []byte(receivedHMAC)) {
return errors.New("invalid HMAC: message integrity or authenticity check failed")
}
// HMAC 有效
return nil
}
根据你的实现启动你的验证服务器:
node server.js
python verify_signature.py
go run main.go
然后,创建一个到你的本地服务器的隧道(根据你的实现调整端口):
ngrok http 9999
ngrok http 5000
ngrok http 8080
保持终端窗口方便,因为我们在更新 Stream 的 webhook 目标时需要 URL。
/webhook
(例如 https://abc123.ngrok.io/webhook)
Signature Debug:
Message components:
- Nonce: 02c6d2644296c8b830970891410825a3
- Timestamp: 1735613672
- Payload first 100 chars: {"data":[{"baseFeePerGas":"0xd557bf66","blobGasUsed":"0x60000","difficulty":"0x0","excessBlobGas":"0\
\
Signatures:\
- Computed: 049827ff010a1c24cd21594d72e490fd43a48d9e69a5c18628788063665134cc\
- Given: 049827ff010a1c24cd21594d72e490fd43a48d9e69a5c18628788063665134cc\
\
✅ Signature verified successfully\
\
```\
\
```\
INFO:__main__:Received valid webhook\
INFO:werkzeug:127.0.0.1 - - [30/Dec/2024 21:53:01] "POST /webhook HTTP/1.1" 200 -\
\
```\
\
```\
Server running on port :8080\
HMAC is valid\
\
```\
\
如果你看到成功的验证消息,恭喜!你的服务器正在正确验证 webhook 签名。\
\
### 最佳安全实践
\
在实现 HMAC 签名验证时,确保以下做法以保护你的应用程序:\
\
- 始终**保持安全Token的机密性**并安全存储,不要将其硬编码在代码的公开可访问区域中。\
- **记录所有验证尝试**,无论是成功还是失败,以帮助审计和故障排除。\
- 实施**额外的检查**,如时间戳验证,以防止重放攻击。\
\
### 结论
\
恭喜!你已成功学习了如何通过验证 HMAC 签名来验证传入的 Streams webhook 消息。这种做法对于保护你的应用程序免受外部威胁并确保数据完整性至关重要。\
\
如果你有任何疑问,请直接[联系我们](https://www.quicknode.com/contact-us)。如果你有任何想法或建议,例如新的目的地、功能、指标或数据集,你希望我们支持。\
\
此外,通过关注我们的 [Twitter](https://twitter.com/QuickNode) 并加入我们的 [Discord](https://discord.gg/quicknode) 和 [Telegram 公告频道](https://t.me/quicknodehq),随时了解最新动态。\
\
##### 我们 ❤️ 反馈!\
\
[让我们知道](https://airtable.com/shrKKKP7O1Uw3ZcUB?prefill_Guide+Name=How%20to%20Validate%20Incoming%20Streams%20Webhook%20Messages) 如果你有任何反馈或新主题的请求。我们很乐意听取你的意见。
>- 原文链接: [quicknode.com/guides/qui...](https://www.quicknode.com/guides/quicknode-products/streams/validating-incoming-streams-webhook-messages)
>- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!