六、RabbitMQ高级特性
6.1 消费端限流
之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
1.生产者批量发送消息
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSendBatch() { // 发送十条消息 for (int i = 0; i < 100; i++) { rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i); } } }
2.消费端配置限流机制
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。 prefetch: 20
3、消费者接受消息
package com.zj.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class OosConsumer { @RabbitListener(queues = "boot_queue") public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException { //1.获取消息 System.out.println("当前时间:"+new String(message.getBody())); //2.模拟业务处理 Thread.sleep(2000); //3.签收消息 long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,true); } }
20230619
6.2 限流实现不公平分发
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
使用方法如下:
1.生产者批量发送消息
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSendBatch() { // 发送十条消息 for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i); } } }
2.消费端配置不公平分发
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发 prefetch: 1
3、编写两个消费者消费相同的队列信息
package com.zj.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class UnfairConsumer { // 消费者1 @RabbitListener(queues = "boot_queue") public void listenMessage1(Message message, Channel channel) throws Exception { //1.获取消息 System.out.println("消费者1:"+new String(message.getBody(),"UTF-8")); //2. 处理业务逻辑 Thread.sleep(500); // 消费者1处理快 //3. 手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } // 消费者2 @RabbitListener(queues = "boot_queue") public void listenMessage2(Message message, Channel channel) throws Exception { //1.获取消息 System.out.println("消费者2:"+new String(message.getBody(),"UTF-8")); //2. 处理业务逻辑 Thread.sleep(3000);// 消费者2处理慢 //3. 手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } }
消费者2:send message...1 消费者1:send message...0 19:53:21.676 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.867 seconds (JVM running for 1.259) 消费者1:send message...3 消费者1:send message...4 消费者1:send message...2 消费者1:send message...5 消费者1:send message...6 消费者2:send message...7 消费者1:send message...8 消费者1:send message...9
发现消费者1消费的要比消费者2消费的多。能者多劳。
6.3 设置队列所有消息存活时间
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
1、在创建队列时设置其存活时间:
// 创建队列 @Bean(QUEUE_NAME) public Queue getMessageQueue() { return QueueBuilder .durable(QUEUE_NAME)// 队列名 .ttl(10000) //队列存活时间10s单位毫秒 .build(); }
2、生产者生产消息
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSendBatch() { // 发送十条消息 for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i); } } }
十秒后,未被消费的消息会被移除。
6.4 设置单条消息存活时间
1、在消息发送的时候设置发送时间
/*发送消息并设置消息的存活时间*/ @Test public void testSend() { //1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); //2.设置存活时间,单位毫秒 messageProperties.setExpiration("10000"); //3.创建消息对象 Message message = new Message(("send message……").getBytes(), messageProperties); //4.发送消息 rabbitTemplate.convertAndSend("boot_topic_exchange","message",message); }
注意:
- 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
- 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。
6.5 优先级队列
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
优先级队列用法如下:
1、设置队列的优先级
// 创建队列 @Bean(QUEUE_NAME) public Queue getMessageQueue() { return QueueBuilder .durable(QUEUE_NAME)// 队列名 // .ttl(10000) //队列中消息存活时间10s单位毫秒 .maxPriority(10) //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10 .build(); }
2、编写生产者发送有优先级的消息
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSend() { for (int i = 0; i < 10; i++) { if (i == 5) { // i为5时消息的优先级较高 //1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); //2.设置消息优先级 messageProperties.setPriority(9); //3.创建消息对象 Message message = new Message(("send message……" + i).getBytes(), messageProperties); rabbitTemplate.convertAndSend("boot_topic_exchange","message",message); }else { rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i); } } } }
3、编写消费者测试是否是第五条消息最先被消费
package com.zj.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class Consumer { // 监听队列 @RabbitListener(queues = "boot_queue") public void listen_message(Message message, Channel channel) throws IOException { //1.获取消息 System.out.println(new String(message.getBody())); //2.手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } }
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.7.0) 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ) 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :No active profile set, falling back to 1 default profile: "default" 17:47:15.482 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Attempting to connect to: [192.168.66.100:5672] 17:47:15.498 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985] 17:47:15.529 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.893 seconds (JVM running for 1.338) send message……5 send message……0 send message……1 send message……2 send message……3 send message……4 send message……6 send message……7 send message……8 send message……9
第五条消息首先被消费。
七、RabbitMQ死信队列
7.1 概念
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
- 队列消息长度到达限制。
- 消费者拒签消息,并且不把消息重新放入原队列。
- 消息到达存活时间未被消费。
7.2 代码实现
1、创建死信交换机和死信队列
package com.zj.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { //死信交换机和死信队列 private final String DEAD_EXCHANGE = "dead_exchange"; private final String DEAD_QUEUE = "dead_queue"; //普通交换机和普通队列 private final String NORMAL_EXCHANGE = "normal_exchange"; private final String NORMAL_QUEUE = "normal_queue"; // 创建死信交换机 @Bean(DEAD_EXCHANGE) public Exchange deadExchange(){ return ExchangeBuilder .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称 .durable(false) //是否持久化 .build(); } // 创建死信队列 @Bean(DEAD_QUEUE) public Queue deadQueue(){ return QueueBuilder .durable(DEAD_QUEUE) //死信队列名称 .build(); } // 死信交换机绑定死信队列 @Bean public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE) Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("dead") //交换机路由键 .noargs(); } //创建普通交换机 @Bean(NORMAL_EXCHANGE) public Exchange normalExchange(){ return ExchangeBuilder .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称 .durable(false) //是否持久化 .build(); } //创建普通队列 @Bean(NORMAL_QUEUE) public Queue normalQueue(){ return QueueBuilder .durable(NORMAL_QUEUE) //普通信队列名称 .deadLetterExchange(DEAD_EXCHANGE) //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。 .deadLetterRoutingKey("dead") //死信队列路由关键字 .ttl(10000) //消息存活时间 .maxLength(10) //消息最大长度 .build(); } //普通交换机绑定普通对列 @Bean public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange, @Qualifier(NORMAL_QUEUE) Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("normal") .noargs(); } }
2.创建生产者发送消息(测试存活时间过期变成死信)
@Test public void testSend() { //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信 //1.存活时间过期 rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息"); }
十秒后
消息全部去了死信队列。
2.创建生产者(超过队列长度变成死信)
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSend() { //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信 //1.存活时间过期 // rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息"); //2.超过队列长度变成死信 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息"); } } }
因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。
2.创建生产者和消费者(超过队列长度变成死信)
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testSend() { //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信 //1.生产者拒签消息,消息变成死信。 rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息"); } } @Component public class Consumer { // 监听队列 @RabbitListener(queues = "normal_queue") public void listen_message(Message message, Channel channel) throws IOException { //拒签消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false); } }
拒签消息,消息变成了死信。