rabbitMQ延时队列与TTL和DLX、延迟队列的相关介绍

简介: TTL是Time To Live的缩写, 也就是生存时间。 RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。 如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。 默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
  1. 场景:“订单下单成功后,15分钟未支付自动取消”
    1.传统处理超时订单
    采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
    并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
    即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
    然后再做其他的业务操作

    2.rabbitMQ延时队列方案
    一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
    并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

  2. TTL和DLX
    rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列

    1.TTL
    TTL是Time To Live的缩写, 也就是生存时间。
    RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
    如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
    默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

    设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
    设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

    消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL TTL=5
    队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL TTL=10

    2.DLX和死信队列
    DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

    死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,
    这个队列,就是死信队列

    成为死信一般有以下几种情况:
    消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
    消息的TTL-存活时间已经过期
    队列长度限制被超越(队列满)

    注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
    注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明

       x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
    
    
  3. 延迟队列
    通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

    注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

  4. 开发步骤
    1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

    关键代码1
    new Queue(name, durable, exclusive, autoDelete, arguments);
    new Queue(NORMAL_QUEUE, true, false, false, map)
    参数说明:
    name:队列名字
    durable:true则持久队列
    exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true
    autoDelete:服务器不再使用时应删除队列,则为true
    arguments:用于声明队列的参数

    map.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
    map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
    map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
    

    关键代码2
    new DirectExchange(NORMAL_EXCHANGE, true, false);

    2.消费者A
    正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期

    3.消费者B
    消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中

  5. json转换
    1.生产者
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {

     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
     rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());//指定json转换器
     return rabbitTemplate;

    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){

     return new Jackson2JsonMessageConverter();

    }

    2.消费者
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {

     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
     factory.setConnectionFactory(connectionFactory);
     factory.setMessageConverter(jackson2JsonMessageConverter());
     return factory;

    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){

     return new Jackson2JsonMessageConverter();

    }

附录一:英文
delay:延迟的
normal:正常
exchange、route、queue
Receiver

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7天前
|
消息中间件 JSON Java
|
1天前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
8 0
|
28天前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
44 2
|
1月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
25天前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
48 0
|
2月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
199 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
110 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
428 0
|
消息中间件 存储 网络协议
Rabbitmq的安装与使用
Rabbitmq的安装与使用
247 0