canal server 版本为 1.1.5-snapshot
canal中的instance配置文件中,配置了
canal.mq.topic=customer_info canal.mq.partitionsNum=4 canal.mq.partitionHash=scb_customer.customer_info:sid,scb_customer.label_customer:sid
新增customer_info表一条记录
Broker | 队列 | 消费者终端 | 代理者位点 | 消费者位点 | 差值 | 上次时间 broker-a | 0 | | 2 | 0 | 2 | 2020-12-10 22:34:24 broker-a | 1 | | 2 | 0 | 2 | 1970-01-01 08:00:00 broker-a | 2 | | 2 | 0 | 2 | 1970-01-01 08:00:00 broker-a | 3 | | 2 | 0 | 2 | 1970-01-01 08:00:00
上面的4个队列 都增加了消息;;;每新增一条记录,4个队列都插入消息
也就是会产生4个消息(只有1个消息的消息body里面的内容是正确的;其他3个消息 MessageBody 为 null )
原提问者GitHub用户gujiachun
原因应该找到了
// 并发构造 MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor); // 串行分区 List flatMessages = MQMessageUtils.messageConverter(datas, message.getId()); // 初始化分区合并队列 if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) { List partitionFlatMessages = new ArrayList<>(); for (int i = 0; i < destination.getPartitionsNum(); i++) { partitionFlatMessages.add(new ArrayList<>()); }
for (FlatMessage flatMessage : flatMessages) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
}
}
ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
for (int i = 0; i < partitionFlatMessages.size(); i++) {
final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
**if (flatMessagePart != null) {**
final int index = i;
template.submit(() -> {
List<Message> messages = flatMessagePart.stream()
.map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
SerializerFeature.WriteMapNullValue)))
.collect(Collectors.toList());
// 批量发送
sendMessage(messages, index);
});
}
}
源码中 if (flatMessagePart != null) 这个判断 有问题;;这个是不可能为null的;调试后,[null];(有值,只是值为null而已)
这个是上面的2个for循环导致的
改了源码,修复了此bug了 改动的地方
for (int i = 0; i < length; i++) { if (partitionFlatMessage[i] != null) {//增加null判断 partitionFlatMessages.get(i).add(partitionFlatMessage[i]); } }
改动的第二个地方
if (flatMessagePart != null && flatMessagePart.size() > 0) {//判断加上size要大于0 final int index = i; template.submit(() -> { List messages = flatMessagePart.stream() .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue))) .collect(Collectors.toList()); // 批量发送 sendMessage(messages, index); }); }
原回答者GitHub用户gujiachun
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。