MQ系列5:RocketMQ消息的发送模式

简介: MQ系列5:RocketMQ消息的发送模式

在之前的篇章中,我们学习了RocketMQ的原理,以及RocketMQ中 命名服务 ServiceName 的运行流程,本篇从消息的生产、消费来理解一条消息的生命周期。

1 消息生产

在RocketMQ中,消息生产指的是 消息生产者往消息队列中写入数据的过程。因为业务场景的复杂性,RocketMQ架构设计了多种不同的发送策略。下面先讨论几种常见的场景:

  • 同步发送: 整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。
  • 异步发送: 调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。
  • OneWay(单向)发送: 只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。
  • 延迟发送: 指定延迟的时间,在延迟时间到达之后再进行消息的发送。
  • 批量发送: 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。

以下是生产者实例化启动的过程:

1.1 消息发送步骤

一般情况下,我们发送消息,会使用默认的DefaultMQProducer类,经过以下几个步骤实现:

  • 创建消息生产者Producer,并设置Producer的GroupName(生产组)。
  • 设置InstanceName(实例名称),当你的业务需要启用多个Producer的时候,使用不同的InstanceName来区分。
  • 设置NameServer地址,这样Producer才能从NameServer中得到路由信息
  • 完成其他的初始化配置,比如配置异常重试次数(降低消息丢失的可能性),通信模块初始化等。
  • 组装消息对象,指定主题Topic、Tag和消息体Message 等信息。
  • 通过NameServer获取到的Broker路由地址,将消息发送。

1.2 消息发生返回状态

消息发送之后,会相应的拿到回执。返回对象中的状态(SendResult.SendStatus)有4种,如下:

  • FLUSH_DISK_TIMEOUT 刷盘超时

如果将Broker的刷盘策略设置成

SYNC_FLUSH,那么没有在规定的时间完成刷盘则会报该错误。

  • FLUSH_SLAVE_TIMEOUT 主从同步超时

主从模式下(也可以叫主备),Broker配置为

SYNC_MASTER模式,如果没有在设定时间内完成主从同步,则会报该错误。

  • SLAVE_NOT_AVAILABLE 未找到Slave Broker

主从模式下,且Broker配置为

SYNC_MASTER,如果未找到Slave的Broker,则会报该错误。

  • SEND_OK

表示发送成功。

1.3 发送同步消息

实时同步消息是一种对可靠性、实时性要求比较高的场景,使用的也比较广泛,比如:

  • 重要的消息通知,比如验证码,不能超过太长时间推送,那样可能失效
  • 消费记录确认
  • 数据实时处理和推送 等等

public class SyncProducerApplication {

   public static void main(String[] args) throws Exception {

       // 1、创建生产者producer,并指定生产者组名为 testSyncGroup

       DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");

       // 2、指定NameServer的地址,以获取Broker路由地址

       producer.setNamesrvAddr("192.168.139.1:9876");

       // 3、启动producer

       producer.start();

       // 4、创建消息,并指定Topic,Tag和消息体

       Message msg = new Message("testTopic","sync", "测试同步消息".getBytes("UTF-8"));

       // 5、发送消息到一个Broker

       SendResult sendResult = producer.send(msg);

       // 6、通过sendResult返回消息是否成功送达

       System.out.printf("%s%n", sendResult);

       // 7、如果不再发送消息,关闭生产者Producer

       producer.shutdown();

   }

}

image.png

1.4 发送异步消息

我们知道,异步主要用于那些对实时响应不敏感的业务,可以容忍一定时间的等待,只要能达到最终一致性即可。

有时候为了在流量高峰期进行削峰和分流,缓解压力,我们经常采用异步消息的发送模式。这种业务场景也很常见,比如:

  • 消费信息的推送,可能在你买单之后的几分钟才送达
  • 数据统计、文件打包下载等需要长耗时的任务

public class AsyncProducerApplication {

   public static void main(String[] args) throws Exception {

       // 1、创建生产者producer,并指定生产者组名为 testAsyncGroup

       DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");

       // 2、指定NameServer的地址,以获取Broker路由地址

       producer.setNamesrvAddr("192.168.139.1:9876");

       // 3、启动producer

       producer.start();

       // 4、创建消息,并指定Topic,Tag和消息体

       Message msg = new Message("testTopic","async", "测试异步消息".getBytes("UTF-8"));

       // 5、发送消息到一个Broker

       SendResult sendResult = producer.send(msg);

       // 6. 发送异步消息,SendCallback是处理异步回调的方法

       producer.send(msg, new SendCallback() {

           @Override

           public void onSuccess(SendResult sendResult) {  // 成功回调

               System.out.println("success: " + sendResult);

           }

           @Override

           public void onException(Throwable throwable) {  // 失败回调

               System.out.println("fail: " + throwable);

           }

       });

       // 7、如果不再发送消息,关闭生产者Producer

       producer.shutdown();

   }

}

image.png

1.5 单向发送消息

OneWay的模式主要用在Care发送结果的场景,只要消息发送出去即完成任务,不需要对发送的状态、结果负责。常见的使用场景如

  • 普通日志记录
  • 非核心的埋点上报等

public class OneWayProducerApplication {

   public static void main(String[] args) throws Exception {

       // 1、创建生产者producer,并指定生产者组名为 testOneWayGroup

       DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");

       // 2、指定NameServer的地址,以获取Broker路由地址

       producer.setNamesrvAddr("192.168.139.1:9876");

       // 3、启动producer

       producer.start();

       // 4、创建消息,并指定Topic,Tag和消息体

       Message msg = new Message("testTopic","oneway", "测试单向发送消息".getBytes("UTF-8"));

       // 5、发送消息到一个Broker

       producer.sendOneway(msg);

       // 6、如果不再发送消息,关闭生产者Producer

       producer.shutdown();

   }

}

1.6 发送延时消息

指定延迟的时间,在延迟时间到达之后再进行消息的发送。这种的使用场景也很多:

  • 比如火车票订购,提交了一个订单就把车票给占位了,这时候可以发送一个延时确认的消息,15m 未付款,就要把该车票释放,让其他人去购买。
  • 还比如购买了电影票,可以发送一个核销信息,在电影开场前15分钟就无法退票了。

1.6.1 延时时间的使用限制

延时时间并不是随意指定的,Rocket源码中指定了18种等级,分别代表不同的时间时长,如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

  • RocketMq不支持任意时间延时,需设置固定的延时等级,从1s到2h分别对应着等级1到18
  • 可以使用setDelayTimeLevel(int level) 方法设置延时等级,level 从 0 开始

1.6.2 发送延时消息具体实现

通过下面的代码,可以得到的结果是消费的时间点比信息记录的时间点延迟了1分钟,这是因为我们在send的时候做了delay。

public class DelayProducerApplication {

   public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {

       // 1、创建生产者producer,并指定生产者组名为 testDelayGroup

       DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");

       // 2、指定NameServer的地址,以获取Broker路由地址

       producer.setNamesrvAddr("192.168.139.1:9876");

       // 3、启动producer

       producer.start();

       // 4、创建消息,并指定Topic,Tag和消息体

       Message msg = new Message("testTopic","delay", "测试延迟发送消息".getBytes("UTF-8"));

       // 5、设置延时等级4,对应1m,所以这个消息在一分钟之后发送

       msg.setDelayTimeLevel(4);

       // 6、发送消息到一个Broker

       SendResult sendResult = producer.send(msg);

       // 7、通过sendResult返回消息是否成功送达

       System.out.printf("%s%n", sendResult);

       // 8、如果不再发送消息,关闭生产者Producer

       producer.shutdown();

   }

}

image.png

1.7 发送批量消息

  • 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
  • 批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。

waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。

  • 一批次的消息总大小不应超过4MB。

public class BatchProducerApplication {

   public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {

       // 1、创建生产者producer,并指定生产者组名为 testBatchGroup

       DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");

       // 2、指定NameServer的地址,以获取Broker路由地址

       producer.setNamesrvAddr("192.168.139.1:9876");

       // 3、启动producer

       producer.start();

       // 4、创建消息列表,并指定Topic,Tag和消息体

       List<Message> messages = new ArrayList<>();

       String topic = "testTopic";

       messages.add(new Message(topic, "batch", "测试批量发送消息 0".getBytes("UTF-8")));

       messages.add(new Message(topic, "batch", "测试批量发送消息 1".getBytes("UTF-8")));

       messages.add(new Message(topic, "batch", "测试批量发送消息 2".getBytes("UTF-8")));

       // 5、发送消息到一个Broker

       SendResult sendResult = producer.send(messages);

       // 6、通过sendResult返回消息是否成功送达

       System.out.printf("%s%n", sendResult);

       // 7、如果不再发送消息,关闭生产者Producer

       producer.shutdown();

   }

}

1.8 如何提升消息生产的性能

消息的发送一般是经过 client发送、Broker服务器接收并处理、Broker服务器返回应答 三个步骤。

如果我们想要提高消息生产的效率,一般有如下方法:

  • Oneway方式发送

Oneway方式发送用在一些性能要求高,可靠性要求低的场景下,比如日志采集,非核心的埋点上报等。Oneway方式发送请求无需应答,即将数据写入客户端的Socket缓冲区就返回,不等待结果的返回。

所以这种模式是极快的,可以把发送消息时长缩短至微秒级。

  • 增加Producer的并发量,使用多个Producer同时发送

RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续数据写入文件系统。

顺序执行CommitLog让RocketMQ可以保持较高的写入性能。

  • 恰当的批量发送

对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。

对于消息体的大小也要注意不能超过4MB。

根据阿里内部调优后的性能测试报告,消息的写入性能达到90万+的TPS,我们可以朝着这个指标进行优化。

2 总结

本篇介绍了RocketMQ 消息生产与发送的几种模式:

  • 同步发送:整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。
  • 异步发送:调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。
  • OneWay(单向)发送:只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。
  • 延迟发送:指定延迟的时间,在延迟时间到达之后再进行消息的发送。
  • 批量发送:对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。

可以根据实际的业务场景选择适当的发送模式。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
4月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
72 3
|
24天前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
1月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
91 1
|
2月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
33 1
|
4月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
77 2
|
4月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
76 1
|
4月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
67 0