消息并发处理|学习笔记

简介: 快速学习消息并发处理

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息并发处理】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12503


消息并发处理

 

消息消费过程分析

消息消费的过程,消息消费是在消息从服务端拉取回来之后做的事情,是进行这个消息的一个消费的处理。PullMessageService 负责对消息队列进行消息拉取,把拉取到的消息放到 ProcessQueue 里,去提交给  ConsumeMessageService 进行消息的处理。

这里面 ConsumerMessageService 有两个实现类,一个是ConsumerMessageConcurrentlyService,这个实现类的特点是并发。另外一个是 ConsumerMessageOrderlyService 这个就是顺序。

并发消费的过程:

pullCallback 是消息拉取回来之后所进入到的回调函数。

在这个当中可以找到下面一行代码:DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest, 点击 submitConsumeRequest 会显示

consumer message service的两个时限,一个就是并发的,一个就是顺序的。如下图:先看并发:

image.png

并发代码:

if (msgs.size()<= consumeBatchSize){

ConsumeRequest consumeRequest = new ConsumeRequest(msgs,processQueue,messageQueue);

try  {

this.consumeExecutor . submit(consumeRequest);

} catch (RejectedExecutionException e) {

this.submitConsumeRequestLater(consumeRequest);

} else {

for(int total = 0 ; total < msgs.size(); ) {

List<MessageExt> msgThis = new ArrayList<~>(consumeBatchsize);

for (int i = 0; i < consumeBatchsize; i++, total++) {

if (total < msgs.size()) {

msgThis.add(msgs.get(total));

}else {

break;}

}

首先要去判断消息的数量,消息的数量最多一次是32。就是一次处理32个。如果不足32,那么就直接去提交给当前的消费线程,让它直接去处理,如果大于32,就对这个消息进行一个分二的处理,然后对每一页提交32个过去。

最终提交到消费者的线程池里:

this.cinsumeExecutor.submit(consumeRequest);

这个线程池会去执行里边的 run 方法。

在这个 run 方法当中,先去判断processQueue是不是正常的状态,如果它已经挂了,那么这里的消息全都挂了,所以要先去判断一下它的状态。对应代码是:

if(this.processQueue.isDropped()){

log.info(“the message queue not be able to consume, because it’s dropped.group={} {}”,ConsumerMessageConcurrentlyService;

Return;}

然后取出这个监听器(listener)。

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

真正的将消息传递给监听器,之前要先判断看有没有对应的钩子(hook)方法。如果有,要执行钩子方法。

if (ConsumeMessageConcurrentlySerwice.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());

consumeMessageContext.setProps(new HashMap<string,String>());

ConsumeMessageContext.setMq(messageQueue);

consumeMessageContext.setMsgList(msgs);

consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyservice.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

将消息提交到 listener当中,让他去处理。代码如下,

Status = listenner.consumeMessage(Collections.unmodifiableList(msga),context);

如果处理完之后,有一个钩子方法,那么以下代码位置是在执行钩子方法。

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsunerImpl.hasHook()){consumeMessagecontext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,returnType .name());

}

以上是消息消费处理的过程。入口在PullCallback中,有两种方式,一种是并发的,一种是顺序。

相关文章
|
存储 缓存 负载均衡
数据库分库分表:提升系统性能的必由之路
数据库分库分表:提升系统性能的必由之路
343 1
|
算法 调度
详解操作系统的调度
详解操作系统的调度
839 0
|
druid Java 数据库
druid+springboot加解密Druid链接池配置加密密码链接数据库
druid+springboot加解密Druid链接池配置加密密码链接数据库
1374 0
|
7月前
|
设计模式 SQL 安全
并发设计模式实战系列(13):双重检查锁定(Double-Checked Locking)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十三章,废话不多说直接开始~
437 0
|
7月前
|
设计模式 缓存 安全
【设计模式】【创建型模式】单例模式(Singleton)
一、入门 什么是单例模式? 单例模式是一种设计模式,确保一个类只有一个实例,并提供一个全局访问点。它常用于需要全局唯一对象的场景,如配置管理、连接池等。 为什么要单例模式? 节省资源 场景:某些对象创
281 15
|
9月前
|
前端开发 Java 微服务
微服务——SpringBoot使用归纳——Spring Boot中的MVC支持——@PathVariable
`@PathVariable` 是 Spring Boot 中用于从 URL 中提取参数的注解,支持 RESTful 风格接口开发。例如,通过 `@GetMapping(&quot;/user/{id}&quot;)` 可以将 URL 中的 `{id}` 参数自动映射到方法参数中。若参数名不一致,可通过 `@PathVariable(&quot;自定义名&quot;)` 指定绑定关系。此外,还支持多参数占位符,如 `/user/{id}/{name}`,分别映射到方法中的多个参数。运行项目后,访问指定 URL 即可验证参数是否正确接收。
529 0
|
9月前
|
Java 微服务 Spring
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录——使用Logger在项目中打印日志
本文介绍了如何在项目中使用Logger打印日志。通过SLF4J和Logback,可设置不同日志级别(如DEBUG、INFO、WARN、ERROR)并支持占位符输出动态信息。示例代码展示了日志在控制器中的应用,说明了日志配置对问题排查的重要性。附课程源码下载链接供实践参考。
1076 0
|
Java 开发者 Spring
Spring AOP 底层原理技术分享
Spring AOP(面向切面编程)是Spring框架中一个强大的功能,它允许开发者在不修改业务逻辑代码的情况下,增加额外的功能,如日志记录、事务管理等。本文将深入探讨Spring AOP的底层原理,包括其核心概念、实现方式以及如何与Spring框架协同工作。
|
存储 NoSQL 算法
深入理解Redis数据类型Zset原理
本文深入探讨了Redis中的Zset(有序集合)数据类型,它是一种可以存储排序功能的集合,其中每个元素都具有一个浮点型的score属性,用于根据score进行排序。
深入理解Redis数据类型Zset原理
|
消息中间件 存储 NoSQL
[Kafka 常见面试题]如何保证消息的不重复不丢失
[Kafka 常见面试题]如何保证消息的不重复不丢失
1301 0