3.死信消息的浪浪山
3.1 ttl
举一个栗子,订单超时未支付则自动取消。
3.1.1 设置队列TTL
下面用代码实现下第一种方式吧。
生产者模块新增配置类TTLRabbitConfiguration.
@Configuration public class TTLRabbitConfiguration { public static final String QUEUE_NAME = "ttl_queue_test"; public static final String EXCHANGE_NAME = "ttl_exchange_test"; @Bean("ttlExchange") public Exchange ttlExchange() { return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("ttlQueue") public Queue ttlQueue() { Map<String, Object> args = new HashMap<>(); // 设置5s的过期时间 args.put("x-message-ttl", 5000); return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build(); } @Bean public Binding bindTTLQueueExchange(@Qualifier("ttlQueue")Queue queue, @Qualifier("ttlExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs(); }
测试类新增方法。
@Test public void testTTLSend() { rabbitTemplate.convertAndSend(TTLRabbitConfiguration.EXCHANGE_NAME, "ttl", "ttl mq haha~~~~"); }
启动运行,可以看到下图中ttl_queue_test会有标识TTL,有1条消息ready。
5s以后,ready的消息数会变成0条。
3.1.2 设置消息TTL
配置类
@Configuration public class MSGTTLRabbitConfig { public static final String QUEUE_NAME = "msg_queue"; public static final String EXCHANGE_NAME = "msg_exchange"; @Bean("msgExchange") public Exchange msgExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("msgQueue") public Queue msgQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindMSGQueueExchange(@Qualifier("msgQueue")Queue queue, @Qualifier("msgExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("msg_ttl").noargs(); } }
测试方法。
@Test public void testMsgTTLSend() { MessagePostProcessor postProcessor= new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(MSGTTLRabbitConfig.EXCHANGE_NAME, "msg_ttl", "msg_ttl mq haha~~~~",postProcessor); }
读者可以自测。
3.2 死信交换机
如果ttl到达,直接将消息删除,消息永久就消失了。实际上业务往往不会真的删除,而是将过期队列中过期的消息移入死信交换机。
注意与前面所学的消息失败的异常交换机进行对比。可以发现,异常消息是消费者将其投递到异常队列,而死信消费者可不会管事哦。
死信交换机当然也可以做异常兜底,但是他还有其它的应用场景。建议异常兜底方案还是使用异常交换机来搞。
由于死信消息会直接由普通队列投递到死信队列,而不是通过consumer,因此,需要在投递时指定死信交换机和对应的路由key。
3.3 延迟队列
手工去实现延迟队列多少有点繁琐,可以使用官方插件来快速做。
下面来安装下延迟队列插件。
官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下面我们会讲解基于Docker来安装RabbitMQ插件,如果您是通过其它方式安装的RabbitMQ,可以选择使用docker再装下或者自己查找对应的插件安装方式。
3.3.1下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件:
3.3.2 上传插件
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
我们之前设定的RabbitMQ的数据卷名称为mq-plugins
,所以我们使用下面命令查看数据卷:
docker volume inspect mq-plugins
可以得到下面结果:
接下来,将插件上传到这个目录即可:
3.3.3 安装插件
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
3.3.4 使用插件
在管控台。
或者也可以在代码中做上面同样的工作。
声明下死信交换机。
代码贴下。
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayExchange(String msg) { log.info("消费者接收到了delay.queue的延迟消息"); }
发消息。
代码如下。
@Test public void testSendDelayMessage() throws InterruptedException { // 1.准备消息 Message message = MessageBuilder .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay", 5000) .build(); // 2.准备CorrelationData CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3.发送消息 rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData); log.info("发送消息成功"); }
跑一下,会发现一个问题
实际上消息只是延迟了,但是异常队列处理了它。因此我们需要对之前的异常策略进行下增强。将生产者的config进行下增强,判断下是否是延迟消息。
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判断是否是延迟消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一个延迟消息,忽略这个错误提示 return; } // 记录日志 log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息 }); } }