扫链同步器是一个钱包业务的核心。无论是充值、提现、归集、还是热转冷、冷转热和回滚业务,都是需要和链上交易数据进行交互。而我们如何获取链上的交易数据呢,扫链同步器就是获取链上交易数据到本地环境上的一个核心组件。可以说,扫链同步器是一个交易所业务系统的核心驱动引擎。
在交易所中,扫链同步器是一个钱包业务的核心。无论是充值、提现、归集、还是热转冷、冷转热和回滚业务,都是需要和链上交易数据进行交互。而我们如何获取链上的交易数据呢,扫链同步器就是获取链上交易数据到本地环境上的一个核心组件。可以说,扫链同步器是一个交易所业务系统的核心驱动引擎。
完整项目 github
地址(如果对您有用,请给个小 star
):
exchange-wallet-service
:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>signature-machine
:离线签名机: <https://github.com/Shawn-Shaw-x/signature-machine>chains-union-rpc
:多链统一 rpc
: <https://github.com/Shawn-Shaw-x/chains-union-rpc>
同步器内主要做两件事:
同步区块头:
根据设置的同步步长,去批量同步区块到本地数据库中,并保证区块的正确性(通过检查区块头中的 parentHash
和前一个区块头中的 Hash
是否一致)
解析交易: 拿到批量的区块头列表后,可以根据区块头的块高,调用接口获取这个区块的所有交易。然后对交易中的与我们项目方有关的充值、提现、归集、热转冷、冷转热交易进行过滤筛选,打上标记推送到一个同步管道中。供我们后续的相关任务进行使用。
worker
下,建立 synchronizer.go
文件
核心数据结构为一个管道,用于存放每个项目方的需要处理的批量交易
核心管道,存放一批次的交易,map 中的 key 为业务方 id*/
buinessChannels chan map[string]*BatchTransactions
在 cli.go
中集成启动扫链同步的任务
{
Name: "work",
Flags: flags,
Description: "Run rpc scanner wallet chain node",
Action: cliapp.LifecycleCmd(runAllWorker),
},
使用定时任务启动 扫链同步器
/*定时任务*/
syncer.worker = clock.NewLoopFn(clock.SystemClock, syncer.tick, func() error {
log.Info("shutting down synchronizer produce...")
close(syncer.businessChannels)
return nil
}, syncer.loopInterval)
调用封装的方法,通过 chains-union-rpc
接口批量获取区块头,并且判断链上是否出现回滚情况。
如果出现某个区块的 parentHash
不等于上一个区块的 hash
则认为出现链回滚(重组的情况),
则同步器会空转,无法获取到新的一批区块,直到重组区块被处理完成。(通过 lastTraversalBlockHeader
来进行标记处理)
/*headers 只有一个数据的情况(边界情况):
元素的 parentHash != lastTraversedHeader 的 Hash
则说明发生链重组-->触发 fallback*/
if len(headers) == 1 && f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash {
log.Warn("lastTraversedHeader and header zero: parentHash and hash", "parentHash", headers[0].ParentHash, "Hash", f.lastTraversedHeader.Hash)
return nil, blockHeader, true, ErrBlockFallBack
}
/*如果发现第 i 个 header 与 i-1 个不连续(parentHash 不匹配),
也说明链断开或被重组。*/
if len(headers) > 1 && headers[i-1].Hash != headers[i].ParentHash {
log.Warn("headers[i-1] nad headers[i] parentHash and hash", "parentHash", headers[i].ParentHash, "Hash", headers[i-1].Hash)
return nil, blockHeader, true, ErrBlockFallBack
}
区块头批量扫描完成后,即可进入交易解析的过程。
/*
* 充值:from 地址为外部地址,to 地址为用户地址
* 提现:from 地址为热钱包地址,to 地址为外部地址
* 归集:from 地址为用户地址,to 地址为热钱包地址(默认热钱包地址为归集地址)
* 热转冷:from 地址为热钱包地址,to 地址为冷钱包地址
* 冷转热:from 地址为冷钱包地址,to 地址为热钱包地址
*/
/*核心管道,存放一批次的交易,map 中的 key 为业务方 id*/
businessChannels chan map[string]*BatchTransactions
blocks
表中。然后清理上一批次的交易 headers
列表,使同步器能够进行下一次同步区块。
/*处理这一批次区块*/
err := syncer.processBatch(syncer.headers)
/*成功则清空 headers,进入到下一轮*/
if err == nil {
syncer.headers = nil
}
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!