高性能消息中间件 RabbitMQ(六)

简介: 高性能消息中间件 RabbitMQ(六)

八、RabbitMQ延迟队列

8.1 概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

8.2 死信队列实现延迟队列

1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    // 过期订单交换机和队列(死信交换机和死信队列)
    private final String EXPIRE_EXCHANGE = "expire_exchange";
    private final String EXPIRE_QUEUE = "expire_queue";
    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(EXPIRE_EXCHANGE)
                .durable(false)
                .build();
    }
    // 过期订单队列
    @Bean(EXPIRE_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(EXPIRE_QUEUE)
                .build();
    }
    // 将过期订单队列绑定到交换机
    @Bean
    public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("expire_routing")
                .noargs();
    }
    // 订单交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(ORDER_EXCHANGE)
                .durable(false)
                .build();
    }
    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(ORDER_QUEUE)
                .ttl(10000) // 存活时间为10s,模拟30min
                .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
                .build();
    }
    // 将订单队列绑定到交换机
    @Bean
    public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
                                   @Qualifier(ORDER_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("order_routing")
                .noargs();
    }
}

4.编写下单的控制器方法,下单后向订单交换机发送消息

@RestController
public class OrderController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //将订单id发送到订单队列
        rabbitTemplate.convertAndSend("order_exchange","order_routing",id);
        return "下单成功,修改库存。";
    }
}

5.编写监听死信队列的消费者

@Component
public class Consumer {
    // 监听过期队列
    @RabbitListener(queues = "expire_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

8.3 插件实现延迟队列

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

1、使用xftpj将延迟插件上传到虚拟机

2.安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached

此时登录管控台可以看到交换机类型多了延迟消息:

4、创建延迟交换机和延迟队列

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class RabbitConfig {
    // 创建延迟交换机和延迟队列
    private final String DELAYED_EXCHANGE = "delayed_exchange";
    private final String DELAYED_QUEUE = "delayed_queue";
    // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。
    @Bean(DELAYED_EXCHANGE)
    public Exchange deadExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args);
    }
    // 延迟队列
    @Bean(DELAYED_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DELAYED_QUEUE)
                .build();
    }
    // 将延迟队列绑定到延迟交换机
    @Bean
    public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,
                                     @Qualifier(DELAYED_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("delayed-routing")
                .noargs();
    }
}

5.编写下单的控制器方法

package com.zj.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class OrderController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //设置消息的延迟时间为10s
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor);
        return "下单成功,修改库存。";
    }
}

6.编写延迟队列的消费者

package com.zj.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
    // 监听延迟队列
    @RabbitListener(queues = "delayed_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

7、下单测试

延迟队列中没有消息是因为消费者将消息消费了。

九、RabbitMQ集群(暂略)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
230 3
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
1月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
147 11
|
2月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
2月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。