实时更新消息消费队列与索引文件流程说明|学习笔记

简介: 快速学习实时更新消息消费队列与索引文件流程说明

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)实时更新消息消费队列与索引文件流程说明】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12483


实时更新消息消费队列与索引文件流程说明

 

内容介绍:

一.消息数据分发流程

二.流程回顾

 

一.消息数据分发流程

实时更新消息消费队列与索引文件相关的知识,消息被发送到MQ 之后,首先会将消息存储到CommitLog, CommitLog 再负责将消息分发到 Consumerqueue 以及 indexFile 中,如果在消息分发时出现延迟,会造成当前的消费者不能够及时消费消息,根据indexFile 不会立即能够查找到对应的消息,RocketMQ 为了提高消息分发的准确性,专门有一个线程进行处理,通过ReputMessageService 这个专门的线程来处理消息数据的分发。

image.png

该线程的开启随着 DefaultMessageStore 这个类的启动会被开启。

Public void statrc() throws Exception{

This.reputMessageService.start();

image.png在 DefaultMessageStore 中消息存储 start方法被打开调用之后,有一个reportMessageService,这个其实是一个线程,开启了线程 start 方法本质上在调用线程中的 run 方法,run。中会每隔一毫秒执行数据分发,数据分发的业务逻辑都在 doReport 中。

image.png在 doReport 中首先根据当前 report FromOffset 转发的位移量开始查找全部有效的数据,先将数据从 CommitLog 中查找出来,对应代码就是如下一行:

DefaultMessageStore.this.comitlog.getData(report FromOffset );

根据 report FromOffset 从 CommitLog 中找到对应的数据,数据就是一条条的消息,消息有许多条,所以做一个循环遍历 result

For(int readSize = 0; readSize < result.getSize() && doNext;)

遍历完成之后进行分发时,分发到两个位置 Consumerqueue 以及 indexFile,所以有一个关键地方如下图

image.png

这个地方开始进行数据的分发的处理。在 doDispatch 方法中,又是一个遍历,该遍历中 CommitLog Dispatcher。是一个接口,它的实现类有两个,一个是CommitLogDispatcherBuildConsumeQueue(专门给 Consumerqueue 分发的实现类)一个是给 indexFile 专门进行分发的实现类

image.png

这两个类具体分发的业务对应构建消息队列然后调用CommitLogDispatcherBuildConsumeQueue 给Consumerqueue 分发,然后调用CommitLogDispatcherBuildIndex 给 index 索引文件进行分发处理。

 

二.流程回顾

数据分发基本流程如上。整个数据分发的服务在当前DefaultMessageStore 方法中有一个开启线程的 start 方法,然后在 start。中reputMessageService 是一个线程,会调用线程的 run 方法,在 run 中通过doReport 进行数据分发。在数据分发时第一步是查找数据,查找出来之后,遍历每一条消息,调用接口中的 doDispatch 方法进行分发。接口在调用时使用的是接口的实现类,实现类只有有两个CommitLogDispatcherBuildConsumeQueue 以及 CommitLogDispatcherBuildIndex。

相关文章
|
人工智能 搜索推荐 大数据
EDM营销是什么意思?
EDM营销是什么意思?
|
4月前
|
人工智能 安全 云计算
非洲首届奥运赛事将跑在阿里云上!
阿里云将为2026年达喀尔青奥会提供全程云计算与AI支持,覆盖赛事核心应用及服务,助力赛事全面上云,提升运营效率与观众体验。这将是奥运史上首届在非洲举办的综合性赛事,也将首次由云计算和AI技术全面支撑运行。
176 0
|
7月前
|
安全 应用服务中间件 网络安全
从零(服务器、域名购买)开始搭建雷池WAF到应用上线简明指南
本文详细介绍了基于雷池WAF的网站防护部署全流程,涵盖服务器与域名准备、WAF安装配置、网站接入设置及静态文件站点搭建等内容。通过最低1核CPU/1GB内存的服务器配置,完成Docker环境搭建、雷池一键安装及端口设置,实现域名解析、SSL证书配置和防护策略优化。同时支持301重定向与HTTP到HTTPS自动跳转,确保访问安全与规范。最后还提供了使用静态文件搭建网站的方法,帮助用户快速构建具备基础WAF防护能力的网站系统。
从零(服务器、域名购买)开始搭建雷池WAF到应用上线简明指南
|
11月前
|
Java Linux iOS开发
如何配置 Java 环境变量:设置 JAVA_HOME 和 PATH
本文详细介绍如何在Windows和Linux/macOS系统上配置Java环境变量。
13492 12
|
Kubernetes 大数据 容器
三、kubernetes 集群 YAML 文件详解
是一个可读性高,用来表大数据列的格式。Yaml的意思其实是:仍是一种标记语言,但是为了强调这种语言以数据为中心。而不是以标记语言为重点。
1415 0
三、kubernetes 集群 YAML 文件详解
|
Java 数据库连接 数据库
Spring与Mybatis集成且Aop整合(放飞双手,迅速完成CRUD及分页)
Spring与Mybatis集成且Aop整合(放飞双手,迅速完成CRUD及分页)
364 0
|
存储 Java Linux
纯C语言Socket实现聊天室
最近在学习嵌入式开发,练习C语言小项目,基本是参考别人的代码,做了些修改实现了聊天室,纯C语言编写。
502 0
纯C语言Socket实现聊天室
|
设计模式 测试技术 开发工具
Git操作指南: 企业级项目分支管理流程 - SourceTree Mac 版(1)
发现每到一家公司,公司的管理流程都差距甚远,大的互联网公司注重细节和内部实现,代码质量要求简直就是苛刻,新员工&lt;review code + 导师 &gt;是少不了的,每写一行代码,非得说清楚里面原理是什么?为什么要这样写,是不是用啥设计模式会更有拓展性一点,业务文档和技术文档也是强制性的,各种流程像流水线办公类似,一个细节出问题,影响的就是整个发版,复杂的需求可以划分为多个需求id,然后每个人只负责自己的小需求模块即可,小团队的话则更注重整体业务把握,基本上搞透一个点,做着做着就得梳理之前的业务流程,刚好国庆挤了点时间把git总结了一下,然后再逐步分析一下企业真实的git管理是怎么样的,希望对职场的
703 0
Git操作指南: 企业级项目分支管理流程 - SourceTree Mac 版(1)
|
Shell API 数据库
综合扫描工具 -- recon-ng(v5.0.1)
综合扫描工具 -- recon-ng(v5.0.1)
345 0
综合扫描工具 -- recon-ng(v5.0.1)