RabbitMq 理论知识

简介: 归纳整理相关理论知识


提纲要点

基本概念

特点

  • 可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略(路由):这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群:多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议:RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端:RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制:RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件

AMQP协议

RabbitMQ 基于AMQP 是一个高级消息队列传输协议( Advanced Message Queuing Protocol )

  • 一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
  • 一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。

AMQ Model

  • exchange(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
  • message queue(消息队列):存储消息。直到消息被安全的投递给了消费者。
  • binding :定义了 message queue 和 exchange 之间的关系,提供了消息路由的规则。

RabbitMQ中主要组成部分

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

RabbitMQ交换机模型

Direct Exchange

直连交换机:消息和特定的路由键完全匹配

创建消息队列

创建队列绑定交换机

指定路由发送消息

结论:匹配了路由key值发送到队列中

Fanout Exchange  

广播交换机:一 个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息  (发布/订阅模式)

创建广播类型交换机

创建两个队列

绑定路由和key

发送消息

结论:只要绑定到交换机中,就会往指定的交换机中发送消息

Topic Exchange  

主题交换机: 交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 "#"。需要注意的是通配符前面必须要加上"."符号

发送消息代码

@ApiOperation(value = "发送广播消息", notes = "发送广播消息")
    @GetMapping(value = "private/test3")
    public String publishMessage3(
            @RequestParam("name")String name
    ) {
        rabbitTemplate.convertAndSend("MyTest-topic", "my.topic.A",name);
        return "success";
    }

路由KEY匹配my.topic.A和my.topic.* 所以两个队列都能接收到到消息

Headers Exchange

请求头部交换机:它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由

RabbitMq 消息过期时间TTL

RabbitMq 队列中的消息都是由TTL时间,当消息达到一定时间后将会失效,具体时间发送方案:

rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> {
    MessageProperties messageProperties = message.getMessageProperties();
    // 设置这条消息的过期时间
    messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60));
    return message;
});

多租户模式:虚拟主机和隔离

每一个 RabbitMQ 服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个 vhost本质上是一个mini版的 RabbitMq 服务器,拥有自己的队列交换器和绑定·.....更重要的是,它拥有自己的权限机制。这使得你能够安全地使用一个 RabbitMq 服务器来服务众多应用程序

队列

  • 死信队列DLX
  • 延迟队列: 消费者订阅的是 死信队列,没有消费者订阅普通队列的话,当消息过期时间到了,就会被路由到死信队列,这就达成了,消息被延迟消费的目的。  
  • 优先级队列
  1. 新建队列设置最大优先级(最大优先级不要太高,会耗费CPU资源)

  1. 发送消息的时候,消息设置优先级,如果没有设置的情况下默认优先级为1
rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> {
    MessageProperties messageProperties = message.getMessageProperties();
    // 设置这条消息的过期时间
    messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60));
    pro.getMessageProperties().setPriority(5);
    return message;
});

持久化

  • 交换器的持久化在声明交换器是将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的交换器元数据会丢失(没有这个交换器了),但是 队列和消息不会丢失(分情况是否设置持久化),只是 不能将消息发送到这个交换器了。
  • 队列的持久化在声明队列时将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的 元数据会丢失消息也会丢失
  • 消息的持久化但是队列的持久化,并 不能保证消息数据不丢失,要保证消息不丢失,需要将消息的投递模式设置为 2 (BasicProperties 中的 deliveryMode 属性)

消息删除

过期删除策略:通过设置消息TTL时间,消息到时间自动删除

消息确认删除:通过ack消息消费确认的机制,进行消息删除

事务

springboot 中启用事务管理

@Bean("rabbitTransactionManager")
RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
    return new RabbitTransactionManager(connectionFactory);
}

springboot 中使用事务

@RestController
@RequestMapping("/v1")
@Slf4j
@Api(value = "测试类", description = "测试类")
public class TestController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @ApiOperation(value = "发送广播消息", notes = "发送广播消息")
    @GetMapping(value = "private/test")
    @Transactional(transactionManager = "rabbitTransactionManager",rollbackFor = Exception.class)
    public String run(
            @RequestParam("name")String name
    ) {
        rabbitTemplate.convertAndSend("MyTest-fanout", null,name);
        return "success";
    }
}

消息发送和确认机制

生产者消息发送确认机制

消息发送后需要确认消息是否已经正确的入队列,通过消息确认回调(setConfirmCallback)和路由key回调(setReturnsCallback)

package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMqConfig {
    @Bean("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                return;
            }
        });
        /**
         * 在找不过交换机和routingkey还是会触发
         */
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("返回消息{}",returned);
            log.error("ReturnCallback:     " + "消息:" + returned.getMessage());
            log.error("ReturnCallback:     " + "回应码:" + returned.getReplyCode());
        });
        return rabbitTemplate;
    }
}

CorrelationData 可以设置ID进行消息的传递

消费者消息确认机制

自动确认模式:

在消费者处理完消息后,自动确认消息已被消费。这种模式下,如果消费者在处理消息时发生异常,那么消息也会被认为已被确认,会从队列中删除,而无法重新消费。因此,不建议使用此模式。

手动确认模式(无需返回确认结果):

消费者在接收到消息后,不会自动确认消息已被消费,而是需要手动调用 channel.basicAck 方法来确认消息已被消费。此模式下,如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。

手动确认模式(需要返回确认结果):

消费者在接收到消息后,需要手动调用 channel.basicAck 方法来确认消息已被消费,并需要在回调方法中返回确认结果。如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。

一般建议使用手动确认模式,因为这种模式下可以保证消息的可靠性,并且可以防止消息的丢失。

在 Spring Boot 中,acknowledge-mode 有以下几个配置选项:

AUTO: 自动确认模式。

消息消费者收到消息后,自动发送确认消息,无需手动调用确认方法。此模式是默认的确认模式。

MANUAL: 手动确认模式。

消息消费者接收到消息后,不会自动发送确认消息,需要手动调用确认方法来确认消息已经被消费。如果没有手动确认消息,则消息会一直存在于队列中,不会被删除。

NONE: 不确认模式。

消息消费者接收到消息后,不会自动发送确认消息,并且也不会手动确认消息。此模式会造成消息被重复消费,因此一般不建议使用。

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: MANUAL(auto,none)

在监听器方法中,需要手动调用 Channel 对象的 basicAck 方法来确认消息。例如:

@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
    // 处理消息
    // 手动确认消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

通过绑定队列,交换机,key的方式监听消息,下面这种方式会自动创建路由值进行绑定

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic-2", durable = "true"),
            exchange = @Exchange(value = "MyTest-topic",type = "topic"),
            key = "my.topic.B"))
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println("Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

消费者消费模式

公平模式

yml配置

spring:
  rabbitmq:
    host: 10.123.35.161
    port: 5672
    username: admin
    password: admin123
    virtual-host: /vmsdms
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #消费手动确认
        prefetch: 1  #每次从一次性从broker里面取的待消费的消息的个数
        retry:
          enabled: true   # 允许消息消费失败的重试
          max-attempts: 3   # 消息最多消费次数3次
          initial-interval: 2000    # 消息多次消费的间隔2秒

关键信息: prefetch 获取消费的个数设为1,2,3,4;表示每次获取的数量,只要设置了这个数据,会变成公平模式

消费者代码

package com.example.demo.handler;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
 * @author jiangmb
 * @version 1.0.0
 * @date 2023-03-23 16:53
 */
@Component
public class CustomerModeHandler {
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage1(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody);
            TimeUnit.SECONDS.sleep(10);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

消费结果

结论:消费公平模式下,消费性能比较高的机器能够负载更高的并发量

轮询模式

yml配置

spring:
  rabbitmq:
    host: 10.123.35.161
    port: 5672
    username: admin
    password: admin123
    virtual-host: /vmsdms
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #消费手动确认
        # prefetch: 1  #每次从一次性从broker里面取的待消费的消息的个数
        retry:
          enabled: true   # 允许消息消费失败的重试
          max-attempts: 3   # 消息最多消费次数3次
          initial-interval: 2000    # 消息多次消费的间隔2秒

关键信息: prefetch 不设置的数量那么就是轮询模式

消费者代码

package com.example.demo.handler;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
 * @author jiangmb
 * @version 1.0.0
 * @date 2023-03-23 16:53
 */
@Component
public class CustomerModeHandler {
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage1(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    @RabbitListener(queues = {"topic-4"})
    public void receiveMessage2(Message message, Channel channel) throws IOException {
        // 处理消息
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            // 消息处理逻辑
            System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody);
            TimeUnit.SECONDS.sleep(10);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消息处理异常,手动拒绝消息并重新入队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

消费结果

附加

springboot rabbitmq 配置和解析

spring:
  rabbitmq:
    host: localhost                     # RabbitMQ服务的主机名或IP地址
    port: 5672                          # RabbitMQ服务的端口号,默认为5672
    username: guest                     # 连接RabbitMQ服务的用户名,默认为guest
    password: guest                     # 连接RabbitMQ服务的密码,默认为guest
    virtual-host: /                     # 虚拟主机名称,默认为/
    connection-timeout: 60000           # 连接超时时间(毫秒),默认为60000
    requested-heartbeat: 60             # 请求心跳时间(秒),默认为60
    publisher-confirms: false           # 是否启用生产者确认,默认为false
    publisher-returns: false            # 是否启用生产者返回,默认为false
    template:
      receive-timeout: 60000             # 接收消息的超时时间(毫秒),默认为60000
      reply-timeout: 5000               # 请求回复的超时时间(毫秒),默认为5000
      retry:
        enabled: false                   # 是否启用重试,默认为false
        initial-interval: 1000           # 初始重试间隔时间(毫秒),默认为1000
        max-interval: 10000              # 最大重试间隔时间(毫秒),默认为10000
        multiplier: 1.0                  # 重试时间乘数,默认为1.0
        max-attempts: 3                  # 最大重试次数,默认为3
        stateless: false                 # 是否启用无状态重试,默认为false
        backoff:
          type: exponential              # 退避类型,默认为exponential
          multiplier: 2.0                # 退避时间乘数,默认为2.0
          max-interval: 30000            # 最大退避时间(毫秒),默认为30000
          initial-interval: 1000         # 初始退避时间(毫秒),默认为1000
          max-attempts: 3                # 最大重试次数,默认为3
      mandatory: false                   # 是否启用强制标志,默认为false
      receive-timeout: 60000             # 接收消息的超时时间(毫秒),默认为60000
      reply-timeout: 5000               # 请求回复的超时时间(毫秒),默认为5000
      exchange:                           # 指定Exchange的名称和类型
        name: myExchange
        type: topic
      routing-key: myRoutingKey          # 指定Routing Key
      default-receive-queue: myQueue      # 默认接收队列的名称
      message-converter: json             # 指定消息转换器
    listener:
      direct:
        auto-startup: true                # 是否自动启动,默认为true
        acknowledge-mode: auto            # 确认模式,默认为auto
        concurrency: 1                    # 消费者线程数,默认为1
        max-concurrency: 10               # 最大消费者线程数,默认为10
        prefetch: 1                        # 每个消费者一次
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 SQL 存储
超详细的RabbitMQ入门,看这篇就够了!
RabbitMQ入门,看这篇就够了
201992 64
|
8月前
|
消息中间件 Java Kafka
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
165 0
|
8月前
|
消息中间件 存储 JSON
从兔子说起:深入理解RabbitMQ基础概念【RabbitMQ 一】
从兔子说起:深入理解RabbitMQ基础概念【RabbitMQ 一】
85 0
|
8月前
|
消息中间件 存储 缓存
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(下)
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(下)
|
8月前
|
消息中间件 Java
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(上)
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例
|
8月前
|
消息中间件 中间件
【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认
【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认
|
消息中间件 存储 监控
RabbitMQ:从入门到实践
1. RabbitMQ简介 RabbitMQ是一款开源的、基于AMQP协议的消息队列系统,用于构建可扩展、高性能、松耦合的分布式系统。RabbitMQ具有以下特点: 支持多种语言和平台:Java、Python、Ruby、.NET等 提供丰富的交换器类型和路由策略:直接、广播、主题和头 支持消息持久化和高可用性:保证消息不丢失,服务可用性 提供管理界面和监控插件:方便管理和监控RabbitMQ服务器 社区活跃,文档丰富:易于学习和使用
4804 2
RabbitMQ:从入门到实践
|
消息中间件 存储 网络协议
RabbitMQ基础概念
RabbitMQ基础概念
84 0
|
消息中间件 JSON Java
RabbitMQ入门篇
来到docker官网找一个带management的版本,https://hub.docker.com/_/rabbitmq 然后下载、运行
93 0
|
消息中间件 负载均衡 安全
RabbitMQ设计原理解析
RabbitMQ现在用的也比较多,但是没有过去那么多啦。现在很多的流行或者常用技术或者思路都是从过去的思路中演变而来的。了解一些过去的技术,对有些人来说可能会产生众里寻他千百度的顿悟,加深对技术的理解,更好的应用于工作中去。
148 0