RabbitMq 理论知识

简介: 归纳整理相关理论知识


提纲要点

基本概念

特点

  • 可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略(路由):这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群:多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议:RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端:RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制:RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件

AMQP协议

RabbitMQ 基于AMQP 是一个高级消息队列传输协议( Advanced Message Queuing Protocol )

  • 一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
  • 一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。

AMQ Model

  • exchange(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
  • message queue(消息队列):存储消息。直到消息被安全的投递给了消费者。
  • binding :定义了 message queue 和 exchange 之间的关系,提供了消息路由的规则。

RabbitMQ中主要组成部分

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

RabbitMQ交换机模型

Direct Exchange

直连交换机:消息和特定的路由键完全匹配

创建消息队列

创建队列绑定交换机

指定路由发送消息

结论:匹配了路由key值发送到队列中

Fanout Exchange  

广播交换机:一 个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息  (发布/订阅模式)

创建广播类型交换机

创建两个队列

绑定路由和key

发送消息

结论:只要绑定到交换机中,就会往指定的交换机中发送消息

Topic Exchange  

主题交换机: 交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 "#"。需要注意的是通配符前面必须要加上"."符号

发送消息代码

@ApiOperation(value = "发送广播消息", notes = "发送广播消息")
    @GetMapping(value = "private/test3")
    public String publishMessage3(
            @RequestParam("name")String name
    ) {
        rabbitTemplate.convertAndSend("MyTest-topic", "my.topic.A",name);
        return "success";
    }

路由KEY匹配my.topic.A和my.topic.* 所以两个队列都能接收到到消息

Headers Exchange

请求头部交换机:它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由

RabbitMq 消息过期时间TTL

RabbitMq 队列中的消息都是由TTL时间,当消息达到一定时间后将会失效,具体时间发送方案:

rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> {
    MessageProperties messageProperties = message.getMessageProperties();
    // 设置这条消息的过期时间
    messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60));
    return message;
});

多租户模式:虚拟主机和隔离

每一个 RabbitMQ 服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个 vhost本质上是一个mini版的 RabbitMq 服务器,拥有自己的队列交换器和绑定·.....更重要的是,它拥有自己的权限机制。这使得你能够安全地使用一个 RabbitMq 服务器来服务众多应用程序

队列

  • 死信队列DLX
  • 延迟队列: 消费者订阅的是 死信队列,没有消费者订阅普通队列的话,当消息过期时间到了,就会被路由到死信队列,这就达成了,消息被延迟消费的目的。  
  • 优先级队列
  1. 新建队列设置最大优先级(最大优先级不要太高,会耗费CPU资源)

  1. 发送消息的时候,消息设置优先级,如果没有设置的情况下默认优先级为1
rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> {
    MessageProperties messageProperties = message.getMessageProperties();
    // 设置这条消息的过期时间
    messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60));
    pro.getMessageProperties().setPriority(5);
    return message;
});

持久化

  • 交换器的持久化在声明交换器是将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的交换器元数据会丢失(没有这个交换器了),但是 队列和消息不会丢失(分情况是否设置持久化),只是 不能将消息发送到这个交换器了。
  • 队列的持久化在声明队列时将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的 元数据会丢失消息也会丢失
  • 消息的持久化但是队列的持久化,并 不能保证消息数据不丢失,要保证消息不丢失,需要将消息的投递模式设置为 2 (BasicProperties 中的 deliveryMode 属性)

消息删除

过期删除策略:通过设置消息TTL时间,消息到时间自动删除

消息确认删除:通过ack消息消费确认的机制,进行消息删除

事务

springboot 中启用事务管理

@Bean("rabbitTransactionManager")
RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
    return new RabbitTransactionManager(connectionFactory);
}

springboot 中使用事务

@RestController
@RequestMapping("/v1")
@Slf4j
@Api(value = "测试类", description = "测试类")
public class TestController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @ApiOperation(value = "发送广播消息", notes = "发送广播消息")
    @GetMapping(value = "private/test")
    @Transactional(transactionManager = "rabbitTransactionManager",rollbackFor = Exception.class)
    public String run(
            @RequestParam("name")String name
    ) {
        rabbitTemplate.convertAndSend("MyTest-fanout", null,name);
        return "success";
    }
}

消息发送和确认机制

生产者消息发送确认机制

消息发送后需要确认消息是否已经正确的入队列,通过消息确认回调(setConfirmCallback)和路由key回调(setReturnsCallback)

package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMqConfig {
    @Bean("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                return;
            }
        });
        /**
         * 在找不过交换机和routingkey还是会触发
         */
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("返回消息{}",returned);
            log.error("ReturnCallback:     " + "消息:" + returned.getMessage());
            log.error("ReturnCallback:     " + "回应码:" + returned.getReplyCode());
        });
        return rabbitTemplate;
    }
}

CorrelationData 可以设置ID进行消息的传递

消费者消息确认机制

自动确认模式:

在消费者处理完消息后,自动确认消息已被消费。这种模式下,如果消费者在处理消息时发生异常,那么消息也会被认为已被确认,会从队列中删除,而无法重新消费。因此,不建议使用此模式。

手动确认模式(无需返回确认结果):

消费者在接收到消息后,不会自动确认消息已被消费,而是需要手动调用 channel.basicAck 方法来确认消息已被消费。此模式下,如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。

手动确认模式(需要返回确认结果):

消费者在接收到消息后,需要手动调用 channel.basicAck 方法来确认消息已被消费,并需要在回调方法中返回确认结果。如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。

一般建议使用手动确认模式,因为这种模式下可以保证消息的可靠性,并且可以防止消息的丢失。

在 Spring Boot 中,acknowledge-mode 有以下几个配置选项:

AUTO: 自动确认模式。

消息消费者收到消息后,自动发送确认消息,无需手动调用确认方法。此模式是默认的确认模式。

MANUAL: 手动确认模式。

消息消费者接收到消息后,不会自动发送确认消息,需要手动调用确认方法来确认消息已经被消费。如果没有手动确认消息,则消息会一直存在于队列中,不会被删除。

NONE: 不确认模式。

消息消费者接收到消息后,不会自动发送确认消息,并且也不会手动确认消息。此模式会造成消息被重复消费,因此一般不建议使用。

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: MANUAL(auto,none)

在监听器方法中,需要手动调用 Channel 对象的 basicAck 方法来确认消息。例如:

@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
    // 处理消息
    // 手动确认消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

通过绑定队列,交换机,key的方式监听消息,下面这种方式会自动创建路由值进行绑定

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic-2", durable = "true"),
            exchange = @Exchange(value = "MyTest-topic",type = "topic"),
            key = "my.topic.B"))
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println("Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

消费者消费模式

公平模式

yml配置

spring:
  rabbitmq:
    host: 10.123.35.161
    port: 5672
    username: admin
    password: admin123
    virtual-host: /vmsdms
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #消费手动确认
        prefetch: 1  #每次从一次性从broker里面取的待消费的消息的个数
        retry:
          enabled: true   # 允许消息消费失败的重试
          max-attempts: 3   # 消息最多消费次数3次
          initial-interval: 2000    # 消息多次消费的间隔2秒

关键信息: prefetch 获取消费的个数设为1,2,3,4;表示每次获取的数量,只要设置了这个数据,会变成公平模式

消费者代码

package com.example.demo.handler;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
 * @author jiangmb
 * @version 1.0.0
 * @date 2023-03-23 16:53
 */
@Component
public class CustomerModeHandler {
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage1(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody);
            TimeUnit.SECONDS.sleep(10);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

消费结果

结论:消费公平模式下,消费性能比较高的机器能够负载更高的并发量

轮询模式

yml配置

spring:
  rabbitmq:
    host: 10.123.35.161
    port: 5672
    username: admin
    password: admin123
    virtual-host: /vmsdms
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #消费手动确认
        # prefetch: 1  #每次从一次性从broker里面取的待消费的消息的个数
        retry:
          enabled: true   # 允许消息消费失败的重试
          max-attempts: 3   # 消息最多消费次数3次
          initial-interval: 2000    # 消息多次消费的间隔2秒

关键信息: prefetch 不设置的数量那么就是轮询模式

消费者代码

package com.example.demo.handler;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
 * @author jiangmb
 * @version 1.0.0
 * @date 2023-03-23 16:53
 */
@Component
public class CustomerModeHandler {
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage1(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody);
            TimeUnit.SECONDS.sleep(10);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

消费结果

附加

springboot rabbitmq 配置和解析

spring:
  rabbitmq:
    host: localhost                     # RabbitMQ服务的主机名或IP地址
    port: 5672                          # RabbitMQ服务的端口号,默认为5672
    username: guest                     # 连接RabbitMQ服务的用户名,默认为guest
    password: guest                     # 连接RabbitMQ服务的密码,默认为guest
    virtual-host: /                     # 虚拟主机名称,默认为/
    connection-timeout: 60000           # 连接超时时间(毫秒),默认为60000
    requested-heartbeat: 60             # 请求心跳时间(秒),默认为60
    publisher-confirms: false           # 是否启用生产者确认,默认为false
    publisher-returns: false            # 是否启用生产者返回,默认为false
    template:
      receive-timeout: 60000             # 接收消息的超时时间(毫秒),默认为60000
      reply-timeout: 5000               # 请求回复的超时时间(毫秒),默认为5000
      retry:
        enabled: false                   # 是否启用重试,默认为false
        initial-interval: 1000           # 初始重试间隔时间(毫秒),默认为1000
        max-interval: 10000              # 最大重试间隔时间(毫秒),默认为10000
        multiplier: 1.0                  # 重试时间乘数,默认为1.0
        max-attempts: 3                  # 最大重试次数,默认为3
        stateless: false                 # 是否启用无状态重试,默认为false
        backoff:
          type: exponential              # 退避类型,默认为exponential
          multiplier: 2.0                # 退避时间乘数,默认为2.0
          max-interval: 30000            # 最大退避时间(毫秒),默认为30000
          initial-interval: 1000         # 初始退避时间(毫秒),默认为1000
          max-attempts: 3                # 最大重试次数,默认为3
      mandatory: false                   # 是否启用强制标志,默认为false
      receive-timeout: 60000             # 接收消息的超时时间(毫秒),默认为60000
      reply-timeout: 5000               # 请求回复的超时时间(毫秒),默认为5000
      exchange:                           # 指定Exchange的名称和类型
        name: myExchange
        type: topic
      routing-key: myRoutingKey          # 指定Routing Key
      default-receive-queue: myQueue      # 默认接收队列的名称
      message-converter: json             # 指定消息转换器
    listener:
      direct:
        auto-startup: true                # 是否自动启动,默认为true
        acknowledge-mode: auto            # 确认模式,默认为auto
        concurrency: 1                    # 消费者线程数,默认为1
        max-concurrency: 10               # 最大消费者线程数,默认为10
        prefetch: 1                        # 每个消费者一次
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 缓存 NoSQL
RabbitMQ 总结面试
RabbitMQ 总结面试
51 0
|
消息中间件 SQL 存储
超详细的RabbitMQ入门,看这篇就够了!
RabbitMQ入门,看这篇就够了
197981 63
|
5月前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
56 0
|
6月前
|
消息中间件 Java Kafka
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
138 0
|
6月前
|
消息中间件 存储 缓存
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(下)
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(下)
|
6月前
|
消息中间件 Java
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(上)
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例
|
消息中间件 存储 监控
RabbitMQ:从入门到实践
1. RabbitMQ简介 RabbitMQ是一款开源的、基于AMQP协议的消息队列系统,用于构建可扩展、高性能、松耦合的分布式系统。RabbitMQ具有以下特点: 支持多种语言和平台:Java、Python、Ruby、.NET等 提供丰富的交换器类型和路由策略:直接、广播、主题和头 支持消息持久化和高可用性:保证消息不丢失,服务可用性 提供管理界面和监控插件:方便管理和监控RabbitMQ服务器 社区活跃,文档丰富:易于学习和使用
4784 2
RabbitMQ:从入门到实践
|
消息中间件 存储 网络协议
RabbitMQ基础概念
RabbitMQ基础概念
74 0
|
消息中间件 存储 缓存
RabbitMQ 实战教程(一)
RabbitMQ 实战教程(一)
217 0
RabbitMQ 实战教程(一)
|
消息中间件 网络协议 Java
SpringCloudStream学习(一)RabbitMQ基础
SpringCloudStream学习(一)RabbitMQ基础
150 0
SpringCloudStream学习(一)RabbitMQ基础