钱包业务层 - 2. 实现交易所扫链区块同步器

扫链同步器是一个钱包业务的核心。无论是充值、提现、归集、还是热转冷、冷转热和回滚业务,都是需要和链上交易数据进行交互。而我们如何获取链上的交易数据呢,扫链同步器就是获取链上交易数据到本地环境上的一个核心组件。可以说,扫链同步器是一个交易所业务系统的核心驱动引擎。

扫链同步器

在交易所中,扫链同步器是一个钱包业务的核心。无论是充值、提现、归集、还是热转冷、冷转热和回滚业务,都是需要和链上交易数据进行交互。而我们如何获取链上的交易数据呢,扫链同步器就是获取链上交易数据到本地环境上的一个核心组件。可以说,扫链同步器是一个交易所业务系统的核心驱动引擎。

完整项目 github 地址(如果对您有用,请给个小 star):

  1. exchange-wallet-service:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>
  2. signature-machine:离线签名机:  <https://github.com/Shawn-Shaw-x/signature-machine>
  3. chains-union-rpc:多链统一 rpc:  <https://github.com/Shawn-Shaw-x/chains-union-rpc>

扫链同步器流程图

image.png 同步器内主要做两件事:

  1. 同步区块头: 根据设置的同步步长,去批量同步区块到本地数据库中,并保证区块的正确性(通过检查区块头中的 parentHash 和前一个区块头中的 Hash 是否一致)

  2. 解析交易: 拿到批量的区块头列表后,可以根据区块头的块高,调用接口获取这个区块的所有交易。然后对交易中的与我们项目方有关的充值、提现、归集、热转冷、冷转热交易进行过滤筛选,打上标记推送到一个同步管道中。供我们后续的相关任务进行使用。

扫链同步器实现步骤

  • worker 下,建立 synchronizer.go 文件 核心数据结构为一个管道,用于存放每个项目方的需要处理的批量交易

      核心管道,存放一批次的交易,map 中的 key 为业务方 id*/
      buinessChannels chan map[string]*BatchTransactions
  • cli.go 中集成启动扫链同步的任务

    {
        Name:        "work",
        Flags:       flags,
        Description: "Run rpc scanner wallet chain node",
        Action:      cliapp.LifecycleCmd(runAllWorker),
    },
  • 使用定时任务启动 扫链同步器

        /*定时任务*/
    syncer.worker = clock.NewLoopFn(clock.SystemClock, syncer.tick, func() error {
        log.Info("shutting down synchronizer produce...")
        close(syncer.businessChannels)
        return nil
    }, syncer.loopInterval)
  • 调用封装的方法,通过 chains-union-rpc 接口批量获取区块头,并且判断链上是否出现回滚情况。 如果出现某个区块的 parentHash 不等于上一个区块的 hash 则认为出现链回滚(重组的情况), 则同步器会空转,无法获取到新的一批区块,直到重组区块被处理完成。(通过 lastTraversalBlockHeader 来进行标记处理)

    /*headers 只有一个数据的情况(边界情况):
    元素的 parentHash != lastTraversedHeader 的 Hash
    则说明发生链重组-->触发 fallback*/
    if len(headers) == 1 && f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash {
      log.Warn("lastTraversedHeader and header zero: parentHash and hash", "parentHash", headers[0].ParentHash, "Hash", f.lastTraversedHeader.Hash)
      return nil, blockHeader, true, ErrBlockFallBack
    }
    /*如果发现第 i 个 header 与 i-1 个不连续(parentHash 不匹配),
    也说明链断开或被重组。*/
    if len(headers) > 1 && headers[i-1].Hash != headers[i].ParentHash {
      log.Warn("headers[i-1] nad headers[i] parentHash and hash", "parentHash", headers[i].ParentHash, "Hash", headers[i-1].Hash)
      return nil, blockHeader, true, ErrBlockFallBack
        }
  • 区块头批量扫描完成后,即可进入交易解析的过程。

    1. 循环遍历区块头列表,每个区块获取这个区块内的交易
    2. 按照项目方匹配这个区块内的交易,匹配规则如下:
      /*
      * 充值:from 地址为外部地址,to 地址为用户地址
      * 提现:from 地址为热钱包地址,to 地址为外部地址
      * 归集:from 地址为用户地址,to 地址为热钱包地址(默认热钱包地址为归集地址)
      * 热转冷:from 地址为热钱包地址,to 地址为冷钱包地址
      * 冷转热:from 地址为冷钱包地址,to 地址为热钱包地址
      */
    3. 标记完交易后,所有项目方的筛选后的交易都放到一个核心的交易管道中,供后续的充值、提现、归集、热转冷、冷转热任务所使用。
      /*核心管道,存放一批次的交易,map 中的 key 为业务方 id*/
      businessChannels chan map[string]*BatchTransactions
    4. 交易推送完后,还需要对所解析的区块进行存库,存储到 blocks 表中。然后清理上一批次的交易 headers 列表,使同步器能够进行下一次同步区块。
      /*处理这一批次区块*/
      err := syncer.processBatch(syncer.headers)
      /*成功则清空 headers,进入到下一轮*/
      if err == nil {
          syncer.headers = nil
      }

      扫块测试

      • 启动扫链同步器服务

scanBlocksRequest.png

scanBlocksResponse.png

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
shawn_shaw
shawn_shaw
web3潜水员、技术爱好者、web3钱包开发工程师。欢迎闲聊唠嗑、精进技术、交流工作机会。vx:cola_ocean