在 SpringCloud 技术栈的学习中,我们掌握了微服务的基础通信与协作模式,但微服务架构的深度优化离不开消息中间件的支撑。作为解决服务同步调用痛点的核心组件,消息中间件(MQ)通过异步通信模式,在服务解耦、性能优化、流量削峰等场景中发挥着不可替代的作用。本章将从核心概念入手,结合 RabbitMQ 与 SpringAMQP 的实战案例,带你全面掌握 MQ 的应用技巧。
一、初识 MQ:微服务异步通信的核心
1.1 同步 vs 异步:两种通信模式的博弈
微服务间的通信主要分为同步和异步两类,二者各有适配场景:
同步通讯:类似打电话,需实时响应。如之前学习的 Feign 调用,能立即获取结果,但存在耦合度高、性能吞吐受限、级联失败风险等问题。
异步通讯:类似发邮件,无需即时回复。通过 "发布者 - Broker - 订阅者" 模式,发布者发送事件后无需等待,订阅者按需处理,从根源上解决同步调用的痛点。
异步通讯的核心优势的:
吞吐量提升:无需等待订阅者处理,响应更快速
故障隔离:服务无直接依赖,避免级联失败
资源高效:无阻塞等待,减少无效资源占用
低耦合:服务可灵活插拔、替换
流量削峰:Broker 缓冲波动流量,订阅者按能力处理
其唯一不足是架构复杂度增加,需依赖 Broker 的可靠性、安全性和性能 —— 而成熟的 MQ 产品已完美解决这些问题。
1.2 什么是 MQ?主流产品对比
MQ(Message Queue)即消息队列,是异步通信中的 "Broker",负责存储和转发消息。主流开源 MQ 产品各有特性,选型需结合业务需求:
特性 RabbitMQ ActiveMQ RocketMQ Kafka
公司 / 社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP、XMPP 等 OpenWire、STOMP 等 自定义协议 自定义协议
可用性 高 一般 高 高
单机吞吐量 一般 差 高 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 高 一般 高 一般
选型建议:
追求可用性:Kafka、RocketMQ、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求低延迟:RabbitMQ、Kafka
本章以 RabbitMQ 为实战载体,其成熟的生态和 Spring 的良好适配,是微服务场景的优选方案。
二、RabbitMQ 快速入门
2.1 环境搭建:Docker 安装 RabbitMQ
推荐使用 Docker 快速部署 RabbitMQ(带管理界面):
拉取镜像:docker pull rabbitmq:3.8-management(在线)或docker load -i mq.tar(离线)
启动容器:
bash
运行
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \ # 管理界面端口
-p 5672:5672 \ # 消息通信端口
-d \
rabbitmq:3.8-management
2.2 核心角色与消息模型
RabbitMQ 的核心角色包括:
Publisher:消息生产者(发送消息)
Consumer:消息消费者(处理消息)
Exchange:交换机,负责消息路由(不存储消息)
Queue:队列,存储消息
VirtualHost:虚拟主机,隔离不同租户的资源
RabbitMQ 提供 5 种消息模型,涵盖从简单队列到复杂订阅的各类场景,本章将重点讲解最常用的 4 种。
2.3 入门案例:简单队列模式
简单队列是最基础的模型,仅包含生产者、队列、消费者三个角色,消息直接从生产者发送到队列,消费者订阅队列获取消息。
2.3.1 生产者实现(原生 API)
java
运行
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂并配置参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.150.101"); // 虚拟机IP
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 2.建立连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.声明队列(不存在则创建)
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭资源
channel.close();
connection.close();
}
}
2.3.2 消费者实现(原生 API)
java
运行
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.配置连接参数(同生产者)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 2.建立连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.声明队列(与生产者保持一致)
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息并处理
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息...");
}
}
2.3.3 核心流程总结
发送流程:建立连接 → 创建通道 → 声明队列 → 发送消息 → 关闭资源
接收流程:建立连接 → 创建通道 → 声明队列 → 定义消费逻辑 → 绑定消费者与队列
三、SpringAMQP:RabbitMQ 的 Spring 优雅封装
SpringAMQP 是 Spring 对 RabbitMQ 的封装实现,支持自动装配、注解驱动,大幅简化开发。其核心功能包括:自动声明队列 / 交换机 / 绑定关系、注解式监听器、RabbitTemplate 工具类。
3.1 环境准备
在父工程mq-demo中引入依赖:
xml
org.springframework.boot
spring-boot-starter-amqp
在生产者和消费者服务的application.yml中配置 MQ 连接:
yaml
spring:
rabbitmq:
host: 192.168.150.101 # 虚拟机IP
port: 5672
virtual-host: /
username: itcast
password: 123321
3.2 简单队列(Basic Queue)实战
3.2.1 生产者发送消息
使用RabbitTemplate简化消息发送:
java
运行
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
// 发送消息(队列名,消息内容)
rabbitTemplate.convertAndSend(queueName, message);
}
}
3.2.2 消费者接收消息
通过@RabbitListener注解声明消费者:
java
运行
@Component
public class SpringRabbitListener {
// 监听指定队列
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
System.out.println("Spring消费者接收到消息:【" + msg + "】");
}
}
3.3 工作队列(Work Queue):解决消息堆积
当消息生产速度大于消费速度时,会出现消息堆积。工作队列模式通过多个消费者共享一个队列,实现消息分流处理。
3.3.1 生产者:模拟消息堆积
java
运行
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message_";
// 循环发送50条消息
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
3.3.2 消费者:多实例消费
java
运行
// 消费者1:快速处理(睡眠20ms)
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
// 消费者2:慢速处理(睡眠200ms)
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
3.3.3 优化:能者多劳
默认情况下消息平均分配,导致慢消费者堆积。通过配置prefetch参数限制消费者预取数量:
yaml
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次仅获取1条消息,处理完成后再获取下一条
3.4 发布 / 订阅模式:交换机的核心作用
发布 / 订阅模式通过交换机(Exchange)实现消息路由,生产者将消息发送到交换机,交换机根据类型将消息转发到绑定的队列。交换机不存储消息,路由失败则消息丢失。
3.4.1 广播模式(Fanout Exchange)
Fanout 交换机将消息转发给所有绑定的队列,实现 "一次发送,多端接收"。
声明交换机和队列(消费者端):
java
运行
@Configuration
public class FanoutConfig {
// 声明Fanout交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
// 声明队列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 绑定队列1与交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 声明队列2并绑定
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
生产者发送消息:
java
运行
@Test
public void testFanoutExchange() {
String exchangeName = "itcast.fanout";
String message = "hello, everyone!";
// 发送到交换机(路由键为空)
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消费者接收消息:
java
运行
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
3.4.2 定向模式(Direct Exchange)
Direct 交换机根据路由键(RoutingKey)匹配队列,仅将消息转发给路由键完全一致的队列。
通过注解快速声明交换机、队列和绑定关系:
java
运行
// 队列1绑定red、blue路由键
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
// 队列2绑定red、yellow路由键
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
生产者发送指定路由键的消息:
java
运行
@Test
public void testSendDirectExchange() {
String exchangeName = "itcast.direct";
String message = "红色警报!海洋生物变异,惊现哥斯拉!";
// 发送消息(指定路由键red)
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
3.4.3 通配符模式(Topic Exchange)
Topic 交换机支持路由键通配符,适配更灵活的路由场景。路由键由多个单词组成,以.分隔:
:匹配 0 个或多个单词(如china.#可匹配china.news、china.weather.today)
:匹配恰好 1 个单词(如item.可匹配item.insert,不可匹配item.insert.user)
实战案例:
java
运行
// 监听china.#路由键
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg) {
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
// 监听#.news路由键
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
生产者发送消息:
java
运行
@Test
public void testSendTopicExchange() {
String exchangeName = "itcast.topic";
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 路由键为china.news,将被两个队列同时接收
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
3.5 消息转换器:优化序列化方式
Spring 默认使用 JDK 序列化,存在数据体积大、可读性差、安全隐患等问题。推荐使用 JSON 序列化替代。
3.5.1 配置 JSON 转换器
引入依赖(生产者和消费者均需):
xml
com.fasterxml.jackson.dataformat
jackson-dataformat-xml
2.9.10
在启动类中注册转换器 Bean:
java
运行
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
3.5.2 发送和接收对象消息
java
运行
// 生产者:发送Map对象
@Test
public void testSendMap() {
Map msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送到object.queue队列
rabbitTemplate.convertAndSend("object.queue", msg);
}
// 消费者:接收Map对象
@RabbitListener(queues = "object.queue")
public void listenObjectQueueMessage(Map obj) throws InterruptedException {
System.err.println("消费者接收到对象消息:【" + obj + "】" + LocalDateTime.now());
Thread.sleep(200);
}
四、核心总结与拓展
4.1 关键知识点回顾
消息模型:简单队列(一对一)、工作队列(一对多)、发布 / 订阅(多对多)
交换机类型:Fanout(广播)、Direct(定向)、Topic(通配符)
核心优化:prefetch 参数实现 "能者多劳",JSON 转换器优化序列化
SpringAMQP 核心 API:RabbitTemplate(发送)、@RabbitListener(接收)、@QueueBinding(声明资源)
4.2 拓展阅读
RocketMQ 实战总结:https://kdocs.cn/l/cqlkfURBJ85w
RabbitMQ 官方文档:https://www.rabbitmq.com/documentation.html
SpringAMQP 官方指南:https://spring.io/projects/spring-amqp