RabbitMQ发送方确认机制

简介: RabbitMQ发送方确认机制

1、前言

RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:

  1. 消息发送到交换机的过程。
  2. 消息从交换机发送到队列的过程。

而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:

  • publisher-confirm:消息到达交换机时会触发。
  • publisher-return:到达交换机但是没有路由到队列,会返回ack以及失败原因。

2、publisher-confirm

在SpringBoot项目的properties文件中加上

spring.rabbitmq.publisher-confirm-type=correlated

该配置有三个值:

  1. none:是禁用发布确认模式,是默认值
  2. correlated:是发布消息成功到交换器后会触发回调方法
  3. simple:有两种效果,第一种和correlated值一样会触发回调方法;第二种在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

RabbitMQ的配置类实现ConfirmCallback

/*** @author LoneWalker* @date 2023/4/8* @description*/@Slf4j@ConfigurationpublicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback {
@BeanpublicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) {
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
//设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this);
returnrabbitTemplate;
    }
@BeanpublicMessageConverterjackson2JsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
@BeanpublicDirectExchangegetExchange(){
returnnewDirectExchange("directExchange",false,false);
    }
@BeanpublicQueuegetQueue(){
returnnewQueue("publisher.addUser",true,false,false);
    }
@BeanpublicBindinggetBinding(DirectExchangeexchange,Queuequeue){
returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }
/*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
if (ack) {
log.info("交换机收到消息成功:"+correlationData.getId());
        }else {
log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause);
        }
    }
}


而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:

@RequiredArgsConstructor@ServicepublicclassPublisherServiceImplimplementsPublisherService{
privatefinalRabbitTemplaterabbitTemplate;
@OverridepublicvoidaddUser(Useruser) {
CorrelationDatacorrelationData=newCorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData);
    }
}

然后发送消息:

再模拟一下失败的情况——把交换机名称改成错的:

温馨提示:测试完把交换机名称改回去。

3、publisher-return

在SpringBoot项目的properties文件中添加:

spring.rabbitmq.publisher-returns=true

###消息在没有被队列接收时是否强行退回还是直接丢弃

spring.rabbitmq.template.mandatory=true

RabbitMQ的配置类再实现ReturnsCallback

@Slf4j@ConfigurationpublicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@BeanpublicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) {
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
//设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
returnrabbitTemplate;
    }
@BeanpublicMessageConverterjackson2JsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
@BeanpublicDirectExchangegetExchange(){
returnnewDirectExchange("directExchange",false,false);
    }
@BeanpublicQueuegetQueue(){
returnnewQueue("publisher.addUser",true,false,false);
    }
@BeanpublicBindinggetBinding(DirectExchangeexchange,Queuequeue){
returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }
/*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
if (ack) {
log.info("交换机收到消息成功:"+correlationData.getId());
        }else {
log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause);
        }
    }
/*** 消息未成功到达队列会触发* @param returnedMessage*/@OverridepublicvoidreturnedMessage(ReturnedMessagereturnedMessage) {
log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}

把路由键改为错误的值:

正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 监控
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
440 1
|
8月前
|
消息中间件 存储 运维
|
8月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
117 0
|
8月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
119 0
|
6月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
90 0
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
83 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
73 0
|
7月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。