Springboot----项目整合微信支付与RabbitMQ(使用RabbitMQ延迟插件实现订单管理)

简介: 主要介绍如何在Springboot项目支付模块中使用RabbitMQ实现延迟队列,采用的是RabbitMQ的延迟插件

一:🐱‍🏍问题引入

前面提到可以使用RabbitMQ实现订单到期自动取消以及当超过某一时间订单还是显示未支付时候就可以通过延迟队列主动向微信支付后台进行订单查询。

由于RabbitMQ是基于Erlang语言开发的,因此要使用RabbitMQ,首先要安装Erlang,至于安装教程可以自行百度解决,然后就是安装RabbitMQ并进行相关配置。

在RabbitMQ 3.6.X之前,要实现延迟队列只能通过TTL(生存时间)+ DLX(死信交换机)来实现,实现过程并不复杂。在RabbitMQ官方文档中有这样一句话:Dead letter exchanges (DLXs) are normal exchanges. They can be any of the usual types and are declared as usual. 意思是死信交换机是一个普通的交换机,它可以被当做普通交换机来使用。关键点在于这个交换机是用来存放过期消息的,所以这一交换机就称为死信交换机,流程图见下图:

设置过期时间有两种方法,一种是单独针对每一条消息进行设置,但是这样会因为时序问题形成队头阻塞现象。因为队列消息是按序消费的,如果队头的消息延迟时间是 10s, 后面的消息都要等至少 10s 后才可以进行消费。另一种方法是设置过期时间在消息队列上,如果过期时间设置在队列上,所有发送到队列的消息延迟时间都是该队列设定值,而业务需求延迟时间是随着重试次数线性增长的,这样就需要创建很多个固定延迟时间的队列。

可以看到无论采用哪一种方式都有很大的缺陷,但是在这个项目中是可以采用第二种方式的,因为针对每一笔订单设置的过期时间都为5分钟。

在RabbitMQ 3.6.X之后,RabbitMQ推出了delay-message 插件,该插件可以更好地实现延迟队列,当然,要使用这个插件还需要自行进行安装,具体安装过程可以自己百度解决。使用该插件的好处有两个方面,当然就是针对上面两种方案的缺陷来改进的。

首先,它是将延迟时间设置在消息上的,这样只要创建一个队列即可;

其次,指定为延迟类型的交换机在接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间,在达到可投递时间时才投递至目标队列,这样就不存在队头阻塞现象。

二:🐱‍🏍相关配置

#rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 用来配置消息发送到交换器之后是否触发回调方法
    publisher-returns: true   # 触发路由失败消息的回调(用不上)
    listener:
      simple:
        acknowledge-mode: manual  #手动确认
        prefetch: 1    #限流(海量数据,同时只能过来一条)

需要说明的是,publisher-confirm-type设置为correlated表示消息发送到交换机之后会发送回调通知给生产者,如果由于RabbitMQ内部原因导致交换机接收失败返回失败回调信息之后需要进行异常处理。publisher-returns这一参数实质上是用不上的,因为延时消息是从磁盘上读取消息然后发送(后台任务),发送消息时候无法保证两点:

  1. 发送时消息路由队列还存在
  2. 发送时原连接仍然支持回调方法

因为消息写磁盘和读磁盘消息发送存在时间差,两个时间点的队列和连接情况可能不同,所以不支持Mandatory设置。(publisher-returns: true必须与template.mandatory: true一起设置路由失败消息的回调才能生效)。

此外,为了保证消息传递的可靠性,我将消息确认机制设置为手动确认,同时每次只能过来一条数据。

三:🐱‍🏍代码实现

3.1:初始化设置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayMessageConfig {
    //交换机名称
    public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange";
    //消息队列名称
    public static final String DELAY_QUEUE_ORDER_NAME = "plugin.delay.order.queue";  //订单队列
    public static final String DELAY_QUEUE_REFUND_NAME = "plugin.delay.refund.queue";  //退款处理队列
    //路由名称
    public static final String ROUTING_KEY_ORDER = "plugin.delay.routing_order";  //订单路由名称
    public static final String ROUTING_KEY_REFUND = "plugin.delay.routing_refund";  //退款路由名称
    /**
     * 声明一个订单延迟队列
     * @return
     */
    @Bean("ORDER_DELAY_QUEUE")
    Queue orderDelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_ORDER_NAME).build();
    }
    /**
     * 声明一个退款延迟队列
     * @return
     */
    @Bean("REFUND_DELAY_QUEUE")
    Queue refundDelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_REFUND_NAME).build();
    }
    /**
     * 声明一个交换机
     * @return
     */
    @Bean("DELAY_EXCHANGE")
    CustomExchange delayExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);
    }
    /**
     * 订单延迟队列绑定
     * @param orderDelayQueue
     * @param delayExchange
     * @return
     */
    @Bean
    Binding orderDelayQueueBinding(@Qualifier("ORDER_DELAY_QUEUE") Queue orderDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){
        return BindingBuilder.bind(orderDelayQueue).to(delayExchange).with(ROUTING_KEY_ORDER).noargs();
    }
    /**
     * 退款延迟队列绑定
     * @param refundDelayQueue
     * @param delayExchange
     * @return
     */
    @Bean
    Binding refundDelayQueueBinding(@Qualifier("REFUND_DELAY_QUEUE") Queue refundDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){
        return BindingBuilder.bind(refundDelayQueue).to(delayExchange).with(ROUTING_KEY_REFUND).noargs();
    }
}

说明:这里声明的交换机类型为直接交换机,交换机通过路由键与不同队列进行绑定,这里是一个交换机绑定了两个队列,因为除了用户下单需要用到延迟队列之外,用户退款也需要用到延迟队列,具体细节我会在后面讲解。

3.2:生产者

import com.fasterxml.jackson.databind.ObjectMapper;
import com.my.reggie.pojo.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @description : 消息生产者
 */
@Component
@Slf4j
public class RabbitmqDelayProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     *
     * @param No 消息
     * @param messageId 唯一id
     * @param exchangeName 交换机
     * @param key 路由键
     * @param delayTime 延迟时间(毫秒)
     */
    public void publish(String No, String messageId, String exchangeName, String key, Integer delayTime) {
        /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
         * 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            assert correlationData != null;
            String message_Id = correlationData.getId();
            //返回成功,表示消息被正常投递到交换机
            if (ack) {
                log.info("信息投递到交换机成功,messageId:{}",message_Id);
            } else {
                log.error("交换机不可达,messageId:{} 原因:{}",message_Id,cause);
            }
        });
        /**
         * 延时消息是从磁盘读取消息然后发送(后台任务),发送消息的时候无法保证两点:
         *
         * 1、发送时消息路由的队列还存在
         * 2、发送时原连接仍然支持回调方法
         * 原因:消息写磁盘和从磁盘读取消息发送存在时间差,两个时间点的队列和连接情况可能不同。所以不支持Mandatory设置
         */
        /* 消息失败的回调
         * 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE
         * 用不上
         */
        rabbitTemplate.setMandatory(false);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String message_Id = returnedMessage.getMessage().getMessageProperties().getMessageId();
                    byte[] message = returnedMessage.getMessage().getBody();
                    Integer replyCode = returnedMessage.getReplyCode();
                    String replyText = returnedMessage.getReplyText();
                    String exchange = returnedMessage.getExchange();
                    String routingKey = returnedMessage.getRoutingKey();
                    log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                            new String(message),message_Id,replyCode,replyText,exchange,routingKey);
                }
        );
        // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, No, message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay(delayTime);
            return message;
        }, correlationData);
    }
}

3.3:消费者

import com.my.reggie.service.WxPayService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitmqDelayConsumer {
    @Autowired
    private WxPayService wxPayService;
    /**
     * 监听订单延迟队列
     * @param orderNo
     * @throws Exception
     */
    @RabbitListener(queues = {"plugin.delay.order.queue"})
    public void orderDelayQueue(String orderNo, Message message, Channel channel) throws Exception {
        log.info("订单延迟队列开始消费...");
        try {
            //处理订单
            wxPayService.checkOrderStatus(orderNo);
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消息接收成功");
        } catch (Exception e) {
            e.printStackTrace();
            //消息重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            log.info("消息接收失败,重新入队");
        }
    }
    /**
     * 监听退款延迟队列
     * @param refundNo
     */
    @RabbitListener(queues = {"plugin.delay.refund.queue"})
    public void refundDelayQueue(String refundNo, Message message, Channel channel) throws Exception {
        log.info("退款延迟队列开始消费...");
        try {
            //处理退款信息
            wxPayService.checkRefundStatus(refundNo);
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消息接收成功");
        } catch (Exception e) {
            e.printStackTrace();
            //消息重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            log.info("消息接收失败,重新入队");
        }
    }
}

由于开启了手动确认机制,假如成功处理消息,就需要向服务器告知消息已经成功被我消费,可以在队列中删除该条消息,否则服务器会不断重新发送消息,要是出现异常就需要将消息重新放回队列中。

四:🐱‍🏍友情链接

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java 网络架构
|
25天前
|
JSON 文字识别 小程序
微信小程序OCR插件,实现身份证、行驶证、银行卡、营业执照和驾驶证等识别
微信小程序OCR插件,实现身份证、行驶证、银行卡、营业执照和驾驶证等识别
151 0
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
147 1
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
713 2
|
3月前
|
消息中间件 Java Maven
|
4月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
4月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
220 0