- 消息模型设置为集群,则修改实例名称
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); } }
将实例名称修改为 pid + 时间戳
// format: "pid@hostname" final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName(); public static int getPid() { try { return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@'))); } catch (Exception e) { return -1; } }
- 获取全局唯一的实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 一个clientId对应一个实例 if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); // 一个clientId对应一个实例 MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } }
在一个JVM中所有消费者、生产者持有同一个MQClientInstance实例,采用了双重检查的方式,来确定有且只有一个MQClientInstance实例。
- 负载均衡参数设置
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
3. 注册过滤消息的钩子函数
这里注册过滤消息的钩子函数有什么用呢,我们可以思考一下 ?
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); // 注册过滤消息 钩子函数 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
4. 根据不同模式,将消息进度存储在不同的地方
switch (this.defaultMQPushConsumer.getMessageModel()) { // 广播模式 case BROADCASTING: // 消息进度存储在本地文件 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; // 集群模式 case CLUSTERING: // 消息进度存储在Broker 服务器上 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; this.offsetStore.load(); }
目的是初始化消息进度,以及确定消息存储的位置。
5. 根据不同的消息监听器初始化消费消息线程池和扫描过期消息清除线程池
顺序消费模式与并发消费模式有一些的不同
根据不同的消息监听器初始化消费消息线程池、扫描过期消息清除线程池
顺序消息模式,不初始化扫描过期消息清除线程池,只初始化消费消息线程池
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); }
初始化消费消息线程池、扫描过期线程池
if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
6. 启动消费消息服务
入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#start
// 启动消费消息服务 this.consumeMessageService.start();
- 顺序消费模式
顺序消费模式,启动的是线程池名称为ConsumeMessageScheduledThread_开头的定时线程池,每秒扫描一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
lockMQPeriodically()这个方法就是给当前客户端所消费的所有队列去borker进行上锁。
public synchronized void lockMQPeriodically() { if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }
- 并发消费模式
启动的线程池是清除过期消息定时线程池,每15分钟扫描一次
// 开启过期消息清除,定时器 this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { cleanExpireMsg(); } catch (Throwable e) { log.error("scheduleAtFixedRate cleanExpireMsg exception", e); } } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
7. 将消费者组注入消费者容器
入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer
// 将消费者组注入消费者容器 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
消费者容器
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer); if (prev != null) { log.warn("the consumer group[" + group + "] exist already."); return false; }
使用consumerTable来保存消费者组与消费者关系
在后续更新主题路由信息,检查客户端与Broker的关系等
8. 启动消息监听
入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#start
mQClientFactory.start();
这一个环节的内容过多,我们将在另外写一篇文章中,去了解RocketMQ是如何做消息监听的
9. 最后向Broker设置心跳检测
更新主题发布详情,当发布者变动时
this.updateTopicSubscribeInfoWhenSubscriptionChanged(); Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); } }
检查Broker状态
this.mQClientFactory.checkClientInBroker();
向Broker设置心跳检测
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
Consumer跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。
总结
我们主要了解的RocketMQ消息消费的启动流程,还是看得懂的,抓住这一条主线,我们在接下去的过程中,我们可以给自己留下一些问题,比如他是如何做负载均衡的?如何监听消息?在这过程中他使用到了线程池,我们能够去监听线程池的情况等等问题。
我用思维导图整理了今天分享的内容,如果有不对的地方欢迎大家指出~