同步服务模块 handler
同步模块的消息handler 负责处理从 net 模块监听到的 NetMsg_SYNC_BLOCK_MSG 类型的消息
func (sync *BlockChainSyncServer) blockSyncMsgHandler(from string, msg []byte, msgType netPb.NetMsg_MsgType) error {
// 检查服务是否启动
check sync.start
// 检查消息类型
check msgType != netPb.NetMsg_SYNC_BLOCK_MSG
// 解析 msg
unmarshal msg -> syncMsg
switch syncMsg.Type {
case syncPb.SyncMsg_NODE_STATUS_REQ:
// 返回本地高度
return sync.handleNodeStatusReq(from)
case syncPb.SyncMsg_NODE_STATUS_RESP:
// 在定时器任务队列中添加 NodeStatusMsg 任务,启动处理流程
return sync.handleNodeStatusResp(&syncMsg, from)
case syncPb.SyncMsg_BLOCK_SYNC_REQ:
// 返回区块数据(封装在 SyncMsg 中,包含相应的业务数据)
return sync.handleBlockReq(&syncMsg, from)
case syncPb.SyncMsg_BLOCK_SYNC_RESP:
// 在定时器任务队列中添加 SyncedBlockMsg 任务,启动处理流程
return sync.scheduler.addTask(&SyncedBlockMsg{msg: syncMsg.Payload, from: from})
}
}
7.2.2.3. 调度服务 handler
func (sch *scheduler) handler(event queue.Item) (queue.Item, error) {
switch msg := event.(type) {
case NodeStatusMsg:
// NodeStatusMsg任务流
// 更新 peers 中记录的节点ID和高度,更新 blockStates 初始化为 newBlock
sch.handleNodeStatus(msg)
case LivenessMsg:
// pending 状态区块超时检查
sch.handleLivinessMsg()
case SchedulerMsg:
// 发送 SyncMsg_BLOCK_SYNC_REQ 请求到远端
return sch.handleScheduleMsg()
case *SyncedBlockMsg:
// ReceivedBlocks任务流
// 更新高度对应区块状态为 receivedBlock,封装 ReceivedBlocks 数据提交给 processor 消息队列,由 handleReceivedBlocks 函数处理
return sch.handleSyncedBlockMsg(msg)
case ProcessedBlockResp:
// 处理 processor 消息队列提交的 ProcessedBlockResp 消息,检查本地区块写入结果,更新 pendingRecvHeight
return sch.handleProcessedBlockResp(msg)
case DataDetection:
// 检查 blockStates 数据,删除低于本地最高快的缓存数据
sch.handleDataDetection()
}
}
处理服务 handler
func (pro *processor) handler(event queue.Item) (queue.Item, error) {
switch msg := event.(type) {
case *ReceivedBlocks:
// 将区块数据写入 processor.queue 队列
pro.handleReceivedBlocks(msg)
case ProcessBlockMsg:
// ProcessBlockMsg 工作流
// 从 processor.queue 中取出 pendingBlockHeight 块高的数据,
// 调用 validateAndCommitBlock 写入本地账本,封装 ProcessedBlockResp 消息返回
return pro.handleProcessBlockMsg()
case DataDetection:
// 检查 processor.queue,删除低块
pro.handleDataDetection()
}
}
定时任务的独立线程
func (sync *BlockChainSyncServer) loop() {
var (
// 定时任务:触发区块处理工作流
doProcessBlockTk = time.NewTicker(sync.conf.processBlockTick)
// 定时任务: 触发远端状态查询工作流
doScheduleTk = time.NewTicker(sync.conf.schedulerTick)
// 定时任务: 广播高度查询消息
doNodeStatusTk = time.NewTicker(sync.conf.nodeStatusTick)
// 定时任务: 触发查询请求超时校验逻辑
doLivenessTk = time.NewTicker(sync.conf.livenessTick)
// 定时任务: 触发定时器调度服务中本地区块状态字典的高度检查,舍弃低于本地最高快的 kv 对
doDataDetect = time.NewTicker(sync.conf.dataDetectionTick)
)
for {
select {
case <-sync.close:
return
// Timing task
case <-doProcessBlockTk.C:
sync.processor.addTask(ProcessBlockMsg{})
case <-doScheduleTk.C:
sync.scheduler.addTask(SchedulerMsg{})
case <-doLivenessTk.C:
sync.scheduler.addTask(LivenessMsg{})
case <-doNodeStatusTk.C:
sync.broadcastMsg(syncPb.SyncMsg_NODE_STATUS_REQ, nil)
case <-doDataDetect.C:
sync.processor.addTask(DataDetection{})
sync.scheduler.addTask(DataDetection{})
// 消息中转和消费逻辑
case resp := <-sync.scheduler.out:
sync.processor.addTask(resp)
case resp := <-sync.processor.out:
sync.scheduler.addTask(resp)
}
}
}