批量消息发送|学习笔记

简介: 快速学习批量消息发送

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)批量消息发送】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12476


批量消息发送

 

内容介绍:

一.什么是批量消息发送

二.批量消息发送的业务逻辑

三.消息打包的过程

 

一.批量消息发送

批量消息发送,顾名思义就是一次性的给M Q当中发送多条消息。这个多条消息有一个条件就是它们都是同一个主题的,将同一个主题的多条消息给一起打包发送到消息服务端。

批量发送消息,它的优点是可以减少网络调用的次数,提高网络传输的效率。就是原来要发十条单条消息,现在可以把它们打包成一条消息向外去发送。

那么是不是以后都用这个批量消息去发送的就比较好?其实也并不是。

因为同一批次中发送的消息的数量不是越多越好,它在打包时,如果单条消息的内容比较长,那打包的时候会耗费非常多的时间,会影响到其他线程发送消息的响应时间,而且单批次消息的总长度不能超过默认的消息的总长度,总长度是四兆。

所以说不是批量消息的数量发的越多越好。

整个的批量消息发送时,它最核心的处理的方式就是将这多条消息打包成一条消息,打包成一条消息之后,整个消息发送的流程就可以附用上面消息发送的流程了。

image.png


二.业务逻辑

找一下有一个Send collection方法

image.png在这个方法里它调用的就是在之前的send方法。代码如下所示:Collectio<Message> nsgs) throus MQClientException,RemotingException, MQBrokerException, InterruptedException{ return this. defaultMQProducerImpl.send(batch(msgs))

只不过现在对于集合消息的collection的集合先进了一个batch,这个batch就是进行打包。代码如下:

PrivateMessageBatchBatch(collection<Message>msgs)throwsMQclientExcition

 

三.消息打包的过程

首先将这些消息封装到了一个message batch当中,如下图:

image.png这个message batch里它维护了一个集合。

代码如下所示:

Private final List<Message>message;

其实它继承了message,也是message的实例。给它创建了一个message batch之后,便利它里面的每一个message。

批量消息在发的时候也要保证单条消息的合法性,而且要去设置每一条单条消息的I D,绑定它的topic。

代码如下所示:

Message.setTopic(withNamespace(Message.getTopic()));

要注意的是,在这里要对这个消息进行编码,如下代码所示:

MsgBatch,setbody(msgbatch.encode))

进行编码的目的是要减小它的数量,也就是减小它的大小。

整个的编法完成之后,把它放到body当中,如下代码所示:

msgBatch. setbody(msgbatch.encode))

对于整个的message batch再去设置一个topic,如下代码所示:

msgBatch.setTopic(withNamespace(msgBatch.getTopic)));

然后去返回,如下代码所示:

Return msgBatch

返回到这里:

Return this .defaultMQproducerImpl,serd(batch(msgs))

返回之后的业务逻辑,是和发送单条消息是一模一样的。

整个的批量消息发送,它主要的核心就是在batch方法这里,去对整个消息进行一个编码,然后将多条消息编码压缩成一条消息,就可以使用单条消息的业务逻辑去发送了。

image.png

以上是批量消息发送的基本过程。

相关文章
|
消息中间件 存储 监控
云消息队列 RocketMQ 版(原ONS)体验
云消息队列 RocketMQ 版(原ONS)是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。它在阿里集团内部业务、阿里云以及开源社区中得到广泛应用。最新的版本进一步优化了高可靠低延迟的特性,并提供了多场景容灾解决方案,使其成为金融级业务消息的首选方案。由于专业及能力问题,本次我只能从产品功能体验方面进行简单的一些分析。
1933 64
|
数据库 OceanBase 索引
在OceanBase数据库中,REPLACE INTO和insert update在效率上可能有所不同
【2月更文挑战第30天】在OceanBase数据库中,REPLACE INTO和insert update在效率上可能有所不同
788 1
|
存储 Java 数据格式
|
JSON 自然语言处理 Java
ElasticSearch 实现分词全文检索 - 搜素关键字自动补全(Completion Suggest)
ElasticSearch 实现分词全文检索 - 搜素关键字自动补全(Completion Suggest)
524 1
|
NoSQL Java Redis
如何在 Java 中操作这些 Redis 数据结构的基本方法
如何在 Java 中操作这些 Redis 数据结构的基本方法
162 2
|
Web App开发 缓存 前端开发
《手把手教你》系列技巧篇(四十四)-java+ selenium自动化测试-处理https 安全问题或者非信任站点-下篇(详解教程)
【5月更文挑战第8天】这篇文档介绍了如何在IE、Chrome和Firefox浏览器中处理不信任证书的问题。作者北京-宏哥分享了如何通过编程方式跳过浏览器的证书警告,直接访问不受信任的HTTPS网站。文章分为几个部分,首先简要介绍了问题背景,然后详细讲解了在Chrome浏览器中的两种方法,包括代码设计和运行效果,并给出了其他浏览器的相关信息和参考资料。最后,作者总结了处理此类问题的一些通用技巧。
521 2
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
【4月更文挑战第6天】Java并发编程中,线程池通过重用线程降低性能开销,控制并发级别。关键在于理解线程池工作原理:核心线程数、最大线程数、队列和拒绝策略。优化技巧包括合理设置线程池大小、选择合适队列、避免过度使用、自定义拒绝策略和正确关闭线程池。I/O密集型应用案例:大核心线程数、使用 `CachedThreadPool`、`LinkedBlockingQueue` 和定制拒绝策略。正确配置和管理线程池对提升应用性能至关重要。
861 3
|
存储 NoSQL Redis
Docker 安装 Redis 6.2.6
Docker 安装 Redis 6.2.6
1133 0
|
消息中间件 存储 监控
【消息中间件】详解mq消息积压
【消息中间件】详解mq消息积压
698 0
|
SQL Java 数据库连接
对 MyBatis Plus SaveBatch 调优提升25倍性能!!!
最近在压测一批接口,发现接口处理速度慢的有点超出预期,感觉很奇怪,后面定位发现是数据库批量保存这块很慢。这个项目用的是,批量保存直接用的是提供的 saveBatch。于是开始排查之路。所以如果有使用 jdbc 的 Batch 性能方面的需求,要将rewriteBatchedStatements 设置为 true,这样能提高很多性能。然后如果喜欢手动拼接 sql 要注意一次拼接的数量,分批处理。
1065 1