RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关

简介: RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关

1.SpringBoot集成RabbitMQ

1.1 依赖及配置

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  # 用于接收设备发送的数据
  rabbitmq:
    host: xxx.xx.xxx.xxx
    port: 5672
    username: guest
    password: guest
    mq-name: test
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列
    publisher-returns: true

1.2 消息监听与发送

  • 数据获取
@Component
@Slf4j
public class RabbitMessageQueueReceiver {
    @Autowired
    private ConfigProperties configProperties;
    @Autowired
    private AsyncConfig asyncConfig;
    @Autowired
    private DataGsmEquipComparisonManager dataGsmEquipComparisonManager;
    @RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")}, ackMode = "MANUAL")
    @RabbitHandler()
    public void receive(String msg, Channel channel, Message message) throws IOException, InterruptedException {
      // 获取消息体
        String jsonString = new String(message.getBody());
        // 处理数据格式
        Map<String, Object> dataMap = dealMessageData(jsonString);
        try {
            asyncConfig.taskExecutor().execute(() -> {
                // 根据数据类型处理消息【这里大家根据实际情况进行处理】
                DealMessageByType.getInstance().dispose(dataMap);
            });
            channel.basicQos(5);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("error message:" + jsonString);
            try {
                channel.basicQos(5);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    }
  • 数据发送
@Component
@Log4j
public class RabbitMessageQueueSender {
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
      rabbitTemplate.convertAndSend(exchange, routingKey, message);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
  • 确认机制(消息发送到服务回调)
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void run() {
        if (rabbitTemplate != null) {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(this);
        }
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.info("消息发送失败,进行容错处理");
        }
        log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
    }
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
    }
}

2.设置RabbitMQ启动总开关

SpringBoot 项目集成了 RabbitMQ 但是有时候又用不到它,比如说:

  • 开发跟 RabbitMQ 服务无关接口时,此时 MQ 服务如果未启动,会有报错信息不断打印出来。
  • 不同的用户部署时,有可能用不到 RabbitMQ,此时没有部署 MQ,启动项目时不能报错。

核心报错信息:

WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
Caused by: java.net.ConnectException: Connection refused: connect

详细报错信息:

[2023-03-16 11:18:11.456] traceId= [RMI TCP Connection(8)-xxx.xxx.xx.xxx] WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
  at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
  at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:252)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2126)
  at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49)
  at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44)
  at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
  at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
  at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)
  at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)
  at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)
  at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)
  at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)
  at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)
  at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)
  at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)
  at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
  at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)
  at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
  at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)
  at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)
  at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
  at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
  at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
  at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
  at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
  at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
  at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
  at sun.reflect.GeneratedMethodAccessor212.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
  at sun.rmi.transport.Transport$1.run(Transport.java:200)
  at sun.rmi.transport.Transport$1.run(Transport.java:197)
  at java.security.AccessController.doPrivileged(Native Method)
  at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
  at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
  at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
  at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
  at java.security.AccessController.doPrivileged(Native Method)
  at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: connect
  at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
  at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
  at java.net.Socket.connect(Socket.java:606)
  at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
  ... 50 common frames omitted

2.1 总开关配置

添加spring.rabbitmq.enable配置作为总开关:

spring:
  # 用于接收设备发送的数据
  rabbitmq:
    # rabbitmq 的自定义配置 enable 用于开启或关闭 rabbitmq 服务(false关闭,true开启)
    enable: true
    host: 172.81.205.216
    port: 5672
    username: guest
    password: guest
    mq-name: ZRTZ_QUEUE_EFENCE_DEVICE_OBTAIN_STATUS
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列
    publisher-returns: true

2.2 关闭自动配置

@EnableRabbit
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.3 根据开关进行配置

/**
 * 用于管理 RabbitAutoConfiguration 是否配置
 */
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true")
public class RabbitMessageQueueEnableAutoConfig extends RabbitAutoConfiguration {
}

2.4 消息监听与发送

  • 监听开关
@Component
@Slf4j
@Data
public class RabbitMessageQueueReceiverConfig {
    @Value("${spring.rabbitmq.enable}")
    private boolean enable;
    @Bean
    public RabbitMessageQueueReceiver initRabbitMessageQueueReceiver() {
        if (enable) {
            RabbitMessageQueueReceiver rabbitMessageQueueReceiver = new RabbitMessageQueueReceiver();
            log.info("【------已启用------】RabbitMessageQueueReceiver");
            return rabbitMessageQueueReceiver;
        } else {
            log.info("【------不启用------】RabbitMessageQueueReceiver");
            return null;
        }
    }
}
// 监听代码【去掉@Component】
// @Component
@Log4j
public class RabbitMessageQueueSender {
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
      rabbitTemplate.convertAndSend(exchange, routingKey, message);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
  • 消息发送及回调【添加 (required = false) 防止接口被调用出错】
// 消息发送
@Component
@Log4j
public class RabbitMessageQueueSender {
    @Autowired(required = false)
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
            if (rabbitTemplate != null) {
                rabbitTemplate.convertAndSend(exchange, routingKey, message);
            } else {
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
// 发送回调
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void run() {
        if (rabbitTemplate != null) {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(this);
        }
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.info("消息发送失败,进行容错处理");
        }
        log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
    }
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
    }
}

3.总结

  • 关闭自动配置。
  • 根据自定义的标志进行bean对象装配。
  • 防止未装配导致的报错。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
479 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
7月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
387 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
7月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
1130 12
|
10月前
|
机器学习/深度学习 数据采集 存储
朴素贝叶斯处理混合数据类型,基于投票与堆叠集成的系统化方法理论基础与实践应用
本文探讨了朴素贝叶斯算法在处理混合数据类型中的应用,通过投票和堆叠集成方法构建分类框架。实验基于电信客户流失数据集,验证了该方法的有效性。文章详细分析了算法的数学理论基础、条件独立性假设及参数估计方法,并针对二元、类别、多项式和高斯分布特征设计专门化流水线。实验结果表明,集成学习显著提升了分类性能,但也存在特征分类自动化程度低和计算开销大的局限性。作者还探讨了特征工程、深度学习等替代方案,为未来研究提供了方向。(239字)
267 5
朴素贝叶斯处理混合数据类型,基于投票与堆叠集成的系统化方法理论基础与实践应用
|
9月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
2010 5
|
8月前
|
人工智能 运维 负载均衡
F5发布业界首创集成式应用交付与安全平台,开启ADC 3.0新时代
F5发布业界首创集成式应用交付与安全平台,开启ADC 3.0新时代
322 0
|
8月前
|
人工智能 自然语言处理 分布式计算
AI 驱动传统 Java 应用集成的关键技术与实战应用指南
本文探讨了如何将AI技术与传统Java应用集成,助力企业实现数字化转型。内容涵盖DJL、Deeplearning4j等主流AI框架选择,技术融合方案,模型部署策略,以及智能客服、财务审核、设备诊断等实战应用案例,全面解析Java系统如何通过AI实现智能化升级与效率提升。
639 0
|
11月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
338 32
|
10月前
|
缓存 前端开发 定位技术
通义灵码2.5智能体模式实战———集成高德MCP 10分钟生成周边服务地图应用
通义灵码2.5智能体模式结合高德MCP服务,实现快速构建周边服务地图应用。通过自然语言需求输入,智能体自动分解任务并生成完整代码,涵盖前端界面、API集成与数据处理,10分钟内即可完成传统开发需数小时的工作,大幅提升开发效率。
666 0