深入理解RocketMQ广播消费

简介: 这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例、注意事项四个方面一一展开,希望能帮助到大家。

这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念实现机制实战案例注意事项四个方面一一展开,希望能帮助到大家。

1 基础概念

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费

同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

广播消费

当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

2 源码解析

首先下图展示了广播消费的代码示例。

public class PushConsumer {
   
    public static final String CONSUMER_GROUP = "myconsumerGroup";
    public static final String DEFAULT_NAMESRVADDR = "localhost:9876";
    public static final String TOPIC = "mytest";
    public static final String SUB_EXPRESSION = "TagA || TagC || TagD";

    public static void main(String[] args) throws InterruptedException, MQClientException {
   
        // 定义 DefaultPushConsumer 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        // 定义名字服务地址
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        // 定义消费读取位点
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 定义消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 订阅主题信息
        consumer.subscribe(TOPIC, SUB_EXPRESSION);
        // 订阅消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
   
            try {
   
                for (MessageExt messageExt : msgs) {
   
                    System.out.println(new String(messageExt.getBody()));
                }
            }catch (Exception e) {
   
                e.printStackTrace();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

和集群消费不同的点在于下面的代码:

consumer.setMessageModel(MessageModel.BROADCASTING);

接下来,我们从源码角度来看看广播消费和集群消费有哪些差异点 ?

首先进入 DefaultMQPushConsumerImpl 类的 start 方法 , 分析启动流程中他们两者的差异点:

▍ 差异点1:拷贝订阅关系

private void copySubscription() throws MQClientException {
   
    try {
   
       Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
       if (sub != null) {
   
          for (final Map.Entry<String, String> entry : sub.entrySet()) {
   
              final String topic = entry.getKey();
              final String subString = entry.getValue();
              SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
       if (null == this.messageListenerInner) {
   
          this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
       }
       // 注意下面的代码 , 集群模式下自动订阅重试主题 
       switch (this.defaultMQPushConsumer.getMessageModel()) {
   
           case BROADCASTING:
               break;
           case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
   
        throw new MQClientException("subscription exception", e);
    }
}

在集群模式下,会自动订阅重试队列,而广播模式下,并没有这段代码。也就是说广播模式下,不支持消息重试

▍ 差异点2:本地进度存储

switch (this.defaultMQPushConsumer.getMessageModel()) {
   
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

我们可以看到消费进度存储的对象是: LocalFileOffsetStore , 进度文件存储在如下的主目录/{用户主目录}/.rocketmq_offsets

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
    "rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

进度文件是 /mqClientId/{consumerGroupName}/offsets.json

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";

笔者创建了一个主题 mytest , 包含4个队列,进度文件内容如下:

消费者启动后,我们可以将整个流程简化如下图,并继续整理差异点:

▍ 差异点3:负载均衡消费该主题的所有 MessageQueue

进入负载均衡抽象类 RebalanceImplrebalanceByTopic方法 。

private void rebalanceByTopic(final String topic, final boolean isOrder) {
   
    switch (messageModel) {
   
        case BROADCASTING: {
   
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
   
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                // 省略代码
            } else {
   
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
   
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            // 省略代码
            if (mqSet != null && cidAll != null) {
   
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
   
                     allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
   
                        // 省略日志打印代码
                        return;
                    }
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
   
                    allocateResultSet.addAll(allocateResult);
                }
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                //省略代码
            }
            break;
        }
        default:
            break;
    }
}

从上面代码我们可以看到消息模式为广播消费模式时,消费者会订阅该主题下所有的 messageQueue ,这一点也可以从本地的进度文件 offsets.json 得到印证。

▍ 差异点4:不支持顺序消息

顺序消费会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
   
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
   
        @Override
        public void run() {
   
            try {
   
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            } catch (Throwable e) {
   
                log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
            }
        }
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

从上面的代码,我们发现只有在集群消费的时候才会定时申请锁,这样就会导致广播消费时,无法为负载均衡的队列申请锁,导致拉取消息服务一直无法获取消息数据。

为了再次验证,我们修改例子,消费模式从并发消费修改为顺序消费 。

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
   
    try {
   
        for (MessageExt messageExt : msgs) {
   
            System.out.println(new String(messageExt.getBody()));
        }
    }catch (Exception e) {
   
        e.printStackTrace();
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

从图中,笔者观察到拉取消息的线程无法发起拉取消息请求到 Broker ,因为负载均衡后的队列无法获取到锁。

因此,广播消费模式并不支持顺序消息

▍ 差异点5:并发消费消费失败时,没有重试

进入并发消息消费类ConsumeMessageConcurrentlyService 的处理消费结果方法processConsumeResult

switch (this.defaultMQPushConsumer.getMessageModel()) {
   
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
   
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
   
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

消费消息失败后,集群消费时,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。

但在广播模式下,仅仅是打印了消息信息。因此,广播模式下,并没有消息重试

3 实战案例

笔者第一次接触广播消费的业务场景是神州专车司机端消息推送。 用户下单之后,订单系统生成专车订单,派单系统会根据相关算法将订单派给某司机,司机端就会收到派单推送。

推送架构图如下:

司机端启动后,会通过负载均衡和推送服务创建长连接,推送服务会保存 TCP 连接引用 (比如司机编号和 TCP channel 的引用)。

推送服务是一个 TCP 服务(自定义协议),同时也是一个消费者服务,消息模式是广播消费。

派单服务是生产者,将派单数据发送到 MetaQ , 每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端。

肯定有同学会问:假如网络原因,推送失败怎么处理 ?有两个要点:

  1. 司机端定时主动拉取派单信息;
  2. 当推送服务没有收到司机端的 ACK 时 ,也会一定时限内再次推送,达到阈值后,不再推送。

4 注意事项

集群消费和广播消费模式下,各功能的支持情况如下:

功能 集群消费 广播消费
顺序消息 支持 不支持
重置消费位点 支持 不支持
消息重试 支持 不支持
消费进度 服务端维护 客户端维护

参考资料 :

https://www.51cto.com/article/714277.html

https://ost.51cto.com/posts/21100


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 负载均衡 RocketMQ
RocketMQ的消费模式
在公司的技术分享中,就有聊到rocketmq的消费模式,特此总结一下。 在说消费之前,这里先说一下rocketmq中group的概念吧,一个group代表的是逻辑相同的一组实例,最可以表达这个概念的是我们将一个项目部署多个实例,那么这个项目的集群就可以称之为一个group。
1048 0
RocketMQ的消费模式
|
5月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
134 0
|
5月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
93 0
|
8月前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
80 0
|
消息中间件 存储 安全
RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费
228 0
|
消息中间件 RocketMQ
RocketMQ消费者没有成功消费消息的问题排查
RocketMQ消费者没有成功消费消息的问题排查
1665 1
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2775 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java API
RocketMq-消息模式讲解
RocketMq-消息模式讲解
RocketMq-消息模式讲解
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
563 1