开发者社区> 问答> 正文

RocketMQCanalConnector#subscribe(String filter) 这个

rocketMQConsumer.subscribe(this.topic, "*"); 不是应该替换成 rocketMQConsumer.subscribe(this.topic, filter); 更加合理吗

public synchronized void subscribe(String filter) throws CanalClientException {
    if (connected) {
        return;
    }
    try {
        if (rocketMQConsumer == null) {
            this.connect();
        }
        rocketMQConsumer.subscribe(this.topic, "*");
        rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                boolean isSuccess = process(messageExts);
                if (isSuccess) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else {
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        rocketMQConsumer.start();
    } catch (MQClientException ex) {
        connected = false;
        logger.error("Start RocketMQ consumer error", ex);
    }
    connected = true;
}

一个topic里 不可能全都是canal发的消息,也有别的业务系统发的消息,这些消息都是通过tag区分的, 然而rocketMQConsumer.subscribe(this.topic, "*"); 这里订阅了所有的tag,那RocketMQCanalConnector 解析到其他系统的消息就出错了

服务端发送消息也没有tag com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer#send, 应该做成可配置吧

Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));

原提问者GitHub用户sunlightzy

展开
收起
古拉古拉 2023-05-08 17:06:47 99 0
1 条回答
写回答
取消 提交回答
  • 目前filter主要发生在写入MQ这一段,后续可以考虑将表名加到tag里,允许客户端进行过滤

    原回答者GitHub用户agapple

    2023-05-09 18:36:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载