开发者社区> 问答> 正文

Canal Adapter RDB 同步时sql执行出错没有rollback

环境信息

canal version 1.1.3 mysql version 5.7.20

问题描述

RdbSyncService 类中

public void sync(List dmls, Function<Dml, Boolean> function) { try { boolean toExecute = false; for (Dml dml : dmls) { if (!toExecute) { toExecute = function.apply(dml); } else { function.apply(dml); } } if (toExecute) { List futures = new ArrayList<>(); for (int i = 0; i < threads; i++) { int j = i; futures.add(executorThreads[i].submit(() -> { dmlsPartition[j] .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml)); //FIXME 在这里进行commit操作,如果有一个执行失败了,如何处理SQL rollback? batchExecutors[j].commit(); return true; })); }

            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    //FIXME 如果这里只打印错误消息,SQL执行出错,但是认为从MQ成功获取了消息
                    logger.error(e.getMessage(), e);
                }
            });

            for (int i = 0; i < threads; i++) {
                dmlsPartition[i].clear();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        //TODO 如果执行失败,抛出异常,保证消息消费的一致性
    }
}

1、dmlsPartition执行完以后直接commit ,如果有dmlsPartition执行失败的情况下,如何rollback?

2、出错了以后只打印错误信息,没有抛出异常,那么会认为消息消费成功了(但是SQL执行出错了),这样就可能造成数据同步不一致

是不是考虑把commit操作放在 dmlsPartition[i].clear(); 的循环里面执行?

另外在出错后是不是应该都重新抛出RuntimeException异常? 如果dmlsPartition[i].clear();执行失败呢?

原提问者GitHub用户flackyang

展开
收起
古拉古拉 2023-05-08 16:44:16 82 0
1 条回答
写回答
取消 提交回答
  • 最新代码已修复。做了部分修复,多种关系数据库的PK约束冲突的错误码需要后续慢慢补充。

    原回答者GitHub用户agapple

    2023-05-09 18:15:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载