开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息并发处理】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12503
消息并发处理
消息消费过程分析
消息消费的过程,消息消费是在消息从服务端拉取回来之后做的事情,是进行这个消息的一个消费的处理。PullMessageService 负责对消息队列进行消息拉取,把拉取到的消息放到 ProcessQueue 里,去提交给 ConsumeMessageService 进行消息的处理。
这里面 ConsumerMessageService 有两个实现类,一个是ConsumerMessageConcurrentlyService,这个实现类的特点是并发。另外一个是 ConsumerMessageOrderlyService 这个就是顺序。
并发消费的过程:
pullCallback 是消息拉取回来之后所进入到的回调函数。
在这个当中可以找到下面一行代码:DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest, 点击 submitConsumeRequest 会显示
consumer message service的两个时限,一个就是并发的,一个就是顺序的。如下图:先看并发:
并发代码:
if (msgs.size()<= consumeBatchSize){
ConsumeRequest consumeRequest = new ConsumeRequest(msgs,processQueue,messageQueue);
try {
this.consumeExecutor . submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
} else {
for(int total = 0 ; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<~>(consumeBatchsize);
for (int i = 0; i < consumeBatchsize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
}else {
break;}
}
首先要去判断消息的数量,消息的数量最多一次是32。就是一次处理32个。如果不足32,那么就直接去提交给当前的消费线程,让它直接去处理,如果大于32,就对这个消息进行一个分二的处理,然后对每一页提交32个过去。
最终提交到消费者的线程池里:
this.cinsumeExecutor.submit(consumeRequest);
这个线程池会去执行里边的 run 方法。
在这个 run 方法当中,先去判断processQueue是不是正常的状态,如果它已经挂了,那么这里的消息全都挂了,所以要先去判断一下它的状态。对应代码是:
if(this.processQueue.isDropped()){
log.info(“the message queue not be able to consume, because it’s dropped.group={} {}”,ConsumerMessageConcurrentlyService;
Return;}
然后取出这个监听器(listener)。
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
真正的将消息传递给监听器,之前要先判断看有没有对应的钩子(hook)方法。如果有,要执行钩子方法。
if (ConsumeMessageConcurrentlySerwice.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<string,String>());
ConsumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyservice.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
将消息提交到 listener当中,让他去处理。代码如下,
Status = listenner.consumeMessage(Collections.unmodifiableList(msga),context);
如果处理完之后,有一个钩子方法,那么以下代码位置是在执行钩子方法。
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsunerImpl.hasHook()){consumeMessagecontext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,returnType .name());
}
以上是消息消费处理的过程。入口在PullCallback中,有两种方式,一种是并发的,一种是顺序。
