Venus 获取deadline信息并进行windowPost计算

damocles-mamanger启动后会调用run方法,方法里面有一个for循环。不停的监听ChainNotify信息,当有区块产出或者别的headchain的事件发生后,就会调用fetchMinerProvingDeadlineInfos()方法,主动去从链节点获取deadline信息,然后调用

每个sector封装完毕以后,会放到一个partition里面进行接受挑战,2349个sector一般情况下是一个矿工是在24小时后内无法封装完成的,但是协议要求24小时以后几要开始做winningpost了。做winngpost时,会锁定partition,封装完成的sector将不能写进锁定的partition。将会写到另外一个partition。也就是说,2349个sector虽然理论上面只需要一个deadline进行挑战,但是因为上面的原因,2349个sector会被分配到两个以上的partition里面,所以2349个sector的windowpost的挑战,大概率会占有两个以上的deadline。

哪个sector在哪个deadline被挑战(要求做dindowpost),都记载在一个bitfield的结构体并存储在链上。而sector被挑战的具体点,则是根据epoch的不一样,去drand获取一个随机数,根据这个随机数来选择sector被挑战的具体点。

damocles-mamanger启动后会调用run方法,方法里面有一个for循环。不停的监听ChainNotify信息,当有区块产出或者别的headchain的事件发生后,就会调用fetchMinerProvingDeadlineInfos()方法,主动去从链节点获取deadline信息,然后调用handleHeadChange()方法,处理headchange事件。在处理headechange事件的过程中,会根据当下的epoch信息,选择调用runner.start()进行winpost的proof生成或者runner.submit()方法把生的winpost proof提交上链。

deadline信息其实没有什么,主要的信息就是每个actorid的起始时间不一样,从而导致其24小时的,48个proving period的开始epooch不一样而已。deadline里面主要就是当下的deadline的startepoch信息。

启动for循环,监听chainnotify信息。

func (p *PoSter) Run(ctx context.Context) {
            ....
            .....
            ch, err := p.deps.chain.ChainNotify(ctx)
            .......
            .......
            // 从链上获取deadline信息
            dinfos := p.fetchMinerProvingDeadlineInfos(ctx, mids, highest)
			if len(dinfos) == 0 {
				continue CHAIN_HEAD_LOOP
			}
        
        //处理headchange消息。一般是有了新的区块出现。
	p.handleHeadChange(ctx, lowest, highest, dinfos)
}
func (p *PoSter) handleHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, dinfos map[abi.ActorID]map[abi.ChainEpoch]*dline.Info) {

        // cleanup
	for mid := range p.schedulers {
        ....
        ....
        		for open := range scheds {
			sched := scheds[open]
                        
                        // 某个actorid已经开始runner任务以获取deadline的任务,所以删除掉,不再次开启runner任务
			if dls, dlsOk := dinfos[mid]; dlsOk {
				// 尝试开启的 deadline 已经存在
				if _, dlOk := dls[open]; dlOk {
					delete(dls, open)
				}
			}

                           .......
                           .......

			// post config 由于某种原因缺失时,无法触发 start 或 submit
			if pcfg != nil && sched.isActive(currHeight) {
				if sched.shouldStart(pcfg, currHeight) {
					sched.runner.start(pcfg, advance)
				}

				if sched.couldSubmit(pcfg, currHeight) {
					sched.runner.submit(pcfg, advance)
				}
			}
        ....
        ....
        
        }
        
        .....
        ....
        for mid := range dinfos {
        .....
        .....
        dls := dinfos[mid]
		for open := range dls {
			dl := dls[open]

			sched := newScheduler(
				dl,
				p.runnerConstructor(
					ctx,
					p.deps,
					mid,
					maddr,
					minfo.WindowPoStProofType,
					dl,
				),
			)

			if _, ok := p.schedulers[mid]; !ok {
				p.schedulers[mid] = map[abi.ChainEpoch]*scheduler{}
			}

			mdLog.Debugw("init deadline runner", "open", dl.Open)
			p.schedulers[mid][dl.Open] = sched
			if sched.isActive(currHeight) && sched.shouldStart(pcfg, currHeight) {
				sched.runner.start(pcfg, advance) // 开启runner任务,计算windowpost proof
			}
        .....
        .....
        }
}
func (pr *postRunner) start(pcfg *modules.MinerPoStConfig, ts *types.TipSet) {
        // 因为是sync.once 对象,所以只会运行一次。即:计算一次windowpost proof
	pr.startOnce.Do(func() {
		pr.startCtx.pcfg = pcfg
		pr.startCtx.ts = ts

		baseLog := pr.log.With("tsk", ts.Key(), "tsh", ts.Height())

		go pr.handleFaults(baseLog)
		go pr.generatePoSt(baseLog)
	})
}

// 这里计算windowpost proof
func (pr *postRunner) generatePoStForPartitionBatch(glog *logging.ZapLogger, rand core.WindowPoStRandomness, batchIdx int, batch []chain.Partition, batchPartitionStartIdx int) {
....
....
....
....
//有些类型的失败,会重新计算windowpost proof
for attempt := 0; ; attempt++ {
		alog := pblog.With("attempt", attempt)
                /
		needRetry, err := proveAttempt(alog)
		if err != nil {
			alog.Errorf("attempt to generate window post proof: %v", err)
		}

		if !needRetry {
			alog.Info("partition batch done")
			break
		}

		select {
		case <-pr.ctx.Done():
			return

		case <-time.After(5 * time.Second):
		}

		alog.Debug("retry partition batch")
	}
}

当runner.start()方法真正的生成了windowpost proof,pr.proofs.proofs就不会是nil。就可以提交到链上了。

func (pr *postRunner) submit(pcfg *modules.MinerPoStConfig, ts *types.TipSet) {
	// check for proofs
	pr.proofs.Lock()
	proofs := pr.proofs.proofs
	pr.proofs.proofs = nil
	pr.proofs.Unlock()

	if proofs == nil {
		return
	}

	go pr.submitPoSts(pcfg, ts, proofs)
}

func (pr *postRunner) submitSinglePost(slog *logging.ZapLogger, pcfg *modules.MinerPoStConfig, proof *miner.SubmitWindowedPoStParams) {
	uid, resCh, err := pr.publishMessage(stbuiltin.MethodsMiner.SubmitWindowedPoSt, proof, false)
	if err != nil {
		slog.Errorf("publish post message: %v", err)
		return
	}
        ....
        ...
        .....
}

windowpost proof不会直接提交到链节点,而是先提交给messager,messager把收到的windowpost proof信息,放到自己的数据库后,方法就会返回成功。
但是这个时候,信息还没有上链,所以会调用waitmessage()方法,等待消息上链。

func (pr *postRunner) publishMessage(method abi.MethodNum, params cbor.Marshaler, useExtraMsgID bool) (string, <-chan msgResult, error) {
    .....
    .....
    uid, err := pr.deps.msg.PushMessageWithId(pr.ctx, mid, &msg, &spec)
	if err != nil {
		return "", nil, fmt.Errorf("push msg with id %s: %w", mid, err)
	}

    .....
    m, err := pr.waitMessage(uid, pr.startCtx.pcfg.Confidence)
    .....
}
点赞 1
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
杜满想Elvin
杜满想Elvin
老程序员,区块链架构师