高性能消息中间件 RabbitMQ(六)

简介: 高性能消息中间件 RabbitMQ(六)

八、RabbitMQ延迟队列

8.1 概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

8.2 死信队列实现延迟队列

1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    // 过期订单交换机和队列(死信交换机和死信队列)
    private final String EXPIRE_EXCHANGE = "expire_exchange";
    private final String EXPIRE_QUEUE = "expire_queue";
    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(EXPIRE_EXCHANGE)
                .durable(false)
                .build();
    }
    // 过期订单队列
    @Bean(EXPIRE_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(EXPIRE_QUEUE)
                .build();
    }
    // 将过期订单队列绑定到交换机
    @Bean
    public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("expire_routing")
                .noargs();
    }
    // 订单交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(ORDER_EXCHANGE)
                .durable(false)
                .build();
    }
    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(ORDER_QUEUE)
                .ttl(10000) // 存活时间为10s,模拟30min
                .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
                .build();
    }
    // 将订单队列绑定到交换机
    @Bean
    public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
                                   @Qualifier(ORDER_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("order_routing")
                .noargs();
    }
}

4.编写下单的控制器方法,下单后向订单交换机发送消息

@RestController
public class OrderController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //将订单id发送到订单队列
        rabbitTemplate.convertAndSend("order_exchange","order_routing",id);
        return "下单成功,修改库存。";
    }
}

5.编写监听死信队列的消费者

@Component
public class Consumer {
    // 监听过期队列
    @RabbitListener(queues = "expire_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

8.3 插件实现延迟队列

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

1、使用xftpj将延迟插件上传到虚拟机

2.安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached

此时登录管控台可以看到交换机类型多了延迟消息:

4、创建延迟交换机和延迟队列

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class RabbitConfig {
    // 创建延迟交换机和延迟队列
    private final String DELAYED_EXCHANGE = "delayed_exchange";
    private final String DELAYED_QUEUE = "delayed_queue";
    // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。
    @Bean(DELAYED_EXCHANGE)
    public Exchange deadExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args);
    }
    // 延迟队列
    @Bean(DELAYED_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DELAYED_QUEUE)
                .build();
    }
    // 将延迟队列绑定到延迟交换机
    @Bean
    public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,
                                     @Qualifier(DELAYED_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("delayed-routing")
                .noargs();
    }
}

5.编写下单的控制器方法

package com.zj.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class OrderController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //设置消息的延迟时间为10s
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor);
        return "下单成功,修改库存。";
    }
}

6.编写延迟队列的消费者

package com.zj.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
    // 监听延迟队列
    @RabbitListener(queues = "delayed_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

7、下单测试

延迟队列中没有消息是因为消费者将消息消费了。

九、RabbitMQ集群(暂略)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
4月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
134 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
159 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
223 9
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
119 10
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践