MQ四兄弟:如何实现延时消息

简介: 本文介绍了几种常见的消息队列系统(RabbitMQ、RocketMQ、Kafka和Pulsar)实现延时消息的方式。RabbitMQ通过死信队列或延时插件实现;RocketMQ内置延时消息支持,可通过设置`delayTimeLevel`属性实现;Kafka不直接支持延时消息,但可以通过时间戳、延时Topic、Kafka Streams等方法间接实现;Pulsar自带延时消息功能,提供`deliverAfter`和`deliverAt`两种方式。每种方案各有优劣,适用于不同的应用场景。

添加图片注释,不超过 140 字(可选)


RabbitMQ延时消息


添加图片注释,不超过 140 字(可选)


RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过使用 RabbitMQ 插件或构建消息死信队列(Dead Letter Exchange, DLX)的方式来实现延时消息。以下是两种实现延时消息的方法:

1、死信队列 (Dead-Letter Queue):

  • 当消息被拒绝接收,或者在队列中的存活时间超过了设置的TTL(Time-To-Live),或者队列达到最大长度时,消息会变成死信。
  • 死信会被重新发布到另一个交换机上,这个交换机就是DLX(Dead-Letter Exchange)。
  • 在声明业务队列时,可以添加一个x-dead-letter-exchange参数,值为死信交换机,这样死信就会被RabbitMQ重新发布到配置的这个交换机上.

2、延时插件 (Delayed Message Plugin):

需要先安装 RabbitMQ Delayed Message 插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ezmv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 声明一个类型为x-delayed-message的交换机,并添加一个x-delayed-type参数,值为交换机的类型,用于路由键的映射。
  • 这种方式允许消息在交换机中延迟一定时间,然后再根据路由键发送到相应的队列.


RocketMQ延时消息


添加图片注释,不超过 140 字(可选)


RocketMQ把持 实现延时消息相对简单,因为它内置了对延时消息的支持。RocketMQ 通过设置消息的 定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

比如,在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。


添加图片注释,不超过 140 字(可选)

以下是实现延时消息的详细步骤:

1、Producer发送延时消息

RocketMQ 通过设置消息的 delayTimeLevel 属性来实现延时投递。

在 Producer 端,通过设置消息的 delayTimeLevel 属性来发送延时消息。RocketMQ 内置了一组延迟级别,可以通过 delayTimeLevel 来指定延迟时间。

// 创建消息 Message message = new Message("TopicTest", "TagA", "delayed message".getBytes()); // 设置延时级别 message.setDelayTimeLevel(3);  // 延时级别 3,对应于延时 10 秒

2、延时级别对照表

RocketMQ 预定义了多个延时级别,每个级别对应不同的延时时间。以下是默认的延时级别对照表:


添加图片注释,不超过 140 字(可选)


可以根据需要选择合适的延时级别,并设置到 message.setDelayTimeLevel(level) 方法中。

3、Consumer 接收消息

在 Consumer 端,接收和处理延时消息与普通消息相同。

4、调整配置

如果默认的延时级别不满足需求,可以通过修改 RocketMQ 配置文件 broker.conf 来调整延时级别和对应的延时时间:

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

修改完配置文件后,重启 Broker 生效。

小结

可以看到RocketMQ 5.x已经更新了延时消息的实现,在官方文档可以看到已经改成定时、延时消息,原本4.x文档中的延时级别对照表已经去掉了,统一成时间戳的实现。

RocketMQ 5.x 版本对延时消息的实现进行了重大更新。与之前版本相比,5.x 版本可以更灵活地处理消息的延迟时间,允许用户指定准确的延迟时间而不仅仅是预设的延迟级别。


添加图片注释,不超过 140 字(可选)


下面来具体看看,实时/延时消息概念:

定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

  • 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000。
  • 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000。

Kafka延时消息


添加图片注释,不超过 140 字(可选)


在Apache Kafka中,延时消息的处理也不是内置功能,但可以通过一些设计模式和技术手段来实现。以下是几种常见的方法来处理Kafka中的延时消息:

1、基于时间戳的延时消息

  • 生产者在发送消息时,可以在消息的头部添加一个时间戳字段,表示消息应该被消费的时间。
  • 消费者在接收到消息后,检查时间戳,如果未到处理时间,则暂时不处理此消息,直到达到指定时间。

2、基于单独的延时主题(Topic)

  • 创建一个专门的延时Topic
  • 生产者先将延时消息发送到延时Topic
  • 消费者从延时Topic拉取未到期的消息放入延时队列
  • 延时消息到期后,再发送到目标Topic供实际消费

3、利用Kafka Stream做中间处理

  • 创建一个Kafka Streams应用程序,用于处理延时消息。
  • 定义输入Topic,用于接收原始延时消息。同时定义输出Topic,用于发送到期的延时消息。
  • 使用Kafka Streams DSL定义Topology,对输入消息进行处理。
  • 使用自定义的Punctuator定期从State Store中读取到期的延时消息,发送到输出Topic。

这种方式的优点是延迟时间计算由Kafka Streams完成,不需要额外线程控制。缺点是需要应用程序支持并维护Kafka Streams,运维成本较高。

4、基于第三方中间件或工具

  • 利用redis、rabbitmq等其它中间件,构建一个延时消息系统
  • 延时消息从外部系统发往Kafka时已经延时完成
  • 例如利用RabbitMQ的延时队列特性实现


Pulsar延时消息


添加图片注释,不超过 140 字(可选)


Pulsar自带了延时消息功能,可以在发送消息时设置消息的deliverAt或deliverAfter属性。

Apache Pulsar实现延时消息的方案:

  • deliverAfter方法:通过指定一个延时时长来发送消息,消息将在该时长后被投递。
  • deliverAt方法:通过指定一个具体的未来时间戳来发送消息,消息将在该时间点被投递。

这两种方法都可以通过Pulsar的客户端API实现,简单且直接。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1119 0
|
7月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
82 0
|
消息中间件 SQL 存储
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
145 0
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1130 0
|
消息中间件 缓存 Java
5 张图带你理解 RocketMQ 延时消息机制
5 张图带你理解 RocketMQ 延时消息机制
401 1
5 张图带你理解 RocketMQ 延时消息机制
|
消息中间件 存储 Java
RocketMQ延时消息的原理与实现
本文分享了RocketMQ的延时消息的原理和实现,手把手带你从源码角度了解到内部实现机制。
848 13
RocketMQ延时消息的原理与实现
|
消息中间件 Apache RocketMQ
RocketMq-延时消息
RocketMq-延时消息
RocketMq-延时消息
|
消息中间件 Java 中间件
SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息
|
消息中间件 NoSQL Java
使用 Kotlin+RocketMQ 实现延时消息
使用 Kotlin+RocketMQ 实现延时消息
401 0
使用 Kotlin+RocketMQ 实现延时消息