RocketMQ系列 | 全网最全的导致RocketMQ消息“丢失”的几个场景都在这了,肯定有你不知道!

简介: 发送时会丢失消息、消息存储场景丢失消息、消费时会丢失消息

RocketMQ 简介

RocketMQ 5.0:
云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。


RocketMQ领域模型


如上图所示,Apache RocketMQ 中消息的生命周期主要分为消息生产消息存储消息消费这三部分。

生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。

消息生产

生产者(Producer):Apache RocketMQ 领域中用于产生消息的运行实体,一般集成于业务调用链路的上游。

消息存储

  • 主题(Topic):Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
  • 队列(MessageQueue):Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
  • 消息(Message):Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

  • 消费者分组(ConsumerGroup):Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
  • 消费者(Consumer):Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
  • 订阅关系(Subscription):Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。


如何让“消息丢失”?

在“如何让消息丢失”之前,让我们梳理一下消息的生命周期,先对齐下整体的概念。

一条消息的历程

1、发送场景丢失消息

1.1 单向发送

/**     * 发送消息,Oneway形式,服务器不应答,     * 无法保证消息是否成功到达服务器     *     * @param message 要发送的消息     */voidsendOneway(finalMessagemessage);com.aliyun.openservices.ons.api.Producer#sendOneway


RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

同步发送同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。



异步发送异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。

此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。


1.2 发送失败时未重试或补偿

importcom.aliyun.openservices.ons.api.Message;importcom.aliyun.openservices.ons.api.Producer;importcom.aliyun.openservices.ons.api.SendResult;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;
importjava.nio.charset.StandardCharsets;
@Service@Slf4jpublicclassProductSender {
@AutowiredprivateProducerproducer;
publicvoidsendMsg(Stringcontent) {        try {            byte[] body=content.getBytes(StandardCharsets.UTF_8);            Messagemessage=newMessage("topicName", "tagName", "msgKey", body);            SendResultsendResult=producer.send(message);        } catch (Exceptionignored) {            // TODO: 2023/9/29 发送失败时无处理,网络抖动或服务不稳定时会造成消息丢失        }    }}


2、消息存储场景丢失消息

2.1 、Broker宕机或者磁盘损坏,Broker Server内存中的消息没有落盘

2.2 、过期清理机制引发消息丢失
Apache RocketMQ 中队列的定义,消息按照到服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。因此,在实际使用中需要考虑以下问题,消息在服务端中的存储以什么维度为判定条件?消息存储以什么粒度进行管理?消息存储超过限制后如何处理?这些问题都是由消息存储和过期清理机制来定义的。

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。删除旧的没有使用过的消息是由后台定时任务完成的。

消息存储文件结构说明

3、消费场景丢失消息

3.1 消费失败,但消费消息的返回结果为成功

importcom.aliyun.openservices.ons.api.Action;importcom.aliyun.openservices.ons.api.ConsumeContext;importcom.aliyun.openservices.ons.api.Message;importcom.aliyun.openservices.ons.api.MessageListener;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;
importjava.nio.charset.StandardCharsets;
@Service@Slf4jpublicclassMissingMsgWhenConsumeFailimplementsMessageListener {
@OverridepublicActionconsume(Messagemessage, ConsumeContextcontext) {        
try {            Stringmsg=newString(message.getBody(), StandardCharsets.UTF_8);            returnAction.CommitMessage;        } catch (Exceptione) {            //丢失消息:消费失败了,但消费消息的返回结果为成功。            return Action.CommitMessage;        }    }}

RocketMQ消费场景引发的系统故障

3.2 订阅关系不一致导致消息丢失您可在云消息队列 RocketMQ 版控制台Group 详情页面查看指定Group的订阅关系是否一致。出现订阅关系不一致时,控制台中也会有告警:

同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
如下图所示,一个队列中分发不同类型的消息。

如果一个消费者Group ID订阅了tagA和tagB,那么这个消费组下消费者绑定的队列中会被borker投递所订阅所有Tag的信息。

消息丢失的根因是,一个队列在同一时间只会被分配给一个消费者,这样队列上不符合消息过滤规则的消息消费会被忽略,并且消息消费的进度会向前移动,从而造成消息丢失。

经典实践
一个GroupId[消费组]只在一个JVM中使用

正确订阅关系一:相同Group ID的N个消费者订阅一个Topic且订阅一个Tag

正确订阅关系二:相同Group ID的N个消费者订阅一个Topic且订阅多个Tag

正确订阅关系三:相同Group ID的N个消费者订阅多个Topic且订阅多个Tag

小结

在RocketMQ领域中,一条消息从生产、存储、消费整个链路中都可以让消息“丢失”。
业务逻辑复杂,历史久远的接口出现数据错误怎么办?
干货|如何快速问题出在哪了?
从全链路视角看,让消息丢失的漏洞百出。
那么,你“学会”让消息丢失的"技巧"了吗?

参考

https://rocketmq.apache.org/zh/docs/发送普通消息(单向发送):https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/sample-code-2发送普通消息(三种方式):https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-normal-messages-in-one-of-three-modes消息存储机制:https://rocketmq.apache.org/zh/docs/featureBehavior/11messagestorepolicy消息在云消息队列 RocketMQ 版中能保存多久?https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/faq-about-features#section-r2b-stc-pz6
常见订阅关系不一致问题 https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/subscription-consistency


MQ相关阅读
RabbitMQ消息为什么变成了数字呢?
微服务+RabbitMQ之从零到yi


RocketMQ系列 | 如何让消息“丢失”?

https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg


RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

https://mp.weixin.qq.com/s/kejgc_u8GHdXrI4uW9TWNw

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88647 15
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
1318 10
|
2月前
|
消息中间件 存储 Java
MQ线上消息乱序问题处理及场景详解
【11月更文挑战第22天】在现代分布式系统中,消息队列(MQ)作为核心组件,承担着异步处理、削峰填谷和系统解耦的重任。
82 1
|
8月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
3月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
131 0
|
8月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108756 287
|
5月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
6月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
7月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
7月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。