消息队列:第五章:RabbitMQ的使用

简介: 消息队列:第五章:RabbitMQ的使用

第一步:使用之前先安装好RabbitMQ,建议安装在linux系统下

安装配置RabbitMQ:https://blog.csdn.net/qq_33450681/article/details/85339315

第二步:在配置文件下配置

rabbitmq:
  host: 192.168.0.100
  port: 5672
  virtual-host: /mall
  username: mall
  password: mall
  publisher-confirms: true #如果对异步消息需要回调必须设置为true

浏览器访问http://192.168.0.100:15672/#/



第三步:业务中使用发送消息

    @Autowired
    private OmsOrderSettingMapper orderSettingMapper;
    @Autowired
    private AmqpTemplate amqpTemplate;
/**
     * 发送检查支付结果的消息队列
     * @param orderSn
     * @param count
     */
    @Override
    public void sendDelayPaymentCheck(String orderSn, int count) {
        //获取订单超时时间
        OmsOrderSetting orderSetting = orderSettingMapper.selectByPrimaryKey(1L);
        long delayTimes = orderSetting.getNormalOrderOvertime() * 60 * 1000;
        //将需要发送的数据封装到hashmap中
        HashMap<Object, Object> hashMap = new HashMap<>();
        hashMap.put("out_trade_no",orderSn);
        hashMap.put("count",count);
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_PAY_CANCEL.getExchange(), QueueEnum.QUEUE_PAY_CANCEL.getRouteKey(), hashMap, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
    }

第四步:定义QueueEnum枚举

    /**
     * 支付通知队列
     */
    QUEUE_PAY_CANCEL("mall.pay.direct","mall.pay.cancel","mall.pay.cancel")
    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;
    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
    public String getExchange() {
        return exchange;
    }
    public String getName() {
        return name;
    }
    public String getRouteKey() {
        return routeKey;
    }

第五步:配置

RabbitMQ参数配置:


使用一个RabbitMQ需要配置以下几个重要的参数


1.虚拟主机名称(Virtual host name),这个参数不是真正的IP地址或者域名,它是RabbitMQ内部的一个虚拟主机,就像是电脑安装了N台虚拟机,对外的名称一般是“/xxxx".


2.交换机名(Exchanges name):顾名思义,就是把生产者送来的消息来进行分发给下游的多个消费者,相当一个内部软交换机。交换机的类型有fanout,direct,topic,header,fanout类型类似以太网交换机的广播模式,把送来的消息给每个下游队列。direct类似单播(使用routingkey来指定目的队列),topic交换机类似组播,把消息传递给下面同一主题的队列,header交换机则忽略掉routingkey,使用hash数据结构来进行匹配和转发。


3.routingkey :前面讲过了,交换机在进行消息转发时候,要使用routingkey为关键字进行转发。


4.队列名称:可以为不同的消费者指定不同的队列,可以对消息进行分类到不同的队列进行转发。

配置类

/**
 * 消息队列配置
 * Created by macro on 2018/9/14.
 */
@Configuration
public class RabbitMqConfig {
    /**
     * 支付队列
     * @return
     */
    @Bean
    public Queue payQueue() {
        return new Queue(QueueEnum.QUEUE_PAY_CANCEL.getName());
    }
    /**
     * 绑定支付交互机
     * @return
     */
    @Bean
    DirectExchange payDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_PAY_CANCEL.getExchange())
                .durable(true)
                .build();
    }
    /**
     * 将支付队列绑定到支付交互机
     * @param payDirect
     * @param payQueue
     * @return
     */
    @Bean
    Binding payBinding(DirectExchange payDirect,Queue payQueue){
        return BindingBuilder
                .bind(payQueue)
                .to(payDirect)
                .with(QueueEnum.QUEUE_PAY_CANCEL.getRouteKey());
    }

第六步:处理支付信息

package com.macro.mall.portal.component;
import com.macro.mall.model.PaymentInfo;
import com.macro.mall.portal.service.PaymentService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
 * 支付的处理者
 */
@Component
@RabbitListener(queues = "mall.pay.cancel")
public class PayReceiver {
    @Autowired
    PaymentService paymentService;
    @RabbitHandler
    public void handle(HashMap mapMessage){
        try {
            String outTradeNo = mapMessage.get("out_trade_no").toString();
            int count = Integer.parseInt(mapMessage.get("count").toString());
            // 如果没有支付成功,再次发送延迟检查队列
            if (count > 0) {
                // 进行支付状态检查
                System.out.println("正在进行第" + (6 - count) + "支付结果次检查");
                //调用alipayClient接口,根据out_trade_no查询支付信息
                PaymentInfo paymentInfo = paymentService.checkPaymentResult(outTradeNo);
                Thread thread = new Thread();
                thread.start();
                Thread.sleep(10000);
                //判断是否已经支付成功
                if (paymentInfo.getPaymentStatus()!=null&&(paymentInfo.getPaymentStatus().equals("TRADE_SUCCESS") || paymentInfo.getPaymentStatus().equals("TRADE_FINISHED"))) {
                    // 交易成功或者失败,记录交易状态
                    System.out.println("检查交易结果成功,记录交易状态。。。");// 修改支付的状态信息
                    // 修改支付信息
                    boolean b = paymentService.checkPaymentStatus(outTradeNo);
                    if(!b){
                        //修改为已支付
                        paymentService.updatePayment(paymentInfo.getCallbackContent(),outTradeNo,paymentInfo.getAlipayTradeNo());
                        // 发送系统消息,出发并发商品支付业务消息队列
                        paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),paymentInfo.getAlipayTradeNo());
                    }
                } else {//未支付
                    // 再次进行延迟检查
                    System.out.println("正在进行第" + (6 - count) + "支付结果次检查,检查用户尚未付款成功,继续巡检");
                    paymentService.sendDelayPaymentCheck(outTradeNo, count - 1);
                }
            } else {
                System.out.println("支付结果次检查次数耗尽,支付未果。。。");
            }
        } catch (Exception e) {
        }
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
3月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
249 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
884 96
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
397 93
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
205 1
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
359 4
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
279 16

相关产品

  • 云消息队列 MQ