开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):批量消息发送】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12476
批量消息发送
内容介绍:
一.什么是批量消息发送
二.批量消息发送的业务逻辑
三.消息打包的过程
一.批量消息发送
批量消息发送,顾名思义就是一次性的给M Q当中发送多条消息。这个多条消息有一个条件就是它们都是同一个主题的,将同一个主题的多条消息给一起打包发送到消息服务端。
批量发送消息,它的优点是可以减少网络调用的次数,提高网络传输的效率。就是原来要发十条单条消息,现在可以把它们打包成一条消息向外去发送。
那么是不是以后都用这个批量消息去发送的就比较好?其实也并不是。
因为同一批次中发送的消息的数量不是越多越好,它在打包时,如果单条消息的内容比较长,那打包的时候会耗费非常多的时间,会影响到其他线程发送消息的响应时间,而且单批次消息的总长度不能超过默认的消息的总长度,总长度是四兆。
所以说不是批量消息的数量发的越多越好。
整个的批量消息发送时,它最核心的处理的方式就是将这多条消息打包成一条消息,打包成一条消息之后,整个消息发送的流程就可以附用上面消息发送的流程了。
二.业务逻辑
找一下有一个Send collection方法
在这个方法里它调用的就是在之前的send方法。代码如下所示:Collectio<Message> nsgs) throus MQClientException,RemotingException, MQBrokerException, InterruptedException{ return this. defaultMQProducerImpl.send(batch(msgs))
只不过现在对于集合消息的collection的集合先进了一个batch,这个batch就是进行打包。代码如下:
PrivateMessageBatchBatch(collection<Message>msgs)throwsMQclientExcition
三.消息打包的过程
首先将这些消息封装到了一个message batch当中,如下图:
这个message batch里它维护了一个集合。
代码如下所示:
Private final List<Message>message;
其实它继承了message,也是message的实例。给它创建了一个message batch之后,便利它里面的每一个message。
批量消息在发的时候也要保证单条消息的合法性,而且要去设置每一条单条消息的I D,绑定它的topic。
代码如下所示:
Message.setTopic(withNamespace(Message.getTopic()));
要注意的是,在这里要对这个消息进行编码,如下代码所示:
MsgBatch,setbody(msgbatch.encode))
进行编码的目的是要减小它的数量,也就是减小它的大小。
整个的编法完成之后,把它放到body当中,如下代码所示:
msgBatch. setbody(msgbatch.encode))
对于整个的message batch再去设置一个topic,如下代码所示:
msgBatch.setTopic(withNamespace(msgBatch.getTopic)));
然后去返回,如下代码所示:
Return msgBatch
返回到这里:
Return this .defaultMQproducerImpl,serd(batch(msgs))
返回之后的业务逻辑,是和发送单条消息是一模一样的。
整个的批量消息发送,它主要的核心就是在batch方法这里,去对整个消息进行一个编码,然后将多条消息编码压缩成一条消息,就可以使用单条消息的业务逻辑去发送了。
以上是批量消息发送的基本过程。



