高性能消息中间件 RabbitMQ(五)

简介: 高性能消息中间件 RabbitMQ(五)

六、RabbitMQ高级特性

6.1 消费端限流

之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置限流机制

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
        prefetch: 20

3、消费者接受消息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class OosConsumer {
    @RabbitListener(queues = "boot_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        //1.获取消息
        System.out.println("当前时间:"+new String(message.getBody()));
        //2.模拟业务处理
        Thread.sleep(2000);
        //3.签收消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }
}

20230619

6.2 限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置不公平分发

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

3、编写两个消费者消费相同的队列信息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "boot_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
    // 消费者2
    @RabbitListener(queues = "boot_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
消费者2:send message...1
消费者1:send message...0
19:53:21.676 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
消费者1:send message...3
消费者1:send message...4
消费者1:send message...2
消费者1:send message...5
消费者1:send message...6
消费者2:send message...7
消费者1:send message...8
消费者1:send message...9

发现消费者1消费的要比消费者2消费的多。能者多劳。

6.3 设置队列所有消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

1、在创建队列时设置其存活时间:

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
                .ttl(10000)      //队列存活时间10s单位毫秒
                .build();
    }

2、生产者生产消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

十秒后,未被消费的消息会被移除。

6.4 设置单条消息存活时间

1、在消息发送的时候设置发送时间

/*发送消息并设置消息的存活时间*/
    @Test
    public void testSend() {
        //1.创建消息属性
        MessageProperties messageProperties = new MessageProperties();
        //2.设置存活时间,单位毫秒
        messageProperties.setExpiration("10000");
        //3.创建消息对象
        Message message = new Message(("send message……").getBytes(), messageProperties);
        //4.发送消息
        rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
    }

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

6.5 优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

1、设置队列的优先级

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
               // .ttl(10000)      //队列中消息存活时间10s单位毫秒
                .maxPriority(10)  //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
                .build();
    }

2、编写生产者发送有优先级的消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) { // i为5时消息的优先级较高
                //1.创建消息属性
                MessageProperties messageProperties = new MessageProperties();
                //2.设置消息优先级
                messageProperties.setPriority(9);
                //3.创建消息对象
                Message message = new Message(("send message……" + i).getBytes(), messageProperties);
                rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
            }else {
                rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
            }
        }
    }
}

3、编写消费者测试是否是第五条消息最先被消费

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //1.获取消息
        System.out.println(new String(message.getBody()));
        //2.手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.0)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :No active profile set, falling back to 1 default profile: "default"
17:47:15.482 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Attempting to connect to: [192.168.66.100:5672]
17:47:15.498 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
17:47:15.529 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
send message……5
send message……0
send message……1
send message……2
send message……3
send message……4
send message……6
send message……7
send message……8
send message……9

第五条消息首先被消费。

七、RabbitMQ死信队列

7.1 概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

7.2 代码实现

1、创建死信交换机和死信队列

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    //死信交换机和死信队列
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    //普通交换机和普通队列
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";
    // 创建死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    // 创建死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)   //死信队列名称
                .build();
    }
    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
                                 @Qualifier(DEAD_QUEUE) Queue queue){
         return BindingBuilder
                 .bind(queue)
                 .to(exchange)
                 .with("dead")     //交换机路由键
                 .noargs();
    }
    //创建普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    //创建普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)   //普通信队列名称
                .deadLetterExchange(DEAD_EXCHANGE)  //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
                .deadLetterRoutingKey("dead")   //死信队列路由关键字
                .ttl(10000)  //消息存活时间
                .maxLength(10)  //消息最大长度
                .build();
    }
    //普通交换机绑定普通对列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
                                 @Qualifier(NORMAL_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("normal")
                .noargs();
    }
}

2.创建生产者发送消息(测试存活时间过期变成死信)

@Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }

十秒后

消息全部去了死信队列。

2.创建生产者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
//        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        //2.超过队列长度变成死信
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        }
    }
}

因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。

2.创建生产者和消费者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.生产者拒签消息,消息变成死信。
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }
}
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "normal_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
    }
}

拒签消息,消息变成了死信。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
3月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
19天前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
44 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
2月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
115 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
1月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
87 8
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
83 9
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践