这篇文章我准备来聊一聊RocketMQ消息的一生。
不知你是否跟我一样,在使用RocketMQ的时候也有很多的疑惑:
- 消息是如何发送的,队列是如何选择的?
- 消息是如何存储的,是如何保证读写的高性能?
- RocketMQ是如何实现消息的快速查找的?
- RocketMQ是如何实现高可用的?
- 消息是在什么时候会被清除?
- ...
本文就通过探讨上述问题来探秘消息在RocketMQ中短暂而又精彩的一生。
核心概念
- NameServer :可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
- Broker :核心的一个角色,主要是用来保存消息的,在启动时会向NameServer进行注册。Broker实例可以有很多个,相同的BrokerName可以称为一个Broker组,每个Broker组只保存一部分消息。
- topic :可以理解为一个消息的集合的名字,一个topic可以分布在不同的Broker组下。
- 队列(queue) :一个topic可以有很多队列,默认是一个topic在同一个Broker组中是4个。如果一个topic现在在2个Broker组中,那么就有可能有8个队列。
- 生产者 :生产消息的一方就是生产者
- 生产者组 :一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
- 消费者 :用来消费生产者消息的一方
- 消费者组 :跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
消息诞生与发送
我们都知道,消息是由业务系统在运行过程产生的,当我们的业务系统产生了消息,我们就可以调用RocketMQ提供的API向RocketMQ发送消息,就像下面这样
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer"); //指定NameServer的地址 producer.setNamesrvAddr("localhost:9876"); //启动生产者 producer.start(); //省略代码。。 Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息并得到消息的发送结果,然后打印 SendResult sendResult = producer.send(msg);
虽然代码很简单,我们不经意间可能会思考如下问题:
- 代码中只设置了NameServer的地址,那么生产者是如何知道Broker所在机器的地址,然后向Broker发送消息的?
- 一个topic会有很多队列,那么生产者是如何选择哪个队列发送消息?
- 消息一旦发送失败了怎么办?
路由表
当Broker在启动的过程中,Broker就会往NameServer注册自己这个Broker的信息,这些信息就包括自身所在服务器的ip和端口,还有就是自己这个Broker有哪些topic和对应的队列信息,这些信息就是路由信息,后面就统一称为路由表。
Broker向NameServer注册
当生产者启动的时候,会从NameServer中拉取到路由表,缓存到本地,同时会开启一个定时任务,默认是每隔30s从NameServer中重新拉取路由信息,更新本地缓存。
队列的选择
好了通过上一节我们就明白了,原来生产者会从NameServer拉取到Broker的路由表的信息,这样生产者就知道了topic对应的队列的信息了。
但是由于一个topic可能会有很多的队列,那么应该将消息发送到哪个队列上呢?
面对这种情况,RocketMQ提供了两种消息队列的选择算法。
- 轮询算法
- 最小投递延迟算法
轮询算法 就是一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列底下,这也是RocketMQ默认的队列选择算法。
但是由于机器性能或者其它情况可能会出现某些Broker上的Queue可能投递延迟较严重,这样就会导致生产者不能及时发消息,造成生产者压力过大的问题。所以RocketMQ提供了最小投递延迟算法。
最小投递延迟算法 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。这种算法可能会导致消息分布不均匀的问题。
如果你想启用最小投递延迟算法,只需要按如下方法设置一下即可。
producer.setSendLatencyFaultEnable(true);
当然除了上述两种队列选择算法之外,你也可以自定义队列选择算法,只需要实现MessageQueueSelector接口,在发送消息的时候指定即可。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //从mqs中选择一个队列 return null; } }, new Object());
MessageQueueSelector RocketMQ也提供了三种实现
- 随机算法
- Hash算法
- 根据机房选择算法(空实现)
其它特殊情况处理
发送异常处理
终于,不论是通过RocketMQ默认的队列选择算法也好,又或是自定义队列选择算法也罢,终于选择到了一个队列,那么此时就可以跟这个队列所在的Broker机器建立网络连接,然后通过网络请求将消息发送到Broker上。
但是不幸的事发生了,Broker挂了,又或者是机器负载太高了,发送消息超时了,那么此时RockerMQ就会进行重试。
RockerMQ重试其实很简单,就是重新选择其它Broker机器中的一个队列进行消息发送,默认会重试两次。
当然如果你的机器比较多,可以将设置重试次数设置大点。
producer.setRetryTimesWhenSendFailed(10);
消息过大的处理
一般情况下,消息的内容都不会太大,但是在一些特殊的场景中,消息内容可能会出现很大的情况。
遇到这种消息过大的情况,比如在默认情况下消息大小超过4M的时候,RocketMQ是会对消息进行压缩之后再发送到Broker上,这样在消息发送的时候就可以减少网络资源的占用。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
消息存储
好了,经过以上环节Broker终于成功接收到了生产者发送的消息了,但是为了能够保证Broker重启之后消息也不丢失,此时就需要将消息持久化到磁盘。
如何保证高性能读写
由于涉及到消息持久化操作,就涉及到磁盘数据的读写操作,那么如何实现文件的高性能读写呢?这里就不得不提到的一个叫零拷贝的技术。
传统IO读写方式
说零拷贝之前,先说一下传统的IO读写方式。
比如现在需要将磁盘文件通过网络传输出去,那么整个传统的IO读写模型如下图所示
传统的IO读写其实就是read + write的操作,整个过程会分为如下几步
- 用户调用read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换1
- 将磁盘数据通过DMA拷贝到内核缓存区
- 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
- read()方法返回,此时就会从内核态切换到用户态,也就是图示的切换2
- 当我们拿到数据之后,就可以调用write()方法,此时上下文会从用户态切换到内核态,即图示切换3
- CPU将用户缓冲区的数据拷贝到Socket缓冲区
- 将Socket缓冲区数据拷贝至网卡
- write()方法返回,上下文重新从内核态切换到用户态,即图示切换4
整个过程发生了4次上下文切换和4次数据的拷贝,这在高并发场景下肯定会严重影响读写性能。
所以为了减少上下文切换次数和数据拷贝次数,就引入了零拷贝技术。
零拷贝
零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。
实现零拷贝的有以下几种方式
- mmap()
- sendfile()
mmap()
mmap(memory map)是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。
比如基于mmap,上述的IO读写模型就可以变成这样。
基于mmap IO读写其实就变成mmap + write的操作,也就是用mmap替代传统IO中的read操作。
当用户发起mmap调用的时候会发生上下文切换1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2;随后用户调用write,发生上下文切换3,将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。
整个过程相比于传统IO主要是不用将内核缓冲区的数据拷贝到用户缓冲区,而是直接将数据拷贝到Socket缓冲区。上下文切换的次数仍然是4次,但是拷贝次数只有3次,少了一次CPU拷贝。
在Java中,提供了相应的api可以实现mmap,当然底层也还是调用Linux系统的mmap()实现的
FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel(); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
如上代码拿到MappedByteBuffer,之后就可以基于MappedByteBuffer去读写。
sendfile()
sendfile()跟mmap()一样,也会减少一次CPU拷贝,但是它同时也会减少两次上下文切换。
如图,用户在发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区,之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。
同样地,Java也提供了相应的api,底层还是操作系统的sendfile()
FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); //调用transferTo方法向目标数据传输 channel.transferTo(position, len, target);
通过FileChannel的transferTo方法即可实现。
transferTo方法(sendfile)主要是用于文件传输,比如将文件传输到另一个文件,又或者是网络。
在如上代码中,并没有文件的读写操作,而是直接将文件的数据传输到target目标缓冲区,也就是说,sendfile是无法知道文件的具体的数据的;但是mmap不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有mmap可以满足。
通过上面的一些介绍,主要就是一个结论,那就是基于零拷贝技术,可以减少CPU的拷贝次数和上下文切换次数,从而可以实现文件高效的读写操作。
RocketMQ内部主要是使用基于mmap实现的零拷贝(其实就是调用上述提到的api),用来读写文件,这也是RocketMQ为什么快的一个很重要原因。
RocketMQ中使用mmap代码
CommitLog
前面提到消息需要持久化到磁盘文件中,而CommitLog其实就是存储消息的文件的一个称呼,所有的消息都存在CommitLog中,一个Broker实例只有一个CommitLog。
由于消息数据可能会很大,同时兼顾内存映射的效率,不可能将所有消息都写到同一个文件中,所以CommitLog在物理磁盘文件上被分为多个磁盘文件,每个文件默认的固定大小是1G。
当生产者将消息发送过来的时候,就会将消息按照顺序写到文件中,当文件空间不足时,就会重新建一个新的文件,消息写到新的文件中。
消息在写入到文件时,不仅仅会包含消息本身的数据,也会包含其它的对消息进行描述的数据,比如这个消息来自哪台机器、消息是哪个topic的、消息的长度等等,这些数据会和消息本身按照一定的顺序同时写到文件中,所以图示的消息其实是包含消息的描述信息的。
刷盘机制
RocketMQ在将消息写到CommitLog文件中时并不是直接就写到文件中,而是先写到PageCache,也就是前面说的内核缓存区,所以RocketMQ提供了两种刷盘机制,来将内核缓存区的数据刷到磁盘。
异步刷盘
异步刷盘就是指Broker将消息写到PageCache的时候,就直接返回给生产者说消息存储成功了,然后通过另一个后台线程来将消息刷到磁盘,这个后台线程是在RokcetMQ启动的时候就会开启。异步刷盘方式也是RocketMQ默认的刷盘方式。
其实RocketMQ的异步刷盘也有两种不同的方式,一种是固定时间,默认是每隔0.5s就会刷一次盘;另一种就是频率会快点,就是每存一次消息就会通知去刷盘,但不会去等待刷盘的结果,同时如果0.5s内没被通知去刷盘,也会主动去刷一次盘。默认的是第一种固定时间的方式。
同步刷盘
同步刷盘就是指Broker将消息写到PageCache的时候,会等待异步线程将消息成功刷到磁盘之后再返回给生产者说消息存储成功。
同步刷盘相对于异步刷盘来说消息的可靠性更高,因为异步刷盘可能出现消息并没有成功刷到磁盘时,机器就宕机的情况,此时消息就丢了;但是同步刷盘需要等待消息刷到磁盘,那么相比异步刷盘吞吐量会降低。所以同步刷盘适合那种对数据可靠性要求高的场景。
如果你需要使用同步刷盘机制,只需要在配置文件指定一下刷盘机制即可。








