RocketMQ 消息的顺序和重复

简介: 这篇文章探讨了RocketMQ中消息顺序和重复的问题,解释了为什么RocketMQ不保证消息顺序和不重复,并提供了解决这些问题的策略,包括消费端幂等性处理和使用日志表记录已处理消息ID,同时介绍了RocketMQ的事务消息、Producer和Consumer的最佳实践,以及其他配置和RocketMQ的基本概念。

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

一、消息顺序

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

1. 不关注乱序的应用实际大量存在

2. 队列无序并不意味着消息无序

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/HashSendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Overridepublic MessageQueue select(List mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

private SendResult send() { // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; // 根据我们的算法,选择一个发送队列// 这里的arg = orderId mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); if (mq != null) { returnthis.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); } }}

二、消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

1. 消费端处理消息的业务逻辑保持幂等性

2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

三、事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。

大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。

那问题是:我们是先扣款还是先发送消息呢?

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。

四、Producer

// 构造ProducerDefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");// 初始化Producer,整个应用生命周期内,只需要初始化1次

producer.start();// 构造

MessageMessage msg = new Message("TopicTest1",// topic"TagA",// tag:给消息打标签,用于区分一类消息,可为null"OrderID188",// key:自定义Key,可以用于去重,可为null ("Hello MetaQ").getBytes());// body:消息内容// 发送消息并返回结果SendResult sendResult = producer.send(msg);// 清理资源,关闭网络连接,注销自己producer.shutdown();

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

1. 如果没有指定namesrv地址,将会自动寻址

2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳…

3. 启动负载均衡的服务

五、消息存储

RocketMQ的消息存储是由consume queue和commit log配合完成的。

1、Consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeue与commitlog存储的目录

每个topic下的每个queue都有一个对应的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

  1. 根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。
    1. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。

死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量

Size存储中消息的大小

Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)

2、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。

文件的默认位置如下,仍然可通过配置文件修改:

${user.home} \store\${commitlog}\${fileName}

六、消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

RocketMQ最佳实践

一、Producer最佳实践

1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。

5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

二、Consumer最佳实践

  1. 消费过程要做到幂等(即消费端去重)
  2. 尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量
  3. 优化每条消息消费过程

三、其他配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

RocketMQ 版提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

一、RocketMQ中的专业术语

  • Topic

topic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息…… 一条消息必须有一个Topic。

  • Tag

Tag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息….. 一条消息可以没有Tag。RocketMQ提供2级消息分类,方便大家灵活控制。

  • Queue

一个topic下,我们可以设置多个queue(消息队列)。当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。

  • Producer 与 Producer Group

Producer表示消息队列的生产者。消息队列的本质就是实现了publish-subscribe模式,生产者生产消息,消费者消费消息。所以这里的Producer就是用来生产和发送消息的,一般指业务系统。

Producer Group是一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

  • Consumer 与 Consumer Group

消息消费者,一般由后台系统异步消费消息。

  • Push Consumer

Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。

  • Pull Consumer

Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

Consumer Group是一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

  • Broker

消息的中转者,负责存储和转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。

  • 广播消费

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

  • 集群消费

一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

  • NameServer

NameServer即名称服务,两个功能:

接收broker的请求,注册broker的路由信息

接口client的请求,根据某个topic获取其到broker的路由信息

NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册;Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定时获取topic路由信息。

二、RocketMQ Overview

Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

再看下RocketMQ物理部署结构图:

RocketMQ网络部署特点:

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId=0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

参考:MQ如何解决消息的顺序&重复两大硬伤?-CSDN博客

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 RocketMQ
Rocketmq如何保证消息不丢失
文章分析了RocketMQ如何通过生产者端的同步发送与重试机制、Broker端的持久化存储与消息重试投递策略、以及消费者端的手动提交ack与幂等性处理,来确保消息在整个传输和消费过程中的不丢失。
|
消息中间件 RocketMQ
如何保证RocketMQ消息有序?
如何保证RocketMQ消息有序?
|
5月前
|
消息中间件 缓存 监控
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
|
8月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
592 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
2月前
|
SQL 监控 关系型数据库
mysql 索引失效?怎么解决? (重点知识,建议收藏,读10遍+)
本文总结MySQL索引失效的八大常见场景,如函数操作、类型不匹配、OR连接、违背最左前缀等,并结合实际案例解析。通过EXPLAIN分析执行计划,帮助开发者识别问题,提供优化策略,提升查询性能。
|
6月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4854 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
11月前
|
IDE Java 数据库连接
SpringBoot整合XXL-JOB【02】- 启动调度中心
本文介绍了如何初始化和配置XXL-JOB调度中心。首先,从GitHub或Gitee获取源码;接着,执行`tables_xxl_job.sql`脚本初始化数据库。然后,在IDE中打开项目并修改`application.properties`中的数据库连接和`accessToken`配置。完成配置后,启动`XxlJobAdminApplication`,访问http://localhost:8080/xxl-job-admin/进行登录。最后,简要介绍了调度中心的主要功能模块,包括运行报表、任务管理、调度日志、执行器管理和用户管理。下篇将通过实例演示如何使用XXL-JOB执行定时任务。
506 6
SpringBoot整合XXL-JOB【02】- 启动调度中心
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
12月前
|
消息中间件 Java Kafka
MQ四兄弟:如何保证消息顺序性
在分布式系统中,消息队列(MQ)是确保组件间高效通信的关键。RabbitMQ、RocketMQ、Kafka和Pulsar通过不同机制保证消息顺序性:RabbitMQ依赖单一队列和消费者模式;RocketMQ使用MessageQueueSelector;Kafka基于Partition和Key;Pulsar通过分区主题和键路由。这些系统的核心思想是将相同特征的消息发送到同一队列或分区,并按先进先出原则消费,从而确保消息顺序性。
830 0