高性能消息中间件 RabbitMQ(五)

简介: 高性能消息中间件 RabbitMQ(五)

六、RabbitMQ高级特性

6.1 消费端限流

之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置限流机制

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
        prefetch: 20

3、消费者接受消息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class OosConsumer {
    @RabbitListener(queues = "boot_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        //1.获取消息
        System.out.println("当前时间:"+new String(message.getBody()));
        //2.模拟业务处理
        Thread.sleep(2000);
        //3.签收消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }
}

20230619

6.2 限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置不公平分发

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

3、编写两个消费者消费相同的队列信息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "boot_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
    // 消费者2
    @RabbitListener(queues = "boot_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
消费者2:send message...1
消费者1:send message...0
19:53:21.676 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
消费者1:send message...3
消费者1:send message...4
消费者1:send message...2
消费者1:send message...5
消费者1:send message...6
消费者2:send message...7
消费者1:send message...8
消费者1:send message...9

发现消费者1消费的要比消费者2消费的多。能者多劳。

6.3 设置队列所有消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

1、在创建队列时设置其存活时间:

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
                .ttl(10000)      //队列存活时间10s单位毫秒
                .build();
    }

2、生产者生产消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

十秒后,未被消费的消息会被移除。

6.4 设置单条消息存活时间

1、在消息发送的时候设置发送时间

/*发送消息并设置消息的存活时间*/
    @Test
    public void testSend() {
        //1.创建消息属性
        MessageProperties messageProperties = new MessageProperties();
        //2.设置存活时间,单位毫秒
        messageProperties.setExpiration("10000");
        //3.创建消息对象
        Message message = new Message(("send message……").getBytes(), messageProperties);
        //4.发送消息
        rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
    }

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

6.5 优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

1、设置队列的优先级

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
               // .ttl(10000)      //队列中消息存活时间10s单位毫秒
                .maxPriority(10)  //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
                .build();
    }

2、编写生产者发送有优先级的消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) { // i为5时消息的优先级较高
                //1.创建消息属性
                MessageProperties messageProperties = new MessageProperties();
                //2.设置消息优先级
                messageProperties.setPriority(9);
                //3.创建消息对象
                Message message = new Message(("send message……" + i).getBytes(), messageProperties);
                rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
            }else {
                rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
            }
        }
    }
}

3、编写消费者测试是否是第五条消息最先被消费

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //1.获取消息
        System.out.println(new String(message.getBody()));
        //2.手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.0)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :No active profile set, falling back to 1 default profile: "default"
17:47:15.482 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Attempting to connect to: [192.168.66.100:5672]
17:47:15.498 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
17:47:15.529 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
send message……5
send message……0
send message……1
send message……2
send message……3
send message……4
send message……6
send message……7
send message……8
send message……9

第五条消息首先被消费。

七、RabbitMQ死信队列

7.1 概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

7.2 代码实现

1、创建死信交换机和死信队列

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    //死信交换机和死信队列
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    //普通交换机和普通队列
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";
    // 创建死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    // 创建死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)   //死信队列名称
                .build();
    }
    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
                                 @Qualifier(DEAD_QUEUE) Queue queue){
         return BindingBuilder
                 .bind(queue)
                 .to(exchange)
                 .with("dead")     //交换机路由键
                 .noargs();
    }
    //创建普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    //创建普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)   //普通信队列名称
                .deadLetterExchange(DEAD_EXCHANGE)  //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
                .deadLetterRoutingKey("dead")   //死信队列路由关键字
                .ttl(10000)  //消息存活时间
                .maxLength(10)  //消息最大长度
                .build();
    }
    //普通交换机绑定普通对列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
                                 @Qualifier(NORMAL_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("normal")
                .noargs();
    }
}

2.创建生产者发送消息(测试存活时间过期变成死信)

@Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }

十秒后

消息全部去了死信队列。

2.创建生产者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
//        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        //2.超过队列长度变成死信
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        }
    }
}

因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。

2.创建生产者和消费者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.生产者拒签消息,消息变成死信。
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }
}
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "normal_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
    }
}

拒签消息,消息变成了死信。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
230 3
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
1月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
147 11
|
2月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
2月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。