1 写作目的
最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。
2 版本及说明
RocketMQ-4.9.1
3 初识ConsumeQueue及tag
首先要RocketMQ的文件存储设计,本文主要关注CommitLog文件和ConsumeQueue文件,如下图所示(图片引自该处)。当消息生产者生产消息时,所有topic的消息都会顺序的保存在CommitLog文件里,如果只从CommitLog一个文件看,是没有办法快速定位到某个topic的消息,那么此时就需要ConsumeQueue登场了。
ConsumeQueue在不同的文件夹下,根据不同的文件夹可以区分不同的队列,而ConsumeQueue文件存储的是消息的索引信息。
如上图所示消息生产者每生产一条消息就对应这下图的一条索引记录。其中消息的真实内容存储在commitLog中。
●CommitLog Offset:指向commitLog中文件的偏移量。
●Size:该条消息的大小。
●Message Tag Hashcode:生产消息时指定的 tag 的hash 值。
4 tag跟踪及定位
整个流程为:
- 1.producer生产消息
- 2.broker存储消息
- 3.conusmer启动流程
- 4.broker给consumer消息(过滤tag)
- 5.consumer消费消息(过滤&消费)
其中
topic = TopicTest
tag = TagA
4.1 producer生产消息
一般producer生产消息时候会使用如下代码,其中消息要包含topic、tag和msg消息体。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); Message msg = new Message( "TopicTest", // topic "TagA", // tag "OrderID188", // key "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body SendResult sendResult = producer.send(msg);
其中上面的tag是存在哪呢?跟Message的构造方法可以看到tag其实是放在msg的properties里,MessageConst.PROPERTY_TAGS = TAGS
public void setTags(String tags) { this.putProperty(MessageConst.PROPERTY_TAGS, tags); }
跟上面的send方法中间会跟到
MQClientAPIImpl#sendMessage方法,方法中的一行代码如下图所示,创建Request,因为本次发送为单条消息,所以代码中的三元表达式中选择RequestCode.SEND_MESSAGE_V2(310)。
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
在往下跟其实就是通过Netty给borker发送消息了(非本次内容关注重点,忽略)。
总结:
tag放在msg的properties里
发送请求的code = RequestCode.SEND_MESSAGE_V2(310)
4.2 broker存储消息
本文关注的有两个文件,一个是存储消息的CommitLog文件和存储topic索引的ConsumeQueue文件。
CommitLog是对外暴露的是一个逻辑日志(而真正对应的物理日志是多个MappedFile文件组成的)。该逻辑日志有一个最大偏移量maxOffset(DefaultMessageStore.this.commitLog.getMaxOffset())。当有新消息发到broker时消息会写到CommitLog里并且maxOffset就会增加。
而ConsumeQueue的构成是由另一个类ReputMessageService异步线程进行处理,异步构建Consumequeue。
ReputMessageService是Runnable实现类,run方法会每隔1秒执行doReput方法,如下面代码所示。
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
ReputMessageService里有一个属性是reputFromOffset,该属性表示同步CommLog到Consumequeue的进度。
如果
this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset()
则说明有新的消息要从CommLog构建索引到Consumequeue。
而Consumequeue中的三个属性(commitlog offset、size、tag hashcode)是怎么来的?
本身我们是有一个CommitLog的偏移量(reputFromOffset),从这个偏移量开始往后解析我们是可以解析出整条消息的,消息格式如下图所示。
解析出整条消息后可以获取到
●commitlog offset :从消息中解析到
●size:解析消息后计算的
●tag hashcode :从消息中解析到msg的properties并获取到tags(字符串)然后获取hashcode。
那么就可以构建一条Consumequeue索引了。
总结:
broker收到消息后同步放在CommitLog中(本文没讲)
ReputMessageService通过异步不断扫描reputFromOffset和commitLog.getMaxOffset关系从而获取需要构建的通知。
解析消息获取Consumequeue参数并构建。
4.3 consumer启动流程
1、获取订阅的topic和Queue信息
2、通过Reblace获取被分配的Queue,开始拉取消息
4.3.1 consumer获取topic和Queue信息
消费者启动会调用
MQClientInstance#start()方法,start()方法里有会调用
MQClientInstance#startScheduledTask()方法,里面的一段代码如下,会每隔一段时间更新一下topic路由信息
//MQClientInstance###startScheduledTask() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
会把路由信息保存到本地的一个HashMap里,这样消费者就拿到了topic的信息并且会把broker的信息保存下来
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //根据主题从nameserver获取topic信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) //把主题和主题队列相关的broker保存下来 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); }
总结:
消费者拿到主题的队列列表和broker信息
4.3.2 consumer拉取消息
consumer怎么开始拉取消息?这里其实是一个reblance的过程
MQClientInstance的start的方法里会开启一个rebalance的线程,如下面代码所示
//MQClientInstance###start() public void start() throws MQClientException { //省略 // Start rebalance service this.rebalanceService.start(); //省略 }
跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法。如下面代码所示。根据主题队列列表和消费者组集合去做一个Rebalance,最后的返回结果是当前消费者需要消费的主题队列。
//RebalanceImpl##rebalanceByTopic private void rebalanceByTopic(final String topic, final boolean isOrder) { //获取订阅的主题的队列 //获取订阅的主题的队列 //获取订阅的主题的队列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //获取同消费者组的ClientID集合 //获取同消费者组的ClientID集合 //获取同消费者组的ClientID集合 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //排序 //排序 //排序 Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { //rebalance算法核心实现,最后的结果是返回应该消费的队列 //rebalance算法核心实现,最后的结果是返回应该消费的队列 //rebalance算法核心实现,最后的结果是返回应该消费的队列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { //rebalance算法核心实现,最后的结果是返回应该消费的队列 //rebalance算法核心实现,最后的结果是返回应该消费的队列 //rebalance算法核心实现,最后的结果是返回应该消费的队列 allocateResultSet.addAll(allocateResult); } //此处看下面的消费者怎么去拉消息 //此处看下面的消费者怎么去拉消息 //此处看下面的消费者怎么去拉消息 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } }
上面代码中allocateResultSet就是该consumerGroup被分配的Queue。后面会把每一个Queue包装成一个Task去对应的Broker中拉取消息。
总结:
如下图所示,RebalanceService线程会根据情况把请求放在PullMessageService的pullRequestQueue阻塞队列队列里,队列的每一个节点就是拉请求;PullMessageService线程就是不断去pullRequestQueue里拿任务然后去看一下broker中有没有数据,如果有数据就消费。
4.4 broker响应consumer请求(过滤tag)
首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到
PullMessageProcessor#processRequest方法,方法里有如下的代码
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
跟DefaultMessageStore#getMessage方法
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { //省略 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { //获取消息的偏移量 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); //获取消息的大小 int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); //获取消息的tag的hashcode long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } //省略 //省略 //省略 //查看消息tag是否匹配,此时在broker实现过滤 //查看消息tag是否匹配,此时在broker实现过滤 //查看消息tag是否匹配,此时在broker实现过滤 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } //省略 //省略 //省略 return getResult; }
跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode
//ExpressionMessageFilter#isMatchedByConsumeQueue @Override public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { //省略 //省略 //省略 //订阅主题里是否包含这个hashcode return subscriptionData.getCodeSet().contains(tagsCode.intValue()); } else { //省略 }
总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。
4.5consumer消费消息(过滤tag&消费)
Consumer端在DefaultMQPushConsumerImpl#pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); //省略 }
跟一下PullAPIWrapper#processPullResult方法
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { //省略 for (MessageExt msg : msgList) { if (msg.getTags() != null) { //Consumer端过滤消息 //Consumer端过滤消息 //Consumer端过滤消息 if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } //省略 return pullResult; }
总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag