服务器的异步通信——RabbitMQ1:https://developer.aliyun.com/article/1521829
工作消息队列(WorkQueue)
下面场景中如果queue中有50条请求消息,但是consumer1只能处理40条,剩余的10条就可以由consumer进行处理,所以说工作消息队列可以提高消息的处理速度,避免队列消息堆积
模拟Workqueue,实现一个队列绑定多个消费者,基本实现思路如下:
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
代码实现:
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中
public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
在consumer服务中定义两个消息监听者,都监听simple.queue队列,设置消费者1每秒处理50条消息,消费者2每秒处理10条消息
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限,确保消费者2取消息时只能取一条,提高效率(“能者多劳”):
spring: rabbitmq: listener: simple: prefetch: 1
运行结果:
发布、订阅(Publish、Subscribe)
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
exchange负责消息路由,而不是存储,路由失败则消息丢失
Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue中,如下:
基本实现思路如下:
- 在consumer中,利用代码声明队列、交换机,将二者进行绑定
- 在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向fanout发送消息
代码实现:
在consumer中,利用代码声明队列、交换机,将二者进行绑定
@Configuration public class FanoutConfig { // itcast.fanout @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } // fanout.queue1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } // 绑定队列1到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } // fanout.queue2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } // 绑定队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】"); }
在publisher中编写测试方法,向fanout发送消息
@Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
运行结果:
Direct Exchange
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为路由模式
- l每一个Queue都与Exchange设置一个BindingKey
- l发布者发送消息时,指定消息的RoutingKey
- lExchange将消息路由到BindingKey与消息RoutingKey一致的队列
基本实现思路如下:
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,向itcast. direct发送消息
代码实现:
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】"); }
在publisher服务发送消息到DirectExchange
@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "itcast.direct"; // 消息 String message = "hello, red!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
运行结果:
Topic Exchange
Topic Exchange与Direct Exchange类似,区别在于Topic Exchange的routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
基本实现思路如下:
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
- 在publisher中编写测试方法,向itcast. topic发送消息
代码实现:
利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】"); }
在publisher中编写测试方法,向itcast. topic发送消息
@Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "itcast.topic"; // 消息 String message = "今天天气不错,我的心情好极了!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.weather", message); }
运行结果:
SpringAMQP-消息转换器
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。
推荐用JSON方式序列化,实现步骤如下:
在父工程中引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
在publisher和consumer服务中声明MessageConverter:
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }