NewPendingTransactions始末
Geth 提供了realtime event接口,通过websocket 链接节点之后,发送
{"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}可以得到实时的pendingTransactions的hash.
eth/filters/api.go,eth/filters/filter_system.go,eth/backend.go入口从func (s *Ethereum) APIs() []rpc.API 开始.再早些的代码逻辑不在本次的讨论范围内.
func (s *Ethereum) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.APIBackend, s.blockchain)
    // Append any APIs exposed explicitly by the consensus engine
    apis = append(apis, s.engine.APIs(s.BlockChain())...)
    // Append all the local APIs and return
    return append(apis, []rpc.API{
                ...
         {
            Namespace: "eth",
            Version:   "1.0",
            Service:   filters.NewPublicFilterAPI(s.APIBackend, false, 5*time.Minute, s.config.RangeLimit),
            Public:    true,
        }, 
                ...}}}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration, rangeLimit bool) *PublicFilterAPI {
    api := &PublicFilterAPI{
        backend:    backend,
        events:     NewEventSystem(backend, lightMode),
        filters:    make(map[rpc.ID]*filter),
        timeout:    timeout,
        rangeLimit: rangeLimit,
    }
    go api.timeoutLoop(timeout)
    return api
}
api.timeoutLoop 这里的这个goruntime 就是控制事件订阅超时关闭的工作,逻辑不在本次讨论范围内.
在创建PublicFilterApi的是成员events 由 NewEventSystem 创建.该实例管理了监听事件的启动和过滤
//filter_system.go  
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
    m := &EventSystem{
        backend:       backend,
        lightMode:     lightMode,
        install:       make(chan *subscription),
        uninstall:     make(chan *subscription),
        txsCh:         make(chan core.NewTxsEvent, txChanSize),
        logsCh:        make(chan []*types.Log, logsChanSize),
        rmLogsCh:      make(chan core.RemovedLogsEvent, rmLogsChanSize),
        pendingLogsCh: make(chan []*types.Log, logsChanSize),
        chainCh:       make(chan core.ChainEvent, chainEvChanSize),
    }
    // Subscribe events
    m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
    m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
    m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
    m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
    m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
    // Make sure none of the subscriptions are empty
    if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
        log.Crit("Subscribe for event system failed")
    }
    go m.eventLoop()
    return m
}
EventSystem.install 监听新增的订阅事件ws连入的时候,通过install <- 来创建一个新的订阅
EventSystem.uninstall销毁阅订阅事件的管道
EventSystem.txsCh交易发生变换时写入的管道
其他的与newPendingTransactions没有太大关系,这里就不提及了
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)这行代码,新建一个NewTxsEvent订阅,pool中交易发生变化时,会写入到txsCh.
go m.eventLoop()主要的生命周期goruntime,负责处理收到的信息,并分发给对应的filters
//filter_system.go 
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
    // Ensure all subscriptions get cleaned up
    defer func() {
        es.txsSub.Unsubscribe()
                ...
    }()
    index := make(filterIndex)
    for i := UnknownSubscription; i < FullPendingTransactionsSubscription; i++ {
        index[i] = make(map[rpc.ID]*subscription)
    }
    for {
        select {
        case ev := <-es.txsCh://新交易handler
            es.handleTxsEvent(index, ev)
        ...
        case f := <-es.install: //新建订阅
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                index[LogsSubscription][f.id] = f
                index[PendingLogsSubscription][f.id] = f
            } else {
                index[f.typ][f.id] = f
            }
            close(f.installed)
        case f := <-es.uninstall: //取消订阅
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                delete(index[LogsSubscription], f.id)
                delete(index[PendingLogsSubscription], f.id)
            } else {
                delete(index[f.typ], f.id)
            }
            close(f.err)
        // System stopped
        case <-es.txsSub.Err():
            return
        case <-es.logsSub.Err():
            return
        case <-es.rmLogsSub.Err():
            return
        case <-es.chainSub.Err():
            return
        }
    }
}
index的数据类型是 type filterIndex map[Type]map[rpc.ID]*subscription可以理解为一个二维数组[x,y]x表示的是订阅的类型,y表示rpc session的唯一标识.虽然他是个map,但是完全可以用二维数组的思维理解.下面是所有的Type,即订阅的类型.newPendingTransactions 对应的订阅事件类型是PendingTransactionsSubscription
const (
    // UnknownSubscription indicates an unknown subscription type
    UnknownSubscription Type = iota
    // LogsSubscription queries for new or removed (chain reorg) logs
    LogsSubscription
    // PendingLogsSubscription queries for logs in pending blocks
    PendingLogsSubscription
    // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
    MinedAndPendingLogsSubscription
    // PendingTransactionsSubscription queries tx hashes for pending
    // transactions entering the pending state
    PendingTransactionsSubscription
    // BlocksSubscription queries hashes for blocks that are imported
    BlocksSubscription
    // LastSubscription keeps track of the last index
    LastIndexSubscription
)
es.handleTxsEvent(index, ev)为处理txsch 事件的处理函数,txsch实际上是一个[]*types.Transaction来看下对应的代码
//filter_system.go
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
    hashes := make([]common.Hash, 0, len(ev.Txs))
    for _, tx := range ev.Txs {
        hashes = append(hashes, tx.Hash())
    }
    for _, f := range filters[PendingTransactionsSubscription] {
        f.hashes <- hashes
    }
}
直接从ev中copy Hashes之后全部写入到 index[PendingTransactionsSubscription]的subscription.hashes.即当前所有存活的类型为PendingTransactionsSubscription订阅者.事件分发的部分到上面这里就结束了.下面来看看用户是如何创建一个新订阅的
//eth/filters/api.go
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }
    rpcSub := notifier.CreateSubscription()
    gopool.Submit(func() {
        txHashes := make(chan []common.Hash, 128)
        pendingTxSub := api.events.SubscribePendingTxs(txHashes)
        for {
            select {
            case hashes := <-txHashes:
                // To keep the original behaviour, send a single tx hash in one notification.
                // TODO(rjl493456442) Send a batch of tx hashes in one notification
                for _, h := range hashes {
                    notifier.Notify(rpcSub.ID, h)
                }
            case <-rpcSub.Err():
                pendingTxSub.Unsubscribe()
                return
            case <-notifier.Closed():
                pendingTxSub.Unsubscribe()
                return
            }
        }
    })
    return rpcSub, nil
}
当向rpc发送这个订阅命令的时候
{"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}最后 调用的就是上面的NewPendingTransactions的方法.主要的核心逻辑看这里
txHashes := make(chan []common.Hash, 128)创建了一个读写的128容量的common.Hash 类型的管道,
pendingTxSub := api.events.SubscribePendingTxs(txHashes)创建一个新的PendingTxs订阅,该方法会将,创建好的subscription通过下面的subscribe方法发送给install,进行注册,即我们上文提到的index
// subscribe installs the subscription in the event broadcast loop.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
    es.install <- sub
    <-sub.installed
    return &Subscription{ID: sub.id, f: sub, es: es}
}
在for循环中等待读取 txHashes的消息,即subscription.hashes然后通过
notifier.Notify(rpcSub.ID, h)返回给对应RPC.ID的ws session.
到这里基本上就结束了,另外还有退出订阅一些接口,就不讲了.我看过几个版本的Geth,该版本是bsc 最新的release版本,之前也有看过Heco的geth版本但是也是老早之前的事情,不同版本的Geth在这方面的实现上有差异,但是基本流程差不多.看这个目的是为了FullPendingTransactionsSubscription监听交易的时候不用再去查一次,交易的详细数据.效果如图

如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!