【RocketMQ系列七】消费者和生产者的实现细节

简介: 【RocketMQ系列七】消费者和生产者的实现细节

本文首先会介绍消费者的推模式,拉模式,接着会介绍 生产者负载均衡策略。

介绍之前我们首先需要拉取RocketMQ的源码,源码地址是:https://github.com/apache/rocketmq.git

1. 消费者的消费模式

RocketMQ 同时支持消费者的推模式以及拉模式。推模式顾名思义就是broker将消息推送给消费者,拉模式则是消费者主动到队列中拉取消息。默认情况下,RocketMQ使用的是推模式。

在IDEA中导入RocketMQ源码之后,找到 example模块,然后在此模块中找到各种例子。

1.1.推模式

消费者推模式的例子就是 org.apache.rocketmq.example.simple.PushConsumer 。推模式的消费者的实现类是 DefaultMQPushConsumer 。之前的文章已经做了详细介绍,在此就不在赘述了。推模式适合于大部分正常消费的情况

public static final String TOPIC = "TopicTest";
    public static final String CONSUMER_GROUP = "CID_JODIE_1";
    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.subscribe(TOPIC, "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

1.2. 拉模式

消费者拉模式的例子是:org.apache.rocketmq.example.simple.LitePullConsumerAssign 。拉模式主要适用于回溯消费消息。比如:某个消息你消费失败了,你现在想重新消费该消息的情况。我们知道RocketMQ中消息消费完之后不会里面会被删除,默认会在队列中保留48小时。通过broker配置文件中的fileReservedTime参数进行设置。

//1.创建DefaultLitePullConsumer实例
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        //2.启动litePullConsumer实例
        litePullConsumer.start();
        //3.获取TopicTest主题下所有的队列
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        //4.消费者需要拉取的队列的集合
        litePullConsumer.assign(assignList);
        //5.消费者需要定位,哪个队列,多少偏移量的消息。
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commit();
            }
        } finally {
            litePullConsumer.shutdown();
        }

2. 生产者负载均衡策略

我们都知道一个主题下会有多个消息队列(MessageQueue),那么,生产者在发送消息的时候如何选择消息队列呢?

首先找到生产者的示例代码类:org.apache.rocketmq.example.simple.Producer。在该类中找到发送消息的方法 producer.send(msg)

接着找到发送消息的默认实现方法 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

在此方法中可以找到 selectOneMessageQueue 方法,从方法名可以知道此方法就是用来选出一个MessageQueue的

//省略部分代码
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
                //省略部分代码

在selectOneMessageQueue方法中通过调用 tpInfo.selectOneMessageQueue 方法来获取

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
        BrokerFilter brokerFilter = threadBrokerFilter.get();
        brokerFilter.setLastBrokerName(lastBrokerName);
        if (this.sendLatencyFaultEnable) {
            if (resetIndex) {
                tpInfo.resetIndex();
            }
            MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }
            mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }
            return tpInfo.selectOneMessageQueue();
        }
        MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
        if (mq != null) {
            return mq;
        }
        return tpInfo.selectOneMessageQueue();
    }

那么最终的实现逻辑就是在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue 方法中了。我们可以查看此方法。

private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
       //省略非核心代码
        if (filter != null && filter.length != 0) {
            for (int i = 0; i < messageQueueList.size(); i++) {
                int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
                MessageQueue mq = messageQueueList.get(index);
                boolean filterResult = true;
                for (QueueFilter f: filter) {
                    Preconditions.checkNotNull(f);
                    filterResult &= f.filter(mq);
                }
                if (filterResult) {
                    return mq;
                }
            }
            return null;
        }

这里的核心代码就是下面这句代码:

int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

首先通过 sendQueue.incrementAndGet() 方法获取当前线程下index值。然后对该主题下所有的队列数进行求模取余。也就是说RocketMQ默认会采取轮询的方式选择消息队列 接着我们来看下该方法的实现。

private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    private final Random random = new Random();
    public int incrementAndGet() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = random.nextInt();
        }
        this.threadLocalIndex.set(++index);
        return index & POSITIVE_MASK;
    }

首先从线程本地变量 threadLocalIndex 中获取索引值index,如果没有的话则随机取一个值。然后将取到index中进行加一操作放回threadLocalIndex中。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
112 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
78 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
67 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
55 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
73 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
52 0