现象: node在insert 时出现异常触发manager挂起,不处理这个异常,紧接着直接在manager界面上面点击解挂,2次这样操作后channel 状态显示正常,但实际上channel已经假死不工作。
原因: 出现异常rollback,触发SelectTask.java里面 if (rversion.get() != startVersion) {// 说明存在过变化,中间出现过rollback,需要丢弃该数据 logger.warn("rollback happend , should skip this data and get new message."); canStartSelector.get();// 确认一下rollback是否完成 gotMessage = otterSelector.selector();// 这时不管有没有数据,都需要执行一次s/e/t/l } ,同时rollback会发送错误给manager,更改channel状态为PAUSE。如果在ExtractMemoryArbitrateEvent中channel状态已经变成了PAUSE, 会直接丢掉数据不进行任何处理,EventStore里面的读取位置点没有被rollback.
原提问者GitHub用户funnyAnt
调试发现不是EventStore的回滚的问题,是ExtractMemoryArbitrateEvent.java里面发现channel是PAUSE态直接把信号包丢弃掉,没有remove掉MemoryStageController里面相关的processId,
ExtractMemoryArbitrateEvent里面的代码修改如下:
if (status.isStart()) {// 即时查询一下当前的状态,状态随时可能会变 ...... } else { logger.warn("pipelineId[{}] select ignore processId[{}] by status[{}]", new Object[] { pipelineId, processId, status }); // 释放下processId,因为MemoryStageController的load是等待processId最小值完成Tranform才继//续,如果这里不释放,会一直卡死等待 stageController.clearProgress(processId); return await(pipelineId);// 递归调用 } 仅仅stageController.clearProgress(processId); 移除掉processId还是有问题,一是select stage信号量会丢掉1个,二是eventStore里面的get值没有回置。 测试发现把pipeline里面的并发度改成1,卡死的问题会稳定出现。
f512a3f 这个里面,做了一些修改, SelectTask.java里面增加sleep,等待channel变成start态,尽量避免走丢信号量流程。
****MemoryArbitrateEvent.java里面把stageController.clearProgress(processId)调用改成stageController.termin(TerminType.ROLLBACK);
// 进行ROLLBACK,触发释放下processId,信号量及EventStore里面的读位置点。
// 1)因为MemoryStageController的load是等待processId最小值完成Tranform才继续,如果这里不释放,会一直卡死等待
// 2)SELECT信号量消耗完selectTask任务会停止3)EventStore里面的读位置点不回置,如果正好队列已经满并且读取了最后,BINLOG新的数据进不来
stageController.termin(TerminType.ROLLBACK);
原回答者GitHub用户funnyAnt
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。