RabbitMQ死信队列实战——解决订单超时未支付

简介: RabbitMQ死信队列实战——解决订单超时未支付

正文


一、死信队列


DLX(dead-letter-exchange),死信队列也是一般的队列,当消息变成死信时,消息会投递到死信队列中,经过死信队列进行消费的一种形式,对应的交换机叫死信交换机DLX。


二、产生原因


1、当消息投递到mq后,没有消费者去消费,而消息过期后会进入死信队列。


package com.xiaojie.springboot.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaojie
 * @version 1.0
 * @description:死信队列配置
 * @date 2021/10/8 21:07
 */
@Component
public class DLXConfig {
    //定义队列
    private static final String MY_DIRECT_QUEUE = "snail_direct_queue";
    //定义队列
    private static final String MY_DIRECT_DLX_QUEUE = "xiaojie_direct_dlx_queue";
    //定义死信交换机
    private static final String MY_DIRECT_DLX_EXCHANGE = "xiaojie_direct_dlx_exchange";
    //定义交换机
    private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";
    //死信路由键
    private static final String DIRECT_DLX_ROUTING_KEY = "msg.dlx";
    //绑定死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue(MY_DIRECT_DLX_QUEUE);
    }
    //绑定死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(MY_DIRECT_DLX_EXCHANGE);
    }
    @Bean
    public Queue snailQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // 绑定我们的死信交换机
        args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);
        // 绑定我们的路由key
        args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);
        return new Queue(MY_DIRECT_QUEUE, true, false, false, args);
    }
    @Bean
    public DirectExchange snailExchange() {
        return new DirectExchange(MY_DIRECT_EXCHANGE);
    }
    //绑定队列到交换机
    @Bean
    public Binding snailBindingExchange(Queue snailQueue, DirectExchange snailExchange) {
        return BindingBuilder.bind(snailQueue).to(snailExchange).with("msg.send");
    }
    //绑定死信队列到死信交换机
    @Bean
    public Binding dlxBindingExchange(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DIRECT_DLX_ROUTING_KEY);
    }
}


生产者产生消息后,并没与消费者去消费,等待消息过期后,自动进入死信队列


public class DLXProvider {
    //定义交换机
    private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";
    //普通队列路由键
    private static final String DIRECT_ROUTING_KEY = "msg.send";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public String sendDlxMsg(){
        String msg="我是模拟死信队列的消息。。。。。";
        rabbitTemplate.convertAndSend(MY_DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, msg, (message) -> {
            //设置有效时间,如果消息不被消费,进入死信队列
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
        return "success";
    }
}


2、当队列满了之后


  @Bean
    public Queue snailQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // 绑定我们的死信交换机
        args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);
        // 绑定我们的路由key
        args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);
//        args.put("x-message-ttl", 5000); //为队列设置过期时间
//        x-max-length:队列最大容纳消息条数,大于该值,mq拒绝接受消息,消息进入死信队列
        args.put("x-max-length", 5);
        return new Queue(MY_DIRECT_QUEUE, true, false, false, args);
    }


注意:如果在添加了这一条(队列长度)发生异常时,请删除掉交换机和队列后,重新启动程序,重新进行绑定。


3、消费者拒绝消费消息(消费端发生异常,mq无法收到消费端的ack)


package com.xiaojie.springboot.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Description: 消费snail消息的消费者
 * @author: xiaojie
 * @date: 2021.10.09
 */
@Component
@Slf4j
public class SnailConsumer {
    @RabbitListener(queues = "snail_direct_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // 获取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId);
        try {
            int result = 1 / 0;
            System.out.println("result" + result);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动签收
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //拒绝消费消息(丢失消息) 给死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}


三、解决订单超时


111.png


代码实现


绑定订单死信队列


package com.xiaojie.springboot.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaojie
 * @version 1.0
 * @description:解决订单超时未支付问题,绑定订单死信队列
 * @date 2021/10/8 23:12
 */
@Component
public class OrderDlxConfig {
    @Value(value="${xiaojie.order.queue}")
    private String orderQueue; //订单队列
    @Value(value="${xiaojie.order.exchange}")
    private String orderExchange;//订单队列
    @Value(value="${xiaojie.dlx.queue}")
    private String orderDeadQueue;//订单死信队列
    @Value(value="${xiaojie.dlx.exchange}")
    private String orderDeadExChange;//订单死信交换机
    @Value(value="${xiaojie.order.routingKey}")
    private String orderRoutingKey;//订单路由键
    @Value(value="${xiaojie.dlx.routingKey}")
    private String orderDeadRoutingKey;//死信队列路由键
    @Bean
    public Queue orderQueue(){
        Map<String, Object> args = new HashMap<>(2);
        // 绑定我们的死信交换机
        args.put("x-dead-letter-exchange", orderDeadExChange);
        // 绑定我们的路由key
        args.put("x-dead-letter-routing-key", orderDeadRoutingKey);
        return new Queue(orderQueue, true, false, false, args);
    }
    @Bean
    public Queue orderDeadQueue(){
        return new Queue(orderDeadQueue);
    }
    //绑定交换机
    @Bean
    public DirectExchange orderExchange(){
        return new DirectExchange(orderExchange);
    }
    @Bean
    public DirectExchange orderDeadExchange(){
        return new DirectExchange(orderDeadExChange);
    }
    //绑定路由键
    @Bean
    public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
    }
    //绑定死信队列到死信交换机
    @Bean
    public Binding deadBindingExchange(Queue orderDeadQueue,  DirectExchange orderDeadExchange) {
        return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(orderDeadRoutingKey);
    }
}


创建订单完成之后,发送消息


package com.xiaojie.springboot.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.springboot.entity.Order;
import com.xiaojie.springboot.mapper.OrderMapper;
import com.xiaojie.springboot.service.OrderService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 订单实现类
 * @date 2021/10/8 22:16
 */
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Value(value = "${xiaojie.order.exchange}")
    private String orderExchange;
    @Value(value = "${xiaojie.order.routingKey}")
    private String orderRoutingKey;
    @Override
    public String saveOrder(Order order) {
        String orderId = UUID.randomUUID().toString();
        order.setOrderId(orderId);
        order.setOrderName("test");
        order.setPayMoney(3000D);
        Integer result = orderMapper.addOrder(order);
        if (result > 0) {
            String msg = JSONObject.toJSONString(order);
            //发送mq
            sendMsg(msg, orderId);
            return "success";
        }
        return "fail";
    }
    /**
     * @description: 发送mq消息
     * @param:
     * @param: msg
     * @param: orderId
     * @return: void
     * @author xiaojie
     * @date: 2021/10/8 22:33
     */
    @Async //异步线程发送 ,此处需要单独创建一个类去创建该方法,不然该异步线程可能不会生效
    public void sendMsg(String msg, String orderId) {
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置过期时间30s
                message.getMessageProperties().setExpiration("30000");
//                message.getMessageProperties().setMessageId(orderId);
                return message;
            }
        });
    }
    @Override
    public Order getByOrderId(String orderId) {
        return orderMapper.getOrder(orderId);
    }
    @Override
    public Integer updateOrderStatus(String orderId) {
        return orderMapper.updateOrder(orderId);
    }
}


死信队列消费者


package com.xiaojie.springboot.service;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.springboot.entity.Order;
import com.xiaojie.springboot.myenum.OrderStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 死信队列解决订单超时问题
 * @date 2021/10/8 22:43
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("xiaojie_order_dlx_queue"),
        exchange = @Exchange(value = "xiaojie_order_dlx_exchange", type = ExchangeTypes.DIRECT),
        key = "order.dlx"))
@Slf4j
public class Consumer {
    @Autowired
    private OrderService orderService;
    /*
     * @param msg
     * @param headers
     * @param channel
     * @死信队列消费消息,如果订单状态是未支付,则修改订单状态
     * @author xiaojie
     * @date 2021/10/9 13:49
     * @return void
     */
    @RabbitHandler
    public void handlerMsg(@Payload String msg, @Headers Map<String, Object> headers,
                           Channel channel) throws IOException {
        log.info("接收到的消息是direct:{}" + msg);
        try {
            Order orderEntity = JSONObject.parseObject(msg, Order.class);
            if (orderEntity == null) {
                return;
            }
            // 根据订单号码查询该笔订单是否存在
            Order order = orderService.getByOrderId(orderEntity.getOrderId());
            if (order == null) {
                return;
            }
            //判读订单状态
            if (OrderStatus.UNPAY.getStatus() == order.getStatus()) {
                //未支付,修改订单状态
                orderService.updateOrderStatus(orderEntity.getOrderId());
                //库存+1
                System.out.println("库存+1");
            }
            //delivery tag可以从消息头里边get出来
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            //手动应答,消费者成功消费完消息之后通知mq,从队列移除消息,需要配置文件指明。第二个参数为是否批量处理
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
            //补偿机制
        }
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
156 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
93 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
109 2
|
3月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
67 0
|
5月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
73 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
144 1