最佳实践:如何基于消息服务MNS实现严格有序队列

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
阿里云盘企业版 CDE,企业版用户数5人 500GB空间
简介:
问题背景
阿里 消息 服务 提供的队列(queue)主要特点是高可靠、高可用、高并发。每个队列的 数据 都会被持久化三份到阿里云的飞天分布式平台;其中每个队列至少有2台 服务器 向外提供服务;同时每台服务器都支持高并发访问。这些分布式特性,也导致了消息服务队列无法像传统单机队列那样保证严格的消息FIFO特点,只能做到基本有序。


我们的队列如果同时有多个消息发送者(sender),由于并发和网络延迟不一等问题,消息的严格顺序本身就是失去了意义,因为在这种情况下,我们根本无法获知消息在多个sender上的实际发送顺序和消息到达服务器端的真实顺序。同样的,当有多个接收者并发接收消息时,其真正的处理顺序也是不可获知的。

因此,我们认为只有一个发送者(一个进程,可以是多个线程), 一个接收者时,消息顺序才有意义,也只有在这种情况下我们能够感知和记录消息的真实发送顺序和消息的真实接收顺序。

解决方案
基于上述假设,同时为了满足部分用户对于消息消费顺序性的要求,我们设计了下面的方案,确保消息按照用户发送顺序被接收和消费。
主要步骤:
1.消息在发送端进行染色,加上SeqId(例如:#num#)
2.消息在接收端进行还原,并根据SeqId 排序后返回给上层,同时对于已经receive的消息会有后台线程保证消息不会被重复消费。
3.为了避免因为发送者fail,或者接收者fail,导致seqid 丢失。seqid 会被持久存储到本地磁盘文件。
   当然也可以存储到其他存储或 数据库 :例如OSS,OTS, RDS

程序说明
附件提供了python版的方案实现(依赖MNS Python SDK)。其中,主要提供了OrderedQueueWrapper 类(oredered_queue.py文件),可以将普通的mns queue包装成有序queue。
OrderedQueueWrapper  提供两个方法:SendMessageInOrder()和ReceiveMessageInOrder()。send中对消息进行染色,receive还原消息,并且按顺序返回给接收者。

另外,send_message_in_order.py和receive_message_in_order.py提供了发送者和接收者使用OrderedQueueWrapper的程序示例。
send_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"}   # 指定持久化发送SeqId的磁盘文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
    orderedQueue.SendMessageInOrder(message)

receive_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} #指定持久化接收SeqId的磁盘文件
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
    recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)


运行方法:
1.配置send_message_in_order.py 和receive_message_in_order.py 中下列 配置
g_endpoint,g_accessKeyId,g_accessKeySecret,g_testQueueName 
2.运行send_message_in_order.py
3.运行receive_message_in_order.py (可以不用等步骤2程序运行完成)
发送程序会发送20条消息,接收程序会按顺序消费20条消息



也可以运行oredered_queue.py (需配置endpoint 和AK)的测试case对比普通mns queue的区别:
运行命令:$python oredered_queue.py
非严格有序:(整体有序,部分相邻消息无序,同时侧面证明MNS 的单个queue同时有多个服务器在提供服务)




严格有序:


注意事项:
1.本帖主要目的是展示顺序消息的解决方案,本帖中的代码未经过严格测试,不建议不加测试直接用于生产环境。同时程序仓促完成,难免由瑕疵,欢迎回帖指正。
2.正常情况下,发送端和接收端的seqid应该和queue中的消息(染色)匹配,当出现删除queue重新创建等 操作 时,请注意磁盘文件中的seqid 是否和queue中的真实情况相符,同时建议不要往染色的消息队列里发送非染色消息。
3.队列的消息有效期设置过短或者每条消息的实际处理结果都有可能会对消息有序性造成影响,在您的程序中需要对这些情况所导致的的乱序现象进行处理。


示例程序下载:
 ordered_queue_wrapper.tar.gz (4 K) 
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
52 1
|
24天前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
96 6
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
80 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
78 2
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
65 0
说说RabbitMQ延迟队列实现原理?
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
121 1
|
5月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
79 1