开发者社区> 问答> 正文

MySQL binlog 表名大小写 产生的kafka动态匹配规则错误

环境信息

canal version 1.1.3 mysql version 5.7

问题描述

在集成kafka的情况下,canal在拉取到binlog之后,匹配topic的时候,如果topic字母相同但大小写不同会引发kafka的日志恢复并resize,报错如下:(之前已有一个topic:redis_cachecloud_QRTZ_FIRED_TRIGGERS) ERROR Error while creating log for redis_cachecloud_qrtz_fired_triggers-0 in dir E:\kafka\logs\kafka2 (kafka.server.LogDirFailureChannel) java.io.IOException: 请求的操作无法在使用用户映射区域打开的文件上执行 at java.io.RandomAccessFile.setLength(Native Method) at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:186) at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173) at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.log.AbstractIndex.resize(AbstractIndex.scala:173) at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:242) at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242) at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241) at kafka.log.LogSegment.recover(LogSegment.scala:377) at kafka.log.Log.kafka$log$Log$$recoverSegment(Log.scala:467) at kafka.log.Log.kafka$log$Log$$recoverLog(Log.scala:581) at kafka.log.Log$$anonfun$2.apply$mcJ$sp(Log.scala:552) at kafka.log.Log$$anonfun$2.apply(Log.scala:552) at kafka.log.Log$$anonfun$2.apply(Log.scala:552) at kafka.log.Log.retryOnOffsetOverflow(Log.scala:1938) at kafka.log.Log.loadSegments(Log.scala:551) at kafka.log.Log.(Log.scala:276) at kafka.log.Log$.apply(Log.scala:2071) at kafka.log.LogManager$$anonfun$getOrCreateLog$1.apply(LogManager.scala:691) at kafka.log.LogManager$$anonfun$getOrCreateLog$1.apply(LogManager.scala:659) at scala.Option.getOrElse(Option.scala:121) at kafka.log.LogManager.getOrCreateLog(LogManager.scala:659) at kafka.cluster.Partition$$anonfun$getOrCreateReplica$1.apply(Partition.scala:199) at kafka.cluster.Partition$$anonfun$getOrCreateReplica$1.apply(Partition.scala:195) at kafka.utils.Pool$$anon$2.apply(Pool.scala:61) at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) at kafka.utils.Pool.getAndMaybePut(Pool.scala:60) at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:194) at kafka.cluster.Partition$$anonfun$makeFollower$1$$anonfun$apply$mcZ$sp$3.apply(Partition.scala:439) at kafka.cluster.Partition$$anonfun$makeFollower$1$$anonfun$apply$mcZ$sp$3.apply(Partition.scala:439) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.cluster.Partition$$anonfun$makeFollower$1.apply$mcZ$sp(Partition.scala:439) at kafka.cluster.Partition$$anonfun$makeFollower$1.apply(Partition.scala:431) at kafka.cluster.Partition$$anonfun$makeFollower$1.apply(Partition.scala:431) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at kafka.cluster.Partition.makeFollower(Partition.scala:431) at kafka.server.ReplicaManager$$anonfun$makeFollowers$3.apply(ReplicaManager.scala:1244) at kafka.server.ReplicaManager$$anonfun$makeFollowers$3.apply(ReplicaManager.scala:1238) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1238) at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1076) at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185) at kafka.server.KafkaApis.handle(KafkaApis.scala:110) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748)

步骤重现

既然数据库不区分大小写,那为何动态匹配规则的时候要弄成两个topic?可否修改?

期待结果

正确匹配

现执行情况

broker报错

我查看了binlog,确实是同一张表,但是记录却是大小写不一致,结果导致canal认为是两张表的修改,发到kafka的时候就会出现错误,后来我在canal的MQMessageUtils类里动态匹配topic那里修改了,直接全部通过String的toLowerCase()改为小写并清除kafka log,重启broker之后就能正常运行了

原提问者GitHub用户finefuture

展开
收起
古拉古拉 2023-05-08 14:16:44 122 0
2 条回答
写回答
取消 提交回答
  • MQMessageUtils我看都是默认忽略了大小写进行匹配的,filter和simpleName的匹配都是

    原回答者GitHub用户agapple

    2023-05-09 17:55:28
    赞同 展开评论 打赏
  • 这个问题可能是因为 Windows 系统的文件名大小写不敏感导致的。在 Windows 系统中,redis_cachecloud_QRTZ_FIRED_TRIGGERSredis_cachecloud_qrtz_fired_triggers 实际上被视为相同的文件名。但是在 Kafka 中,每个 topic 都会对应一个独立的数据目录,如果两个 topic 的名称只有大小写不同,实际上会被视为两个不同的目录,导致 Kafka 在写入数据时出现问题。因此,建议在创建 topic 时使用统一的大小写规范,避免出现大小写不同但名称相同的情况,这样可以避免这个问题的发生。

    2023-05-08 15:51:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像