开发者社区> 问答> 正文

canal投递kafka消息(FlatMessage) 可能会存在同一分区消息乱序。

版本:1.1.4-SNAPSHO 描述:canal投递kafka(库.表 多分区模式) 动态topic 单表数据 按照id分片 投递消息为flatMessage可能会存在 同一个topic下同一分区消息乱序,核心代码如下:

// 发送扁平数据json List flatMessages = MQMessageUtils.messageConverter(message); List records = new ArrayList(); if (flatMessages != null) { for (FlatMessage flatMessage : flatMessages) { if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) { FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash()); int length = partitionFlatMessage.length; for (int i = 0; i < length; i++) { FlatMessage flatMessagePart = partitionFlatMessage[i]; if (flatMessagePart != null) { records.add(new ProducerRecord<String, String>(topicName, i, null, JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue))); } } } else { final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0; records.add(new ProducerRecord<String, String>(topicName, partition, null, JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue))); }

            // 每条记录需要flush
            produce(topicName, records, true);
            records.clear();
        }
    }

投递kafka端代码:

// 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题 for (ProducerRecord record : records) { futures.add(producerTmp.send(record)); }

以上两处代码存在一个逻辑漏洞:

假设此处十条flatMessage, 这边逻辑是for循环对 单条flatMessage进行了分区,并非十条flatMessage的分区,最终拼接的发送顺序可能如下:

flatMessage1[0],flatMessage1[1],flatMessage2[0],flatMessage2[1]........ 然后kafka异步发送这个序列(kafka底层是nio发送 并不会阻塞) ,最终到达到kafka服务端 可能会是如下顺序: flatMessage2[0],flatMessage2[1],flatMessage1[0],flatMessage1[1]........ 然后 同一分区 顺序 就乱了------

建议:把这十条 flatMessage 进行统一分区 再异步提交 或者 单条分区完成后 就同步提交一次。

原提问者GitHub用户yangyiweigege

展开
收起
数据大拿 2023-05-04 11:15:10 135 0
1 条回答
写回答
取消 提交回答
  • kafka底层会按照分区做batch批量提交,同一个分区batch的合并过程是上层的for循环顺序,不同分区之间顺序无法保证

    原回答者GitHub用户agapple

    2023-05-05 10:12:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载