通知业务指得是,交易所钱包将交易的状态变更情况通知上层调用者,告知他们某笔交易的状态如何。
通知业务指得是,交易所钱包将交易的状态变更情况通知上层调用者,告知他们某笔交易的状态如何。
完整项目 github
地址(如果对您有用,请给个小 star
⭐️):
exchange-wallet-service
:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>signature-machine
:离线签名机: <https://github.com/Shawn-Shaw-x/signature-machine>chains-union-rpc
:多链统一 rpc
: <https://github.com/Shawn-Shaw-x/chains-union-rpc>在交易所的通知业务中,钱包的通知实现相对比较简单。因为我们所有的交易请求都是有项目方(或者交易所业务层)请求调用的,为了保证我们整个交易所系统的安全性,所以我们钱包业务中,不会去主动构造交易。 所以说,我们在进行发起链上交易交互的时候,我们钱包业务中,所需要做的就是:给上层调用者发送通知,告诉他们这笔交易的状态是怎样的,方便他们进行相关业务操作。 例如:
交易所钱包系统中,我们的交易发现器会将交易从链上发现,并且变更交易的状态到数据库中。
通过查询数据库获取需要发送通知但还未通知的交易。例如充值上账、充值确认、提现已广播、交易回滚等。
构建相应的通知内容,通过项目方(业务层)配置的 http 接口进行发送出去
如果发送成功,则变更交易的通知状态为已成功通知
来上代码:
/*启动通知任务*/
func (nf *Notifier) Start() error {
log.Info("start notifier worker...")
nf.tasks.Go(func() error {
for {
select {
case <-nf.ticker.C:
var txn []Transaction
/*每个项目方去查询相应业务表,发出通知*/
for _, businessId := range nf.businessIds {
log.Info("start notifier business", "business", businessId, "txn", txn)
/*查出应通知的充值交易*/
needNotifyDeposits, err := nf.db.Deposits.QueryNotifyDeposits(businessId)
if err != nil {
log.Error("Query notify deposits fail", "err", err)
}
/*查出应通知的提现*/
needNotifyWithdraws, err := nf.db.Withdraws.QueryNotifyWithdraws(businessId)
if err != nil {
log.Error("Query notify withdraw fail", "err", err)
}
/*查出应通知的内部交易*/
needNotifyInternals, err := nf.db.Internals.QueryNotifyInternal(businessId)
if err != nil {
log.Error("Query notify internal fail", "err", err)
}
/*构建通知请求体*/
notifyRequest, err := nf.BuildNotifyTransaction(needNotifyDeposits, needNotifyWithdraws, needNotifyInternals)
if err != nil {
log.Error("Build notify transaction fail", "err", err)
}
if notifyRequest.Txn == nil || len(notifyRequest.Txn) == 0 {
log.Warn("no notify transaction to notify, wait for notify")
continue
}
/*发送通知*/
notify, err := nf.notifier[businessId].BusinessNotify(notifyRequest)
if err != nil {
log.Error("notify business platform fail", "err", err)
}
log.Info("After notify", "business", businessId, "notifyStatus", notify, "deposits", needNotifyDeposits, "err", err)
err = nf.AfterNotify(businessId, notify, needNotifyDeposits, needNotifyWithdraws, needNotifyInternals)
if err != nil {
log.Error("change notified status fail", "err", err)
}
}
case <-nf.resourceCtx.Done():
log.Info("notifier worker shutting down")
return nil
}
}
})
return nil
}
/*通知之前:更新通知前状态*/
func (nf *Notifier) AfterNotify(businessId string, notifySuccess bool, deposits []*database.Deposits, withdraws []*database.Withdraws, internals []*database.Internals) error {
if !notifySuccess {
log.Warn("notify business platform fail", "business", businessId)
return fmt.Errorf("notify business platform fail, businessId: %v", businessId)
}
depositsNotifyStatus := constant.TxStatusNotified
withdrawNotifyStatus := constant.TxStatusNotified
internalNotifyStatus := constant.TxStatusNotified
// 过滤状态为 0 的交易
var updateStutusDepositTxn []*database.Deposits
for _, deposit := range deposits {
if deposit.Status != constant.TxStatusCreateUnsigned {
updateStutusDepositTxn = append(updateStutusDepositTxn, deposit)
}
}
/*更新通知前状态(待通知)*/
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](nf.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := nf.db.Transaction(func(tx *database.DB) error {
if len(deposits) > 0 {
if err := tx.Deposits.UpdateDepositsStatusByTxHash(businessId, depositsNotifyStatus, updateStutusDepositTxn); err != nil {
return err
}
}
if len(withdraws) > 0 {
if err := tx.Withdraws.UpdateWithdrawStatusByTxHash(businessId, withdrawNotifyStatus, withdraws); err != nil {
return err
}
}
if len(internals) > 0 {
if err := tx.Internals.UpdateInternalStatusByTxHash(businessId, internalNotifyStatus, internals); err != nil {
return err
}
}
return nil
}); err != nil {
log.Error("unable to persist batch", "err", err)
return nil, err
}
return nil, nil
}); err != nil {
return err
}
return nil
}
type NotifyRequest struct {
Txn []httpclient.Transaction `json:"txn"`
}
func main() { http.HandleFunc("/exchange-wallet/notify", func(w http.ResponseWriter, r *http.Request) { log.Println("📩 Received a request")
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "failed to read body", http.StatusInternalServerError)
return
}
defer r.Body.Close()
var req NotifyRequest
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest)
log.Println("❌ Invalid JSON:", err)
return
}
// 打印格式化的 JSON
fmt.Println("🧾 Parsed JSON request:")
pretty, _ := json.MarshalIndent(req, "", " ")
fmt.Println(string(pretty))
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"success":true}`))
})
addr := "127.0.0.1:9997/exchange-wallet/notify"
log.Println("🚀 Mock Notify Server listening on", addr)
if err := http.ListenAndServe("127.0.0.1:9997", nil); err != nil {
log.Fatal("❌ Server failed:", err)
}
}
2. **启动这个模拟程序**

3. **充值一笔试试,等待 10 个确认位**

至此,钱包的所有业务解析完毕!!!
`下篇文章来给大家总结下交易所钱包项目吧(画个巨大的图来整合所有业务模块)`
后续,钱包有一些 bug 以及优化什么的,等我慢慢 `fix` 吧 hhh
如果有愿意参加这个开源项目去参与贡献的,欢迎联系我(或者直接 `github` 上 `issue` 交流也可以的)
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!