RabbitMq确认机制&SpringBoot整合RabbitMQ

简介: RabbitMq确认机制&SpringBoot整合RabbitMQ

正文


一、RabitMQ如何确认消息不丢失


1、从生产者角度来考虑


产生原因:我们的生产者发送消息之后可能由于网络故障等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。


解决方法:为了解决这个问题,RabbitMQ引入了事务机制和发送方确认机制(confirm)。事务机制开启之后,相当于同步执行,必然会降低系统的性能,因此一般我们不采用这种方式。确实机制,是当mq收到生产者发送的消息时,会返回一个ack告知生产者,收到了这条消息,如果没有收到,那就采取重试机制后者其他方式补偿。


事务方式


package com.xiaojie.rabbitmq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @Description: mq事务模式保证消息可靠性
 * @author: xiaojie
 * @date: 2021.09.28
 */
public class TxProvider {
    //定义队列
    private static final String QUEUE_NAME = "myqueue";
    static Channel channel = null;
    static Connection connection = null;
    public static void main(String[] args) {
        try {
            System.out.println("生产者启动成功..");
            // 1.创建连接
            connection = MyConnection.getConnection();
            // 2.创建通道
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "测试事务机制保证消息发送可靠性。。。。";
            channel.txSelect(); //开启事务
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            //发生异常时,mq中并没有新的消息入队列
            //int i=1/0;
            //没有发生异常,提交事务
            channel.txCommit();
            System.out.println("生产者发送消息成功:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
            //发生异常则回滚事务
            try {
                if (channel != null) {
                    channel.txRollback();
                }
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}


消费者


package com.xiaojie.rabbitmq.tx;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @Description: 事务模拟消费者
 * 如果不改为手动应达模式,那么事务开启对消费者没有影响
 * @author: xiaojie
 * @date: 2021.09.28
 */
public class Txconsumer {
    private static final String QUEUE_NAME = "myqueue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = MyConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        channel.txSelect();//开启事务
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                //设置手动应答
                channel.basicAck(envelope.getDeliveryTag(), true);
                System.out.println("消费消息msg:" + msg+"手动应答:"+envelope.getDeliveryTag());
                channel.txCommit(); //消费者开启事务,必须要提交事务之后,消息才会从队列中移除,否则不移除。
            }
        };
        // 3.创建我们的监听的消息 false 关闭自动确认模式
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}


Confirm模式


package com.xiaojie.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: confirm模式保证消息可靠性
 * @date 2021/9/28
 */
public class Provider {
    private static final String QUEUE_NAME = "myqueue";
    static Connection connection = null;
    static Channel channel = null;
    static String msg = "confirm模式保证消息可靠性。。。。";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        try {
            //创建连接
            connection = MyConnection.getConnection();
            //创建通道
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.confirmSelect();//开启确认模式,确认消息已经被mq持久化到硬盘上
            channel.basicQos(1);//每次发送一条消息,并且收到消费者的ack之后才会发送第二条
            //如果异常进行重试,超过重试次数放弃执行
//            int i=1/0;
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            if (channel.waitForConfirms()) {
                //确认消息已经持久化到硬盘
                System.out.println("消息发送成功。。。。。。");
            }
        } catch (Exception e) {
            for (int i = 0; i < 3; i++) {
                //发送消息
                System.out.println("重试次数" + (i + 1));
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                if (i >= 3) {
                    //实际生产时候,可以放到表中记录失败的消息,然后采取补偿措施
                    break;
                }
            }
        } finally {
            //关闭通道,关闭连接
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}


2、从mq角度考虑


产生原因:消息到达mq之后,mq宕机了,然后消息又没有进行持久化,这时消息就会丢失。如果队列满了,或者mq拒绝接受消息时都会导致消息丢失(死信队列)。


解决方法:开启mq的持久化机制,消息队列,交换机、消息都要开启持久化。当然也存在特殊情况,消息在还没有持久化到硬盘的时候宕机了,这种小概率事件的发生。也可以采用集群模式,尽可能保证消息的可靠性。(假如机房炸了,这种情况实在也是没有办法)。


3、从消费者角度考虑


产生原因:


在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;

在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;

消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。


解决方法:开启手动应答模式,只要mq没有收到消费者已经消费掉消息的ack,那消息就不会从队列中移除。后面整合Springboot时,在代码中标明。


二、RabbitMQ持久化机制


手动开启持久化


555.jpg


如图,在手动创建队列,交换机时,开启持久化模式,默认情况下是持久化方式的。


2、代码方式


//队列持久化
 channel.queueDeclare(EMAIL_QUEUE_FANOUT, true, false, false, null);
 //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型,3,true标识持久化
 channel.exchangeDeclare(EXCHANGE, "direct",true);


Springboot模式下默认绑定是持久化绑定(springboot2.4.2版本)


三、SpringBoot整合RabbitMQ


配置文件


package com.xiaojie.springboot.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 配置rabbitmq
 * @date 2021/9/25 22:16
 */
@Component
public class RabbitMqConfig {
    //定义队列
    private static final String MY_FANOUT_QUEUE = "xiaojie_fanout_queue";
    //定义队列
    private static final String MY_DIRECT_QUEUE = "xiaojie_direct_queue";
    //定义队列
    private static final String MY_TOPIC_QUEUE = "xiaojie_topic_queue";
    //定义fanout交换机
    private static final String MY_FANOUT_EXCHANGE = "xiaojie_fanout_exchange";
    //定义direct交换机
    private static final String MY_DIRECT_EXCHANGE = "xiaojie_direct_exchange";
    //定义topics交换机
    private static final String MY_TOPICS_EXCHANGE = "xiaojie_topics_exchange";
    //创建队列 默认开启持久化
    @Bean
    public Queue fanoutQueue() {
        return new Queue(MY_FANOUT_QUEUE);
    }
    //创建队列
    @Bean
    public Queue directQueue() {
        return new Queue(MY_DIRECT_QUEUE);
    }
    //创建队列
    @Bean
    public Queue topicQueue() {
        return new Queue(MY_TOPIC_QUEUE);
    }
    //创建fanout交换机 默认开启持久化
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(MY_FANOUT_EXCHANGE);
    }
    //创建direct交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(MY_DIRECT_EXCHANGE);
    }
    //创建direct交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MY_TOPICS_EXCHANGE);
    }
    //绑定fanout交换机
    @Bean
    public Binding fanoutBindingExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
    //绑定direct交换机
    @Bean
    public Binding directBindingExchange(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("msg.send");
    }
    //绑定topic交换机
    @Bean
    public Binding topicBindingExchange(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("msg.#");
    }
    //多个队列绑定到同一个交换机
//    @Bean
//    public Binding topicBindingExchange1(Queue directQueue, TopicExchange topicExchange) {
//        return BindingBuilder.bind(directQueue).to(topicExchange).with("msg.send");
//    }
}


生产者回调


package com.xiaojie.springboot.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * @Description: 生产者发送消息之后,接受服务器回调
 * @author: xiaojie
 * @date: 2021.09.29
 */
@Component
@Slf4j
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("correlation>>>>>>>{},ack>>>>>>>>>{},cause>>>>>>>>{}", correlationData, ack, cause);
        if (ack) {
            //确认收到消息
        } else {
            //收到消息失败,可以开启重试机制,或者将失败的存起来,进行补偿
        }
    }
    /*
     *
     * @param returnedMessage
     * 消息是否从Exchange路由到Queue, 只有消息从Exchange路由到Queue失败才会回调这个方法
     * @author xiaojie
     * @date 2021/9/29 13:53
     * @return void
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("被退回信息是》》》》》》{}", returnedMessage.getMessage());
        log.info("replyCode》》》》》》{}", returnedMessage.getReplyCode());
        log.info("replyText》》》》》》{}", returnedMessage.getReplyText());
        log.info("exchange》》》》》》{}", returnedMessage.getExchange());
        log.info("routingKey>>>>>>>{}", returnedMessage.getRoutingKey());
    }
}


生产者


package com.xiaojie.springboot.provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description:消息生产者 publisher-confirm-type: correlated
 * #NONE值是禁用发布确认模式,是默认值
 * #CORRELATED值是发布消息成功到交换器后会触发回调方法
 * #SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,
 * 其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,
 * 根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,将无法发送消息到broker
 * @date 2021/9/25 22:47
 */
@Component
@Slf4j
public class MsgProvider {
    //定义队列
    private static final String MY_FANOUT_QUEUE = "xiaojie_fanout_queue";
    private static final String MY_DIRECT_QUEUE = "xiaojie_direct_queue";
    private static final String DIRECT_ROUTING_KEY = "msg.send";
    private static final String MY_TOPIC_QUEUE = "xiaojie_topic_queue";
    private static final String TOPIC_ROUTING_KEY = "msg.send";
    //定义fanout交换机
    private static final String MY_FANOUT_EXCHANGE = "xiaojie_fanout_exchange";
    //定义direct交换机
    private static final String MY_DIRECT_EXCHANGE = "xiaojie_direct_exchange";
    //定义topics交换机
    private static final String MY_TOPICS_EXCHANGE = "xiaojie_topics_exchange";
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public String sendMSg(Integer type, String msg) {
        try {
            if (1 == type) {
                //发送到fanout
                rabbitTemplate.convertAndSend(MY_FANOUT_QUEUE, "fanout" + msg);
            } else if (2 == type) {
                //第一个参数交换机,第二个参数,路由键,第三个参数,消息
                rabbitTemplate.convertAndSend(MY_DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, "direct" + msg);
            } else if (3 == type) {
                rabbitTemplate.convertAndSend(MY_TOPICS_EXCHANGE, TOPIC_ROUTING_KEY, "topic" + msg);
            }
            return "success";
        } catch (AmqpException e) {
            e.printStackTrace();
        }
        return "error";
    }
}


消费者Fanout


package com.xiaojie.springboot.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * @author xiaojie
 * @version 1.0
 * @description: fanout消费者
 * @date 2021/9/25 22:57
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("xiaojie_fanout_queue"),
        exchange = @Exchange(value = "xiaojie_fanout_exchange", type = ExchangeTypes.FANOUT),
        key = ""))
@Slf4j
public class FanoutMsgConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * @description: 接收到信息
     * @param:
     * @param: msg
     * @return: void
     * @author xiaojie
     * @date: 2021/9/25 23:02
     */
    @RabbitHandler
    public void handlerMsg(@Payload String msg, @Headers Map<String, Object> headers,
                           Channel channel) throws IOException {
        log.info("接收到的消息是fanout:{}" + msg);
        //delivery tag可以从消息头里边get出来
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//        int i=1/0; 模拟重试机制,如果重试则代码不能try(有点类似事务),并且自动应答模式下,重试次数结束之后,自动应答消息出队列。
        //手动应答,第二个参数为是否批量处理
        channel.basicAck(deliveryTag, false);
        boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
        //第二个参数为是否批量,第三个参数为是否重新进入队列,如果为true,则重新进入队列
//        channel.basicNack(deliveryTag, false, !redelivered);
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java 网络架构
|
2月前
|
架构师 Java 开发者
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
在40岁老架构师尼恩的读者交流群中,近期多位读者成功获得了知名互联网企业的面试机会,如得物、阿里、滴滴等。然而,面对“Spring Boot自动装配机制”等核心面试题,部分读者因准备不足而未能顺利通过。为此,尼恩团队将系统化梳理和总结这一主题,帮助大家全面提升技术水平,让面试官“爱到不能自已”。
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
4月前
|
Java 数据库 开发者
深入剖析 SpringBoot 的 SPI 机制
【8月更文挑战第10天】在软件开发中,SPI(Service Provider Interface)机制是一种重要的服务发现和加载机制,尤其在构建模块化、可扩展的系统时尤为重要。SpringBoot作为Spring家族的一员,其内置的SPI机制不仅继承了Java SPI的设计思想,还进行了优化和扩展,以适应Spring Boot特有的需求。本文将深入剖析SpringBoot中的SPI机制,揭示其背后的原理与应用。
92 7
|
4月前
|
消息中间件 Java Maven
|
4月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
877 2
|
4月前
|
Java 开发者 Spring
"揭秘SpringBoot魔法SPI机制:一键解锁服务扩展新姿势,让你的应用灵活飞天!"
【8月更文挑战第11天】SPI(Service Provider Interface)是Java的服务提供发现机制,用于运行时动态查找和加载服务实现。SpringBoot在其基础上进行了封装和优化,通过`spring.factories`文件提供更集中的配置方式,便于框架扩展和组件替换。本文通过定义接口`HelloService`及其实现类`HelloServiceImpl`,并在`spring.factories`中配置,结合`SpringFactoriesLoader`加载服务,展示了SpringBoot SPI机制的工作流程和优势。
61 5
|
4月前
|
安全 Java UED
掌握SpringBoot单点登录精髓,单点登录是一种身份认证机制
【8月更文挑战第31天】单点登录(Single Sign-On,简称SSO)是一种身份认证机制,它允许用户只需在多个相互信任的应用系统中登录一次,即可访问所有系统,而无需重复输入用户名和密码。在微服务架构日益盛行的今天,SSO成为提升用户体验和系统安全性的重要手段。本文将详细介绍如何在SpringBoot中实现SSO,并附上示例代码。
85 0
|
4月前
|
消息中间件 Java Kafka
深入SpringBoot的心脏地带:掌握其核心机制的全方位指南
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型,具有高可靠性及稳定性;RocketMQ 则是由阿里巴巴开源的高性能分布式消息队列,支持事务消息等多种特性;而 Kafka 是 LinkedIn 开源的分布式流处理平台,以其高吞吐量和良好的可扩展性著称。文中还提供了使用这三种消息队列产品的示例代码。总之,这三款产品各有优势,适用于不同场景。
17 0
|
4月前
|
消息中间件 Java Kafka
SpringBoot大揭秘:如何轻松掌握其核心机制?
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型,具有高可靠性及稳定性;RocketMQ 则是由阿里巴巴开源的高性能分布式消息队列,支持事务消息等多种特性;而 Kafka 是 LinkedIn 开源的分布式流处理平台,以其高吞吐量和良好的可扩展性著称。文中还提供了使用这三种消息队列产品的示例代码。
16 0