RabbitMQ中延迟队列的全方位解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 工作中有些场景需要用到延迟队列,大概对RabbitMQ延迟队列场景有一些了解,网上大部分的场景应用于:订单超时、定时执行等。而我需要延迟队列的场景是:有一批机器需要监控这个延迟队列长度,一旦满足就提前预备机器,准备执行任务。通过监控延迟队列,我可以准确、可靠的清楚,接下来的某个时间我一定会执行哪些任务。相较于传统通过API来唤醒设备,提升了稳定性。我只需要关注一点:发布消息。

前言

工作中有些场景需要用到延迟队列,大概对RabbitMQ延迟队列场景有一些了解,网上大部分的场景应用于:订单超时、定时执行等。

而我需要延迟队列的场景是:有一批机器需要监控这个延迟队列长度,一旦满足就提前预备机器,准备执行任务。通过监控延迟队列,我可以准确、可靠的清楚,接下来的某个时间我一定会执行哪些任务。相较于传统通过API来唤醒设备,提升了稳定性。我只需要关注一点:发布消息。

同时也了解到,大部分使用RabbitMQ实现延迟队列从两个方向入手:

  • 死信队列+TTL
  • 延迟队列插件

接下来,我将会从这两个方向逐一进行,分析利弊。

阅读本文,你将获得:

  1. 死信队列的场景化使用
  2. 队列TTL及消息TTL出现的各种问题解决
  3. RabbitMQ在docker下插件安装
  4. 完整的Python实现源代码

死信队列

通过死信队列来解决延迟队列给人的感觉是「曲线救国」的方案,因为在原生RabbitMQ中并不直接支持延迟队列。其原理就是把消息发给一个中间队列,这个中间队列预设了TTL过期时间,并绑定了死信队列和死信交换机,当中间队列的消息过期时,就会发送到死信队列中。届时,死信队列就充当了延迟队列。

网络异常,图片无法展示
|

Python代码如下:

# -*- coding: utf-8 -*-
from mq import channel
DEAD_EXCHANGE_NAME = 'dead.exchange'
DEAD_QUEUE_NAME = 'dead.queue'
DEAD_ROUTING_KEY = 'dead.routing.key'
QUEUE_NAME = 'normal.queue'
# 声明死信交换机
channel.exchange.declare(DEAD_EXCHANGE_NAME, exchange_type="direct")
# 声明死信队列
channel.queue.declare(DEAD_QUEUE_NAME, durable=True)
# 死信队列绑定死信交换机和死信路由
channel.queue.bind(
    queue=DEAD_QUEUE_NAME,
    exchange=DEAD_EXCHANGE_NAME,
    routing_key=DEAD_ROUTING_KEY,
)
# 声明正常队列,并绑定死信交换机和死信路由,约束队列TTL为10秒
arguments = {
    "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
    "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
    "x-message-ttl": 10 * 1000
}
channel.queue.declare(
    QUEUE_NAME, durable=True, arguments=arguments
)
复制代码

接下来,我们将对正常的中间队列发一个测试消息,并且监控死信队列,也就是我们的延迟队列。

# 往正常队列发送一个测试消息
channel.basic.publish("test_message", QUEUE_NAME)
# 监控死信队列(延迟队列)
for idx in range(0, 99):
  time.sleep(1)
  print("第[{}]次尝试获取...".format(idx + 1))
  message = channel.basic.get(DEAD_QUEUE_NAME)
  if message:
      print("获取到死信消息:{}".format(message.body))
      break
复制代码
第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
...
第[10]次尝试获取...
获取到死信消息:test_message
复制代码

问题:不同消息不同延迟

通过对队列设置TTL,我们实现了一个基础版的延迟队列功能,但是目前还存在一个问题,由于我们的TTL是预设在队列上的,一旦业务变化,我得需要多个不同的TTL,那与之产生的问题就是我得基于这个模式,新建多个中间队列,每个队列代表不同的TTL,然后不同的延迟消息发往不同的队列。

为解决这个问题,我们可以尝试将TTL的属性挂载到消息上。

channel.basic.publish(
  "test_message", 
  QUEUE_NAME, 
  properties={"expiration": "3000"}
)
复制代码
第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
获取到死信消息:test_message
复制代码

问题:消息TTL超过队列TTL

又发现问题:如果自定义消息TTL==超过==队列TTL,则优先触发最小值。

channel.basic.publish(
  "test_message", 
  QUEUE_NAME, 
  properties={"expiration": "13000"} # 超过队列TTL
)
复制代码
第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
...
第[10]次尝试获取...
获取到死信消息:test_message
复制代码

无TTL队列+消息TTL

索性我们将队列的TTL取消,采用直接对消息的TTL进行控制。

QUEUE_NAME = 'normal.queue.nottl'
# 声明正常队列,并绑定死信交换机和死信路由,约束队列TTL为10秒
arguments = {
    "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
    "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
    # "x-message-ttl": 10 * 1000
}
channel.queue.declare(
    QUEUE_NAME, durable=True, arguments=arguments
)
复制代码

我们仅需对上文源码中arguments的TTL属性屏蔽,并新命名一个队列。

  • 测试三秒延迟消息:
channel.basic.publish(
  "test_message", 
  QUEUE_NAME, 
  properties={"expiration": "3000"}
)
复制代码
第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
获取到死信消息:test_message
复制代码
  • 测试十三秒延迟消息:
channel.basic.publish(
  "test_message", 
  QUEUE_NAME, 
  properties={"expiration": "13000"}
)
复制代码
第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
...
第[13]次尝试获取...
获取到死信消息:test_message
复制代码

问题:队列优先级

你以为这就结束了吗?又又又发现新的问题,先上代码:

channel.basic.publish(
    "test_message_3000", 
    QUEUE_NAME, 
    properties={"expiration": "3000"}
)
channel.basic.publish(
    "test_message_5000", 
    QUEUE_NAME, 
    properties={"expiration": "5000"}
)
# 监控死信队列(延迟队列)
for idx in range(0, 99):
    time.sleep(1)
    print("第[{}]次尝试获取...".format(idx + 1))
    message = channel.basic.get(DEAD_QUEUE_NAME)
    if message:
        print("获取到死信消息:{}".format(message.body))
        channel.basic.ack(delivery_tag=message.delivery_tag)
        # break
复制代码

我们同时发布两条消息,一条3秒到达,一条5秒到达。查看接收情况:

第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
获取到死信消息:test_message_3000
第[4]次尝试获取...
第[5]次尝试获取...
获取到死信消息:test_message_5000
复制代码

完美达到我们的目的,但是问题暴露在,一旦发送消息的顺序发生改变,也就是大的延迟在前,小延迟在后。

第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
第[4]次尝试获取...
第[5]次尝试获取...
获取到死信消息:test_message_5000
第[6]次尝试获取...
获取到死信消息:test_message_3000
复制代码

原因在于,不同TTL的消息发给队列,队列如果想要知道哪条消息优先弹出,就需要扫描整个队列。而目前的策略是先发的消息在顶端,监控TTL也是顶端的这条消息,所以会导致后发的消息不管TTL是多少,都会卡在后面。

延迟队列插件

安装延迟队列插件

在官方文档的社区插件页面有一个延迟队列插件解决眼下的问题。

我们是docker安装的RabbitMQ,所以我们需要将下载的插件拷贝到容器内plugins目录下

docker cp /Users/miclon/Downloads/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins
复制代码

不妨进容器看看有没有放进去

> docker exec -it rabbitmq /bin/bash
> cd plugins
> ls -a
复制代码

网络异常,图片无法展示
|

接着在容器中启动这个插件:

> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
Enabling plugins on node rabbit@rabbitmq:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@rabbitmq...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange
started 1 plugins.
复制代码

紧接着,我们退出容器后重启RabbitMQ服务。

> docker restart rabbitmq
复制代码

刷新web管理端即可看到:

网络异常,图片无法展示
|

延迟队列实现

from mq import channel
DELAYED_EXCHANGE_NAME = 'delayed.exchange'
DELAYED_QUEUE_NAME = 'delayed.queue'
DELAYED_ROUTING_KEY = 'delayed.routing.key'
# 声明延迟交换机
arguments = {"x-delayed-type": "direct"}
channel.exchange.declare(DELAYED_EXCHANGE_NAME,
                         exchange_type="x-delayed-message",
                         durable=True,
                         arguments=arguments)
# 声明延迟队列
channel.queue.declare(DELAYED_QUEUE_NAME, durable=True)
# 延迟队列绑定延迟交换机和延迟路由
channel.queue.bind(
    queue=DELAYED_QUEUE_NAME,
    exchange=DELAYED_EXCHANGE_NAME,
    routing_key=DELAYED_ROUTING_KEY,
)
复制代码

少了原来死信队列的中间队列,代码里也少了很多。

相较于之前,我们在发送消息的时候需要携带 headers ,并且需要指定 x-delay 的值,这个参数表示消息延时的时间(毫秒)。

channel.basic.publish(
    "test_message_5000",
    DELAYED_ROUTING_KEY,
    DELAYED_EXCHANGE_NAME,
    properties={'headers': {"x-delay": 5000}}
)
channel.basic.publish(
    "test_message_3000",
    DELAYED_ROUTING_KEY,
    DELAYED_EXCHANGE_NAME,
    properties={'headers': {"x-delay": 3000}}
)
复制代码

输出结果:

第[1]次尝试获取...
第[2]次尝试获取...
第[3]次尝试获取...
获取到死信消息:test_message_3000
第[4]次尝试获取...
第[5]次尝试获取...
获取到死信消息:test_message_5000
复制代码

总结

rabbitmq_delayed_message_exchange
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1天前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
15 3
|
5天前
|
消息中间件 JSON Java
|
26天前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
42 2
|
26天前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
39 2
|
1月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
21天前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
34 0
|
23天前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
47 0
|
2月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
2月前
|
SQL 安全 网络安全
网络安全与信息安全:从漏洞到防护的全方位解析
【7月更文挑战第14天】在数字时代的浪潮中,网络安全与信息安全成为维护社会稳定和保护个人隐私的关键。本文深入探讨了网络环境中常见的安全漏洞、先进的加密技术以及提升安全意识的有效策略。通过分析最新的网络攻击案例和防御手段,旨在为读者提供一套实用的网络安全知识体系,帮助公众和企业构建更为坚固的信息安全防线。
|
2月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
48 0
说说RabbitMQ延迟队列实现原理?

推荐镜像

更多