消息存储的流程|学习笔记

简介: 快速学习消息存储的流程

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息存储的流程】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12478


消息存储的流程

 

消息存储整个的入口在defaultmessagestore消息存储最核心的这个类当中

image.png

有一个put message这个方法,是在这里面去处理的。如下代码所示:

PublicPutMessageResult putMessage(MessageExBatch ,essageExtBatch)

这个消息存储的流程当中,rocket M Q为了提高消息存储的效率,首先将这个消息追加到内存当中去,并不会直接写入到磁盘,最后会通过专门的刷盘机制,再将内存当中的消息写到磁盘里面。

所以putMessages方法,最核心的是来看它如何将这个消息追加到内存当中。

消息存储流程的第一步是先去判断当前broker是不是slave,如果broker是slave代表当前这个broker是一个从节点,从节点是不能够去写的,所以先去做一个判断。

第二步判断一下消息主题的程度,包括消息属性的程度等等,这里其实都是在去校验当前这个broker接收到消息的合法性。如图所示:

image.png还做了一个校验,在os pagecatchebusy的一个处理,当前内存当中它是否可用,如果内存不可用,那么进行不了存储。所以前面都是在做校验的工作。

校验如果没有问题之后,那么真正消息写入内存的这个操作是通过commit log去处理的。

进到commit log boot message方法当中。在commit log这个方法当中,它先去设置消息存储时间的一个节点,对这个消息进行时间的设定,然后拿到消息存储的服务,它状态的一个服务。就是如下这行代码:Storestatsservicestorestatsservice=this.defaultMessagestore.gestore,getstorestatsservice

然后就是处理延迟消息的主题和队列,对于延迟消息来讲不需要立即去进行存储,所以以下代码位置进行了对应处理。

If(messageExtBatch.getDelayTimeLevel() > 0) {

Return new PutMessageResult(putMessageStatus.MESSAGE_ILLEGAL,appendMessageResult:null);

如果当前的是延迟消息,就要进行一个特殊的处理。

现在要去存储这个消息了,rocket M Q它用的是这种文件内存映射的机制,首先是拿到了这个文件存储的commit log所对应的mapped file的映射文件,拿到最后一个映射文件。如下代码所示:

MappedFile mappedFile=this.mappledFileQueue.getLastMappedFile()

拿到这个映射文件之后从如下图所示的这个地方开始,去给这个映射文件进行存储。

image.png

对于如下图上面来看:

image.png

整个的存储的过程当中它是一个同步的,并且在这里是加锁的状态:

如下代码所示:

Put Message?Lock.lock()

同步之后是为了多线程情况之下,保证线程的安全。消息的存储拿到这个mapped file之后先去判断了一下,看这个mapped file是不是为空,或者它是不是已经写满,如果为空或者写满都要重新的去创建一个新的。

If(null == mappedFile || mappedFile.isFull()) {

mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset:0);}

如果没有问题就开始去进行消息的追加,把它追加到这个内存映射文件当中去。在mapped file这里面调用了它的一个open messages这个方法,它的这个方法在这里面又调用了一个append message inner这个方法。

最终其实用的就是mapped file这里面的append message inner这个方法再进行消息的一个追加。

它的追加的过程中,是先去获得了消息的写的指针,看现在是往哪个位置去写,下图就是拿到它写的位置。如下代码所示:

Int currentPos=this.wrotePosition.ger()

If(currentPos<this.fileSize)

如果当前这个指针小于文件的总的大小,那说明当前没有写满,这个写的操作是合法的。如果走到这一步发现这个写的这个指针已经大于并且超过文件的总大小了,就不能去写了。

如下图:

image.png在这里面,在整个过程当中首先将这个消息,将它写入到缓冲区里,byteBuffer它其实就是在对外申请了一个缓冲区,然后通过这个cb.doAppend真正去完成追加消息的处理。

cb.doAppend里。它首先拿到当前写的位置。

Long wroteoffest=fileFromoffest+byteBuffer.positon()

然后又设置了一个消息的I D,如下图所示:

就在之前开头图上的这一步:

image.png将这个获得消息在队列当中设一个偏移量。现在要往哪个位置上去写就要把这个位置确定下来。

Long  queueOffset = CommitLog.this.topicQueueTable.get(key);

因为commit log这个映射文件,它有可能是已经写了一半了,要得知道往哪个位置去写。这就是拿到这个消息后写入的一个偏移量。

所以整个在这个下图中就开始写,如图所示:

image.png

注意:比如写的这个偏移量它应该是在这里,但是整个消息的长度如果是这么长。就是现在从左到右这里面已经写满了。如果消息的总长度是这么长,而消息的总长度是这么大。那么这个消息肯定是写不到里面的所以在真正在去写入之前,它又去对整个长度进行了处理。示意图如下图所示:

image.png

计算消息的长度再计算整个消息的总长度,判断一下,消息的总长度如果超过了文件剩余的大小,那么就要去创建新文件了。

从如下代码这个位置开始:

if (propertiesLength> Short MAX-VALUE)

If(

拿出这个属性的集合,再去算出属性的长度,这是topic topic的长度

如下代码所示:

final byte[] topicData =msgInner.getTopic(). getBytes(MessageDecoder.CHARSET UTF8);

这是body的长度,如下代码所示:

final int bodyLength=msgInner. getBody()==null: msgInner.getBody().longth; 

这是消息的长度

如下代码所示:

final int msglencaLMsglength(bodyLength, topiclength, propertieslength) 

通过bodyLength这个方法计算总长度,如果总长度大于剩余的文件的大小,那么就让它在如下图位置中重新再创建一个新的文件,然后再去写。

给byteBuffer当中写入这个消息,写0到msgLen,如下代码 :

byteBuffer.put(this.msgstoreItemMemory.array(), offset:0 msgLen

整个过程是它把这个消息真正的写入到了内存里,并没有直接把它写入到这个词盘里。

image.png

那么对于图上面来看,就是在如下这个位置。

image.png

它写完之后去更新了一下消息队列的一个偏移量。

在如下这个位置

If((totalMsgLen+END FILE MIN BLANK LENGTH)>maxBlank)

在这里看到的就是消息的长度,如果大于总的长度,要去进行新的commitlog文件的创建。如果都没有问题,就开始去写。写完之后它对于整个消息的偏移量进行了更新的处理。

整个这个位置就会通过doAppend回调,整个写完之后就可以去返回。它是从以下这个位置回调的:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <

在cb.doAppend,它整个写完后将这个结果返回给mappedfile,mappedfile再将这个结果返回给commitlog。

result = mappedFile.appendMessages(messageExtBatch,this.appendMessageCallBack);

来找一下commit log能够看到mappedfile再从下图找mappedfile去写。

image.png写完之后,mappedfile就把结果返回来了,可以发现整个方法就已经完成了对内存的追加。追加完了后整个过程下面还有两个代码。如下代码所示:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <

这两个代码是在刷盘。

之前所看的整个追加的过程,其实仅仅只是仅仅将消息追加到内存当中去。真正的写到磁盘要在它追加完了后得通过handle DiskFlush这个方法去进行刷盘的处理。代码如下所示:

handleDiskFlush(result, putMessageResult, messageExtBatch);

以上是文件存储的流程。

相关文章
|
22天前
|
数据采集 人工智能 供应链
2025年适合汽车行业与互联网企业的BI产品选型指南
2025年,数字化转型加速,BI工具成企业决策核心。本文对比瓴羊Quick BI、Power BI、Tableau、永洪科技、Domo五大主流产品,从能力、行业适配、案例等维度解析,重点推荐阿里云旗下瓴羊Quick BI,其在汽车与互联网行业表现突出,兼具AI分析、高性能计算与信创合规优势,助力企业实现数据价值最大化。
|
消息中间件 存储 XML
Kettle实现rabbitMQ的生产与消费
文章目录 一、Kettle为什么可以读取流数据? 二、rabbitMQ中启动MQTT插件并创建队列和路由键 三、Kettle实现rabbitMQ的生产与消费 Kettle是一款非常强大的ETL工具,不仅可以使用图形化界面,还可以处理各种数据,今天记录一下本人使用Kettle中MQTT组件来实现从rabbitMQ中读取流数据,并进行解析和处理。 提示:以下是本篇文章正文内容,下面案例可供参考
|
缓存 NoSQL Java
避免缓存失效的三大杀手:缓存击穿、穿透与雪崩的解决方案
避免缓存失效的三大杀手:缓存击穿、穿透与雪崩的解决方案
1701 0
|
Linux
Linux命令行文档查看cat、less、more、head、tail和图片查看
Linux命令行文档查看cat、less、more、head、tail和图片查看
312 0
|
Python
python字符串的拼接和拆分,看这一篇就够了
python字符串的拼接和拆分,看这一篇就够了
411 0
|
SQL IDE 前端开发
神器你值得拥有——CoolFormat代码一键自动格式化工具,支持Verilog
神器你值得拥有——CoolFormat代码一键自动格式化工具,支持Verilog
784 0
神器你值得拥有——CoolFormat代码一键自动格式化工具,支持Verilog
|
Java 应用服务中间件 Maven
如何在 IDEA 中配置 tomcat,运行 web 项目?
如何在 IDEA 中配置 tomcat,运行 web 项目?对这个问题找了很久的答案,各种参差不全,其实非常的简单了,网上很多人的文章搞得很复杂,完全没必要。下面是我的采坑记录,写下来帮助大家。首先,需要确保: • JDK 环境已经安装 • Tomcat 已经下载好(不需要配置环境变量,下载一个压缩包解压出来就行)
951 0
如何在 IDEA 中配置 tomcat,运行 web 项目?
img图片下方出现空隙的原因及解决办法
img图片下方出现空隙的原因及解决办法
473 0
|
存储 JSON NoSQL
暂缓MongoDB 4.4.2 、4.4.3、 4.4.4版本升级: 存在严重Bug
暂缓MongoDB 4.4.2 、4.4.3、 4.4.4版本升级: 存在严重Bug
416 0
暂缓MongoDB 4.4.2 、4.4.3、 4.4.4版本升级: 存在严重Bug
|
Java
Java 水仙花数(解析说明)
水仙花数也被称为超完全数字不变数、自恋数、自幂数、阿姆斯壮数或阿姆斯特朗数,水仙花数是指一个 3 位数,它的每个位上的数字的 3次幂之和等于它本身(例如:13+53+33=153)
658 0