rocketMQConsumer.subscribe(this.topic, "*"); 不是应该替换成 rocketMQConsumer.subscribe(this.topic, filter); 更加合理吗
public synchronized void subscribe(String filter) throws CanalClientException {
if (connected) {
return;
}
try {
if (rocketMQConsumer == null) {
this.connect();
}
rocketMQConsumer.subscribe(this.topic, "*");
rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
boolean isSuccess = process(messageExts);
if (isSuccess) {
return ConsumeOrderlyStatus.SUCCESS;
} else {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
rocketMQConsumer.start();
} catch (MQClientException ex) {
connected = false;
logger.error("Start RocketMQ consumer error", ex);
}
connected = true;
}
一个topic里 不可能全都是canal发的消息,也有别的业务系统发的消息,这些消息都是通过tag区分的, 然而rocketMQConsumer.subscribe(this.topic, "*"); 这里订阅了所有的tag,那RocketMQCanalConnector 解析到其他系统的消息就出错了
服务端发送消息也没有tag com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer#send, 应该做成可配置吧
Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
原提问者GitHub用户sunlightzy
目前filter主要发生在写入MQ这一段,后续可以考虑将表名加到tag里,允许客户端进行过滤
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。