0. 引言
上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。
1. RabbitMQ的安装
这里为了快速部署,我们通过docker来安装,如果需要其他安装方式的可以去rabbitmq官网或者github下载对应系统安装包来安装
1、下载镜像
docker pull rabbitmq
本文书写时,其版本为rabbitmq3.8.9
2、安装镜像
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
3、开启远程管理,否则通过15672无法登陆管理页面
进入到docker容器中执行:
# 查看容器id
docker ps -a
# 进入容器
docker exec -it 容器id /bin/bssh
# 容器内执行
rabbitmq-plugins enable rabbitmq_management
4、访问ip:15672。如果是在虚拟机中安装的,记得开通15672,5672端口
2. RabbitMQ的使用
2.2 springboot整合RabbitMQ
1、创建springboot项目
2、在项目中引入amqp
依赖,这里的版本与springboot保持一致
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
2、配置文件中添加rabbitmq的配置
spring:
# rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
3、声明及创建交换机、队列,如果不知道交换机、队列、routingKey概念的,可以先查看上一篇rabbitmq博客
这里我们创建一个队列和直接交换机来示例
(1)创建队列
@Bean
public Queue routingQueueA(){
return new Queue(RabbitConstant.ROUTING_QUEUE_A);
}
(2)创建交换机
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
}
(3)绑定队列和交换机,并且设置routingKey
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
完整代码:
注意以上代码是写在一个配置类中的,目的是为了在项目启动时能够加载该类,并且创建对应的Bean,即队列、交换机和绑定关系
@Configuration
public class RabbitMqConfig {
@Bean
public Queue testQueue(){
return new Queue("test.queue");
// 另一种创建队列的方法
// return QueueBuilder.durable("test.queue").build();
}
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
// 另一种创建交换机的方法
// return ExchangeBuilder.directExchange("test.exchange").build();
}
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
}
这里我们只声明了一个直接交换机,单个队列,rabbitmq的其他消息模型和交换机类型大家可以到上一篇中查看,这里不再累叙
4、生产者发送消息
(1)创建消息对象,这是我们要发送到消息队列中的自定义的消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MyMessage implements Serializable {
private Long id;
private String title;
private String body;
private Date createDate;
}
(1)创建发送方法
public class QueueController {
private final RabbitTemplate rabbitTemplate;
@GetMapping("sendTestQueue")
public String sendTestQueue(){
MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date());
rabbitTemplate.convertAndSend("test.exchange","test.routing.key", message);
return "发送成功";
}
}
5、创建消费者
@Component
public class QueueListener {
@RabbitListener(queues = "test.queue")
public void handler(MyMessage messageInfo, Message message, Channel channel) {
System.out.println("接收的消息:"+messageInfo);
}
}
6、测试调用我们的消息发送方法
7、可以看到结果中显示了我们刚刚发送的消息
2.3 RabbitMQ保证消息可靠性
我们在上一章没有讲解的Publisher Confirms模式就是用来保证消息可靠性的。下面我们来看看实现消息可靠性的具体代码,也就是实现Publisher Confirms模式。
2.3.1 哪些环节会导致消息丢失
首先我们要明白消息可靠性也就是保证消息不丢失,那么就要先理解消息会在哪些环节丢失,我们通过一张图来表述可能会导致消息丢失的环节
(1)生产者到交换机的过程中,如果生产者将消息发送出去了,但是还没送达之前,rabbitmq宕机了,或者因为网络原因消息在传输过程中丢失了,但生产者又不知道交换机没有收到,就会导致消息的丢失
(2)因为rabbitmq是基于内存运行的,当rabbitmq宕机或者重启,内存被初始化,就会导致消息丢失。
(3)交换机到队列的过程中,消息还没到达队列时,rabbitmq宕机了,就会导致消息丢失
(4)同2
(5)队列发送消息到消费者的过程中,当队列把消息发送出去了,在发送途中,因为网络波动或者消息者宕机导致消费者没有收到消息,但是队列并不知道消费者没有收到消息,就会导致消息丢失
(6)消费者接收到消息之后,还没有来得及处理消息,消费者就宕机了,也会导致消息丢失。
下面我们来针对这六个环节来谈谈如何保证消息不丢失
2.3.2 保证消息一定发送到交换机
利用消息队列的confirm机制可以保证消息发送到交换机的可靠性
- 思路:
所谓confirm机制就是:交换机收到消息后会发送一个ack回执给生产者,接收成功ack=true,接收失败ack=false。那么我们就可以通过设置一个回调函数来监听这个ack,如果接收失败就叫消息重发或者存到数据库中后续补发,如果没有收到ack就说明消息在传输中丢失了,那么也进行补发
在开始讲解代码实现之前,要向大家普通,rabbtimq中confirm机制提供了三种类型:
SIMPLE:,发送的消息到达生产者后会触发waitForConfirms回调方法,为同步方式
CORRELATED:发送的消息到达生产者后会触发回调方法,为异步方式,相比较于SIMPLE效率更高
NONE:禁用发布确认类型,是默认值
- 代码实现:
(1)首先将confirm机制设置为correlated
可以通过两种方式设置:一是在配置文件中设置,二是通过setPublisherConfirmType方法设置
spring:
rabbitmq:
publisher-confirm-type: correlated
(2)实现confirmCallback方法。这里的重试机制可以设置为重新发送,或者将消息存放到数据库后续再发送
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause)
if(!ack){
// 发送失败进行重试机制
}
});
return rabbitTemplate;
}
confirmCallback方法中的三个参数分别为
correlationData:相关数据
ack:消息是否到达交换机,true是,false否
cause:发送失败原因
2.3.3 实现消息持久化
我们上述已经说明,消息存在exchange和queue时可能也会导致消息丢失,那么我们如何保证消息不丢失呢?
这里想象一下redis也是基于内存的,它怎么防止数据丢失呢?
那就是做持久化,所谓持久化,就是把数据保存到磁盘,rabbitmq中怎么实现呢,rabbitmq接收到消息后先存储到内存,然后再存储到磁盘,只有当磁盘保存完毕后,才发送回执给生产者,这样即使rabbitmq宕机了也不会导致消息丢失
- Exchange,Queue持久化
存储消息的地方有Exchange和Queue,那么我们就要在这两个地方实现持久化
我们查看之前创建交换机和队列的源码,其实会发现,里面有一个durable属性,就是用来声明是否持久化的,我们创建时如果不声明就默认为true了。
但是需要注意的是,这里的持久化只是用来控制交换机和队列是否持久化的。当durable=false时,只要rabbitmq重启,当没有消费者监听该交换机或者队列时,该交换机或队列就会被删除。常常用在临时队列中。durable=true时,交换机和队列就会被保存至磁盘,重启后会从磁盘读取到内存。
- 消息持久化
消息的载体是交换机和队列我们要先实现他们的持久化,然后再实现消息本身的持久化
原生的做法,是通过设置BasicProperties的deliveryMode为2来声明其消息实现持久化,如下所示:
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)
.build();
但我们现在的演示都是整合了springboot的,我们来看看其发送消息的方法的源码
(1)我们的消息发送是通过RabbitTemplate.convertAndSend
方法实现的。该方法中调用了this.convertMessageIfNecessary(object)
方法将消息进行了转换
(2)我们打开convertMessageIfNecessary
方法,方法中新建了一个MessageProperties
对象。咱们上述原生的声明不就是通过一个Properties对象来实现的吗,于是我们点击进该对象查看源码
(3)会发现该对象的无参构造方法中,声明了一个deliveryMode
属性,其值为常量DEFAULT_DELIVERY_MODE
(4)继续追踪该常量的值,会发现其定义就是一个枚举类MessageDeliveryMode.PERSISTENT
,其命名为持久化,通过其名称我们已经能够联想到什么了。
(5)为求真相,打开该枚举类,于是乎我们终于找到了我们想要的东西,其值就是2,与原生的设置异曲同工,这说明,amqp中默认就将消息设置为持久化的了。
所以呢,也不需要我们配置了,可能有同学会疑惑,都不用配置的,你讲他干嘛,这不浪费时间吗?
学习,讲究知其然,知其所以然。如果你抱有不求甚解的态度去学习,那么你能学到的永远是皮毛,经不起考究。
2.3.4 保证消息一定路由到队列
我们上述所说的confirm机制,只能保证消息发送到Exchange,并不能保证Exchange一定能将消息路由到Queue
我们就需要Return机制来保证消息能够路由到队列
- 思路:
Return机制,就是当消息进入从交换机转发到消息队列,但消息队列未收到时调用回调函数,可以在回调函数中通过实现我们的重试机制来实现消息不丢失。这里需要注意的是,return机制提供了两种模式,通过Mandatory属性来设置
(1)Mandatory=true,消息通过交换机无法匹配到队列时会返回给生产者,并触发returnCallback
(2)Mandatory=false,消息通过交换机无法匹配到队列时会直接丢弃消息,默认配置 - 代码:
(1)开启return机制
方式一:setPublisherReturns
connectionFactory.setPublisherReturns(true);
方式二:setMandatory,Mandatory为true时会自动开启return机制
rabbitTemplate.setMandatory(true);
(3)声明returnCallback方法:重试机制中可以重新发送消息,或者存储数据库后续重发
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// connectionFactory.setPublisherReturns(true);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
// 路由失败,进行重试机制
});
return rabbitTemplate;
}
returnCallback方法参数
message: 消息
replyCode:回应码
replyText:回应信息
exchange:交换机
routingKey:路由键
2.3.5 保证消息一定被消费者消费
想要保证消息一定被消费者消费,我们可以通过手动ACK的形式,我们上述讲到了消息接收后会发送一个ACK回执,通过该回执来确定消息是否达到
- 思路:
同理,针对发送给消费者,我们也可以通过手动ACK的形式,所谓手动ACK,就是消费者自己确定什么时候发送这个ACK回执过去,于是可以等到消息消费完毕后再发送这个回执回去,这样就能确保消息已经被消费,消息队列收到ACK后才将消息删除,如果这中间被中断,那么就不会有ACK回执,那消息队列中的消息就不会被删除
- 代码:
(1)开启手动ack
spring:
rabbitmq:
# 手动提交消息
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
acknowledge-mode提供了三种模式:
NONE:自动模式,默认配置,只要有消费者接受到消息,无论消费成功都认为消费成功
MANUAL:手动模式,消费者自己控制什么时候返回ACK
AUTO:自动模式,但会根据报错来决定是否删除队列中的消息,具体规则如下如果成功消费,没有抛出异常,则自动确认,删除队列中的消息 如果抛出AmqpRejectAndDontRequeueException异常,拒绝确认,不删除队列中的消息 如果抛出ImmediateAcknowledgeAmqpException异常,自动确认,删除队列汇总消息 如果抛出其他异常,则拒绝确认,不会删除队列中的消息
(2)消费者发送ack
@RabbitListener(queues = "test.queue")
public void handler(MyMessage messageInfo, Message message, Channel channel) {
try{
System.out.println("接收的消息:"+messageInfo.toString());
// 返回ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (IOException e){
try {
channel.basicRecover();
} catch (IOException ex) {
ex.printStackTrace();
log.error("消息处理失败:{}",e.getMessage());
}
}
}
配置文件中完整代码
@Configuration
@Slf4j
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause);
if(!ack){
// 发送失败进行重试机制
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
// 路由失败,进行重试机制
});
return rabbitTemplate;
}
@Bean
public Queue testQueue(){
return new Queue("test.queue");
}
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
}
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
}