开发者社区> 问答> 正文

并行解析模式下,获取meta逻辑存在线程安全问题

环境信息

canal version 1.1.2 mysql version 5.7

问题描述

并行解析模式下,获取meta逻辑存在线程安全问题. connection是线程非安全的,当meta过期时会触发异常。 非tsdb模式逻辑如下,tsdb模式同理:

private TableMeta getTableMetaByDB(String fullname) throws IOException { try { ResultSetPacket packet = connection.query("show create table " + fullname); String[] names = StringUtils.split(fullname, "."); String schema = names[0]; String table = names[1].substring(0, names[1].length()); return new TableMeta(schema, table, parseTableMeta(schema, table, packet)); } catch (Throwable e) { // fallback to desc table ResultSetPacket packet = connection.query("desc " + fullname); String[] names = StringUtils.split(fullname, "."); String schema = names[0]; String table = names[1].substring(0, names[1].length()); return new TableMeta(schema, table, parseTableMetaByDesc(packet)); } }

异常栈如下:

2019-05-31 21:34:51.678 [destination =, address = , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:* [com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.util.concurrent.UncheckedExecutionException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:stress_test.d1 Caused by: com.google.common.util.concurrent.UncheckedExecutionException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:stress_test.d1 at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at com.google.common.cache.LocalCache.get(LocalCache.java:3937) at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941) at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824) at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4830) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:196) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:956) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEventForTableMeta(LogEventConvert.java:504) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:525) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor$DmlParserStage.onEvent(MysqlMultiStageCoprocessor.java:330) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor$DmlParserStage.onEvent(MysqlMultiStageCoprocessor.java:316) at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:stress_test.d1 Caused by: java.io.IOException: should execute connector.connect() first at com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor.(MysqlQueryExecutor.java:30) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.query(MysqlConnection.java:104) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMetaByDB(TableMetaCache.java:93) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.access$000(TableMetaCache.java:32) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache$1.load(TableMetaCache.java:63) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache$1.load(TableMetaCache.java:53) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) at com.google.common.cache.LocalCache.get(LocalCache.java:3937) at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941) at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824) at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4830) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:196) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:956) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEventForTableMeta(LogEventConvert.java:504) at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:525) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor$DmlParserStage.onEvent(MysqlMultiStageCoprocessor.java:330) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor$DmlParserStage.onEvent(MysqlMultiStageCoprocessor.java:316) at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

Steps to reproduce 制造meta过期场景即可。如: binlog中的字段数量与mysql中的字段数量不一致,此时重启canal进程(非tsdb)

解决这个问题pr如下: 同步方式获取meta #1866

原提问者GitHub用户lanxinxu

展开
收起
云上静思 2023-05-04 13:03:03 142 0
1 条回答
写回答
取消 提交回答
  • 代码已合并

    原回答者GitHub用户agapple

    2023-05-05 10:41:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
ECS块储存产品全面解析 立即下载
阿里云HBase产品体系架构及特性解析 立即下载
多IO线程优化版 立即下载

相关镜像