【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)

本文涉及的产品
云原生网关 MSE Higress,422元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
  1. 消息模型设置为集群,则修改实例名称
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

将实例名称修改为 pid + 时间戳

// format: "pid@hostname"
final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName();
public static int getPid() {
    try {
        return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
    } catch (Exception e) {
        return -1;
    }
}
  1. 获取全局唯一的实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 一个clientId对应一个实例
if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // 一个clientId对应一个实例
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

在一个JVM中所有消费者、生产者持有同一个MQClientInstance实例,采用了双重检查的方式,来确定有且只有一个MQClientInstance实例。

  1. 负载均衡参数设置
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

3. 注册过滤消息的钩子函数

这里注册过滤消息的钩子函数有什么用呢,我们可以思考一下 ?

private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册过滤消息 钩子函数
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

4. 根据不同模式,将消息进度存储在不同的地方

switch (this.defaultMQPushConsumer.getMessageModel()) {
    // 广播模式
    case BROADCASTING:
        // 消息进度存储在本地文件
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    // 集群模式
    case CLUSTERING:
        // 消息进度存储在Broker 服务器上
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
    this.offsetStore.load();
}

目的是初始化消息进度,以及确定消息存储的位置。

5. 根据不同的消息监听器初始化消费消息线程池和扫描过期消息清除线程池

顺序消费模式与并发消费模式有一些的不同

根据不同的消息监听器初始化消费消息线程池、扫描过期消息清除线程池

顺序消息模式,不初始化扫描过期消息清除线程池,只初始化消费消息线程池

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    } 

初始化消费消息线程池、扫描过期线程池

if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
        this.consumeOrderly = false;
        this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

6. 启动消费消息服务

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#start

// 启动消费消息服务
this.consumeMessageService.start();
  1. 顺序消费模式

顺序消费模式,启动的是线程池名称为ConsumeMessageScheduledThread_开头的定时线程池,每秒扫描一次

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);

lockMQPeriodically()这个方法就是给当前客户端所消费的所有队列去borker进行上锁。

public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }
  1. 并发消费模式

启动的线程池是清除过期消息定时线程池,每15分钟扫描一次

// 开启过期消息清除,定时器
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            cleanExpireMsg();
        } catch (Throwable e) {
           log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
        }
    }
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);

7. 将消费者组注入消费者容器

入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer

// 将消费者组注入消费者容器
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

消费者容器

private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            log.warn("the consumer group[" + group + "] exist already.");
            return false;
        }

使用consumerTable来保存消费者组与消费者关系

在后续更新主题路由信息,检查客户端与Broker的关系等

8. 启动消息监听

入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#start

mQClientFactory.start();

这一个环节的内容过多,我们将在另外写一篇文章中,去了解RocketMQ是如何做消息监听的

9. 最后向Broker设置心跳检测

更新主题发布详情,当发布者变动时

this.updateTopicSubscribeInfoWhenSubscriptionChanged();
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
        final String topic = entry.getKey();
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    }
}

检查Broker状态

this.mQClientFactory.checkClientInBroker();

向Broker设置心跳检测

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

Consumer跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

总结

我们主要了解的RocketMQ消息消费的启动流程,还是看得懂的,抓住这一条主线,我们在接下去的过程中,我们可以给自己留下一些问题,比如他是如何做负载均衡的?如何监听消息?在这过程中他使用到了线程池,我们能够去监听线程池的情况等等问题。

我用思维导图整理了今天分享的内容,如果有不对的地方欢迎大家指出~

网络异常,图片无法展示
|

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
6月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
6月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
4月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
235 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
182 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
4月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
10月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
135 0
|
9月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
2709 0
|
8月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
312 3
|
7月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】

相关产品

  • 云消息队列 MQ