一:🐱🏍问题引入
前面提到可以使用RabbitMQ实现订单到期自动取消以及当超过某一时间订单还是显示未支付时候就可以通过延迟队列主动向微信支付后台进行订单查询。
由于RabbitMQ是基于Erlang语言开发的,因此要使用RabbitMQ,首先要安装Erlang,至于安装教程可以自行百度解决,然后就是安装RabbitMQ并进行相关配置。
在RabbitMQ 3.6.X之前,要实现延迟队列只能通过TTL(生存时间)+ DLX(死信交换机)来实现,实现过程并不复杂。在RabbitMQ官方文档中有这样一句话:Dead letter exchanges (DLXs) are normal exchanges. They can be any of the usual types and are declared as usual. 意思是死信交换机是一个普通的交换机,它可以被当做普通交换机来使用。关键点在于这个交换机是用来存放过期消息的,所以这一交换机就称为死信交换机,流程图见下图:
设置过期时间有两种方法,一种是单独针对每一条消息进行设置,但是这样会因为时序问题形成队头阻塞现象。因为队列消息是按序消费的,如果队头的消息延迟时间是 10s, 后面的消息都要等至少 10s 后才可以进行消费。另一种方法是设置过期时间在消息队列上,如果过期时间设置在队列上,所有发送到队列的消息延迟时间都是该队列设定值,而业务需求延迟时间是随着重试次数线性增长的,这样就需要创建很多个固定延迟时间的队列。
可以看到无论采用哪一种方式都有很大的缺陷,但是在这个项目中是可以采用第二种方式的,因为针对每一笔订单设置的过期时间都为5分钟。
在RabbitMQ 3.6.X之后,RabbitMQ推出了delay-message 插件,该插件可以更好地实现延迟队列,当然,要使用这个插件还需要自行进行安装,具体安装过程可以自己百度解决。使用该插件的好处有两个方面,当然就是针对上面两种方案的缺陷来改进的。
首先,它是将延迟时间设置在消息上的,这样只要创建一个队列即可;
其次,指定为延迟类型的交换机在接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间,在达到可投递时间时才投递至目标队列,这样就不存在队头阻塞现象。
二:🐱🏍相关配置
#rabbitmq配置 rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirm-type: correlated # 用来配置消息发送到交换器之后是否触发回调方法 publisher-returns: true # 触发路由失败消息的回调(用不上) listener: simple: acknowledge-mode: manual #手动确认 prefetch: 1 #限流(海量数据,同时只能过来一条)
需要说明的是,publisher-confirm-type设置为correlated表示消息发送到交换机之后会发送回调通知给生产者,如果由于RabbitMQ内部原因导致交换机接收失败返回失败回调信息之后需要进行异常处理。publisher-returns这一参数实质上是用不上的,因为延时消息是从磁盘上读取消息然后发送(后台任务),发送消息时候无法保证两点:
- 发送时消息路由队列还存在
- 发送时原连接仍然支持回调方法
因为消息写磁盘和读磁盘消息发送存在时间差,两个时间点的队列和连接情况可能不同,所以不支持Mandatory设置。(publisher-returns: true必须与template.mandatory: true一起设置路由失败消息的回调才能生效)。
此外,为了保证消息传递的可靠性,我将消息确认机制设置为手动确认,同时每次只能过来一条数据。
三:🐱🏍代码实现
3.1:初始化设置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayMessageConfig { //交换机名称 public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange"; //消息队列名称 public static final String DELAY_QUEUE_ORDER_NAME = "plugin.delay.order.queue"; //订单队列 public static final String DELAY_QUEUE_REFUND_NAME = "plugin.delay.refund.queue"; //退款处理队列 //路由名称 public static final String ROUTING_KEY_ORDER = "plugin.delay.routing_order"; //订单路由名称 public static final String ROUTING_KEY_REFUND = "plugin.delay.routing_refund"; //退款路由名称 /** * 声明一个订单延迟队列 * @return */ @Bean("ORDER_DELAY_QUEUE") Queue orderDelayQueue(){ return QueueBuilder.durable(DELAY_QUEUE_ORDER_NAME).build(); } /** * 声明一个退款延迟队列 * @return */ @Bean("REFUND_DELAY_QUEUE") Queue refundDelayQueue(){ return QueueBuilder.durable(DELAY_QUEUE_REFUND_NAME).build(); } /** * 声明一个交换机 * @return */ @Bean("DELAY_EXCHANGE") CustomExchange delayExchange(){ Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args); } /** * 订单延迟队列绑定 * @param orderDelayQueue * @param delayExchange * @return */ @Bean Binding orderDelayQueueBinding(@Qualifier("ORDER_DELAY_QUEUE") Queue orderDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){ return BindingBuilder.bind(orderDelayQueue).to(delayExchange).with(ROUTING_KEY_ORDER).noargs(); } /** * 退款延迟队列绑定 * @param refundDelayQueue * @param delayExchange * @return */ @Bean Binding refundDelayQueueBinding(@Qualifier("REFUND_DELAY_QUEUE") Queue refundDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){ return BindingBuilder.bind(refundDelayQueue).to(delayExchange).with(ROUTING_KEY_REFUND).noargs(); } }
说明:这里声明的交换机类型为直接交换机,交换机通过路由键与不同队列进行绑定,这里是一个交换机绑定了两个队列,因为除了用户下单需要用到延迟队列之外,用户退款也需要用到延迟队列,具体细节我会在后面讲解。
3.2:生产者
import com.fasterxml.jackson.databind.ObjectMapper; import com.my.reggie.pojo.Orders; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @description : 消息生产者 */ @Component @Slf4j public class RabbitmqDelayProducer { @Resource private RabbitTemplate rabbitTemplate; /** * * @param No 消息 * @param messageId 唯一id * @param exchangeName 交换机 * @param key 路由键 * @param delayTime 延迟时间(毫秒) */ public void publish(String No, String messageId, String exchangeName, String key, Integer delayTime) { /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器 * 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { assert correlationData != null; String message_Id = correlationData.getId(); //返回成功,表示消息被正常投递到交换机 if (ack) { log.info("信息投递到交换机成功,messageId:{}",message_Id); } else { log.error("交换机不可达,messageId:{} 原因:{}",message_Id,cause); } }); /** * 延时消息是从磁盘读取消息然后发送(后台任务),发送消息的时候无法保证两点: * * 1、发送时消息路由的队列还存在 * 2、发送时原连接仍然支持回调方法 * 原因:消息写磁盘和从磁盘读取消息发送存在时间差,两个时间点的队列和连接情况可能不同。所以不支持Mandatory设置 */ /* 消息失败的回调 * 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE * 用不上 */ rabbitTemplate.setMandatory(false); rabbitTemplate.setReturnsCallback(returnedMessage -> { String message_Id = returnedMessage.getMessage().getMessageProperties().getMessageId(); byte[] message = returnedMessage.getMessage().getBody(); Integer replyCode = returnedMessage.getReplyCode(); String replyText = returnedMessage.getReplyText(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}", new String(message),message_Id,replyCode,replyText,exchange,routingKey); } ); // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理 CorrelationData correlationData = new CorrelationData(messageId); rabbitTemplate.convertAndSend(exchangeName, key, No, message -> { // 设置延迟时间 message.getMessageProperties().setDelay(delayTime); return message; }, correlationData); } }
3.3:消费者
import com.my.reggie.service.WxPayService; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class RabbitmqDelayConsumer { @Autowired private WxPayService wxPayService; /** * 监听订单延迟队列 * @param orderNo * @throws Exception */ @RabbitListener(queues = {"plugin.delay.order.queue"}) public void orderDelayQueue(String orderNo, Message message, Channel channel) throws Exception { log.info("订单延迟队列开始消费..."); try { //处理订单 wxPayService.checkOrderStatus(orderNo); //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息接收成功"); } catch (Exception e) { e.printStackTrace(); //消息重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); log.info("消息接收失败,重新入队"); } } /** * 监听退款延迟队列 * @param refundNo */ @RabbitListener(queues = {"plugin.delay.refund.queue"}) public void refundDelayQueue(String refundNo, Message message, Channel channel) throws Exception { log.info("退款延迟队列开始消费..."); try { //处理退款信息 wxPayService.checkRefundStatus(refundNo); //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息接收成功"); } catch (Exception e) { e.printStackTrace(); //消息重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); log.info("消息接收失败,重新入队"); } } }
由于开启了手动确认机制,假如成功处理消息,就需要向服务器告知消息已经成功被我消费,可以在队列中删除该条消息,否则服务器会不断重新发送消息,要是出现异常就需要将消息重新放回队列中。