从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理

本文承接上文《从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(2):路由注册,消息发送核心流程原理》

ae48956613064dabae7290d6474de3c4.png

闲话少说,我们直接上图,我这特意用颜色标注了一下,注意观察颜色相同的部分


d9999dc8c74c4d62a5d5d2d6b668b61a.png

流程描述

消息生产-存储流程

1.首选生产者从本地缓存或者从nameserver 获取到对应topic 对应的broker路由以及quene 写队列

2.生产者本地使用负载均衡策略选择一个broker和队列进行发送

3.broker 接到消息后会直接保存或者通过page cache 和内存映射首先将消息保存如内存中,

然后定时去保存到commitLog里,具体看是同步保存还是异步保存

4.broker 会启动定时任务监听commitLog 文件更新,如果有更新,

会同步到consumeQuene和index中,comsumeQuene结构为/topic名/queneid/xxx

消息消费-存储流程

1.消费者从nameserver 获取到对应topic 对应的broker路由以及quene 读队列

2.然后开启一个线程去批量拉取消息,将消息放入消息租possessMessage 内

3.处理possessMessage ,处理完一批后保存消费进度到本地

4.启动定时任务发送消费进度到broker端

5.broker 同步进度文件consumeOffset.json

消息存储结构

消息存储结构图

RocketMQ存储路径为${ROCKET_HOME}/store

5714c557fcc340d3b281e768257be72b.png

核心文件数据结构介绍

commitLog 数据结构

消息主体以及元数据的存储主体,存储消息生产端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1GB,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824。第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件  

81592cd38a6147b0a84a9edd3414d4a2.png


673069028ec74661a5cb4576ce7f8e38.png

RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但同一主题的消息是不连续地存储在CommitLog文件中的。如果消息消费者直接从消息存储文件中遍历查找订阅主题下的消息,效率将极其低下。RocketMQ为了适应消息消费的检索需求,设计了ConsumeQueue文件,该文件可以看作CommitLog关于消息消费的“索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录为主题的消息队列


单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度为3×106×20字节,单个ConsumeQueue文件可以看作一个ConsumeQueue条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。ConsumeQueue即为CommitLog文件的索引文件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务  

index 数据结构


ConsumeQueue是RocketMQ专门为消息订阅构建的索引文件,目的是提高根据主题与消息队列检索消息的速度。另外,RocketMQ引入哈希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽与哈希冲突的链表结构。




Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息,其结构如下。

1)beginTimestamp:Index文件中消息的最小存储时间。

2)endTimestamp:Index文件中消息的最大存储时间。

3)beginPhyoffset:Index文件中消息的最小物理偏移量(CommitLog文件偏移量)。

4)endPhyoffset:Index文件中消息的最大物理偏移量(CommitLog文件偏移量)。

5)hashslotCount:hashslot个数,并不是哈希槽使用的个数,在这里意义不大。

6)indexCount:Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。

一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条目,每个Index条目结构如下。

1)hashcode:key的哈希码。

2)phyoffset:消息对应的物理偏移量。


3)timedif:该消息存储时间与第一条消息的时间戳的差值,若小于0,则该消息无效。

4)pre index no:该条目的前一条记录的Index索引,当出现哈希冲突时,构建链表结构。

接下来重点分析如何将Map<String/*消息索引key*/,long phyOffset/*消息物理偏移量*/>存入Index文件,以及如何根据消息索引key快速查找消息。

RocketMQ将消息索引键与消息偏移量的映射关系写入Index的实现方法为public boolean putKey(final String key, final long phyOffset, final long storeTimestamp),参数含义分别为消息索引、消息物理偏移量、消息存储时间

消息读写队列的概念

每个tpoic 在broker 中创建的时候都会默认创建4个读队列和4个写队列

独写队列不是我们传统意义理解的独写分离实际存在的队列,实际上只是两个数字变量,

用来返回给消息生产者和消息消费者选择发送队列用的,

比如生产者连接broker topic-1的时候如果写队列设置4,那么就会返回broker-0 ,broker-1,broker-2,broker-3

这时候就会从0~3选择一个发送到broker ,消费者连接borker topic-1的时候如果读队列设置未4,根据nameserver 负载均衡后

那么就会会返回broker-0 ,broker-1,broker-2,broker-3,一个或者多个,


注意点:无论一发送端还是消费端,实际上都是针对文件的操作,

也就是上面提到的commitLog 和consumeQuene,而不是针对的java的实际几个队列,主要流程图下图

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
Java 开发者 微服务
从单体到微服务:如何借助 Spring Cloud 实现架构转型
**Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
243 69
从单体到微服务:如何借助 Spring Cloud 实现架构转型
|
22天前
|
搜索推荐 NoSQL Java
微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统
本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。
69 16
|
24天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
6月前
|
Java UED Sentinel
微服务守护神:Spring Cloud Sentinel,让你的系统在流量洪峰中稳如磐石!
【8月更文挑战第29天】Spring Cloud Sentinel结合了阿里巴巴Sentinel的流控、降级、熔断和热点规则等特性,为微服务架构下的应用提供了一套完整的流量控制解决方案。它能够有效应对突发流量,保护服务稳定性,避免雪崩效应,确保系统在高并发下健康运行。通过简单的配置和注解即可实现高效流量控制,适用于高并发场景、依赖服务不稳定及资源保护等多种情况,显著提升系统健壮性和用户体验。
115 1
|
2月前
|
运维 监控 Java
为何内存不够用?微服务改造启动多个Spring Boot的陷阱与解决方案
本文记录并复盘了生产环境中Spring Boot应用内存占用过高的问题及解决过程。系统上线初期运行正常,但随着业务量上升,多个Spring Boot应用共占用了64G内存中的大部分,导致应用假死。通过jps和jmap工具排查发现,原因是运维人员未设置JVM参数,导致默认配置下每个应用占用近12G内存。最终通过调整JVM参数、优化堆内存大小等措施解决了问题。建议在生产环境中合理设置JVM参数,避免资源浪费和性能问题。
100 3
|
6月前
|
Cloud Native Java Nacos
微服务时代的新宠儿!Spring Cloud Nacos实战指南,带你玩转服务发现与配置管理,拥抱云原生潮流!
【8月更文挑战第29天】Spring Cloud Nacos作为微服务架构中的新兴之星,凭借其轻量、高效的特点,迅速成为服务发现、配置管理和治理的首选方案。Nacos(命名和配置服务)由阿里巴巴开源,为云原生应用提供了动态服务发现及配置管理等功能,简化了服务间的调用与依赖管理。本文将指导你通过五个步骤在Spring Boot项目中集成Nacos,实现服务注册、发现及配置动态管理,从而轻松搭建出高效的微服务环境。
337 0
|
5月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
227 5
|
5月前
|
Java API 对象存储
微服务魔法启动!Spring Cloud与Netflix OSS联手,零基础也能创造服务奇迹!
这段内容介绍了如何使用Spring Cloud和Netflix OSS构建微服务架构。首先,基于Spring Boot创建项目并添加Spring Cloud依赖项。接着配置Eureka服务器实现服务发现,然后创建REST控制器作为API入口。为提高服务稳定性,利用Hystrix实现断路器模式。最后,在启动类中启用Eureka客户端功能。此外,还可集成其他Netflix OSS组件以增强系统功能。通过这些步骤,开发者可以更高效地构建稳定且可扩展的微服务系统。
81 1
|
5月前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
89 2
|
5月前
|
Java 对象存储 开发者
微服务世界的双雄争霸:Spring Cloud与Netflix OSS——谁将引领下一次企业级应用变革的风暴?
Spring Cloud与Netflix OSS是微服务架构的核心组件集,分别以其与Spring Boot的紧密集成及为大规模分布式系统设计的特性,在Java开发社区中广受青睐。前者通过Eureka提供服务发现机制,简化服务注册与定位;后者借助Hystrix增强系统弹性和可靠性,避免雪崩效应。此外,二者还包含负载均衡(Ribbon)、声明式HTTP客户端(Feign)及API网关(Zuul)等功能,共同构建强大微服务体系,助力开发者聚焦业务逻辑,提升系统灵活性与性能。
71 0