环境信息
canal version 1.1.3 mysql version 5.7.20
问题描述
RdbSyncService 类中
public void sync(List dmls, Function<Dml, Boolean> function) { try { boolean toExecute = false; for (Dml dml : dmls) { if (!toExecute) { toExecute = function.apply(dml); } else { function.apply(dml); } } if (toExecute) { List futures = new ArrayList<>(); for (int i = 0; i < threads; i++) { int j = i; futures.add(executorThreads[i].submit(() -> { dmlsPartition[j] .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml)); //FIXME 在这里进行commit操作,如果有一个执行失败了,如何处理SQL rollback? batchExecutors[j].commit(); return true; })); }
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
//FIXME 如果这里只打印错误消息,SQL执行出错,但是认为从MQ成功获取了消息
logger.error(e.getMessage(), e);
}
});
for (int i = 0; i < threads; i++) {
dmlsPartition[i].clear();
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
//TODO 如果执行失败,抛出异常,保证消息消费的一致性
}
}
1、dmlsPartition执行完以后直接commit ,如果有dmlsPartition执行失败的情况下,如何rollback?
2、出错了以后只打印错误信息,没有抛出异常,那么会认为消息消费成功了(但是SQL执行出错了),这样就可能造成数据同步不一致
是不是考虑把commit操作放在 dmlsPartition[i].clear(); 的循环里面执行?
另外在出错后是不是应该都重新抛出RuntimeException异常? 如果dmlsPartition[i].clear();执行失败呢?
原提问者GitHub用户flackyang
最新代码已修复。做了部分修复,多种关系数据库的PK约束冲突的错误码需要后续慢慢补充。
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。