RabbitMQ Tutorial by Java(2)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ Tutorial by Java

RabbitMQ Tutorial by Java(1)https://developer.aliyun.com/article/1517444


Envelope元数据

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
 
package com.rabbitmq.client;
 
public class Envelope {
    private final long _deliveryTag;
    private final boolean _redeliver;
    private final String _exchange;
    private final String _routingKey;
 
    public Envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) {
        this._deliveryTag = deliveryTag;
        this._redeliver = redeliver;
        this._exchange = exchange;
        this._routingKey = routingKey;
    }
 
    public long getDeliveryTag() {
        return this._deliveryTag;
    }
 
    public boolean isRedeliver() {
        return this._redeliver;
    }
 
    public String getExchange() {
        return this._exchange;
    }
 
    public String getRoutingKey() {
        return this._routingKey;
    }
 
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("Envelope(deliveryTag=").append(this._deliveryTag);
        sb.append(", redeliver=").append(this._redeliver);
        sb.append(", exchange=").append(this._exchange);
        sb.append(", routingKey=").append(this._routingKey);
        sb.append(")");
        return sb.toString();
    }
}

Envelope 类是 RabbitMQ Java 客户端库中的一个类,它用于封装从 RabbitMQ 服务器接收到的消息的元数据。这个类包含了关于消息的一些重要信息,比如投递标签(deliveryTag)、是否重新投递(redeliver)、交换机名称(exchange)和路由键(routingKey)。

下面是 Envelope 类中每个字段和方法的详细解释:

字段:

  1. _deliveryTag
  • 类型:long
  • 描述:这是 RabbitMQ 为每条消息分配的唯一标识符。当消费者处理完消息后,需要使用此标签来确认(ack)或拒绝(nack)消息。
  1. _redeliver
  • 类型:boolean
  • 描述:这个字段表示消息是否被重新投递。如果消息之前被投递过但因为某些原因(例如消费者未正确确认)而被 RabbitMQ 重新放入队列,这个字段就会是 true
  1. _exchange
  • 类型:String
  • 描述:这个字段表示消息最初被发送到的交换机名称。交换机是 RabbitMQ 中用于路由消息的关键组件。
  1. _routingKey
  • 类型:String
  • 描述:这个字段表示消息在发送时使用的路由键。路由键用于确定消息应该被路由到哪个队列。

方法:

  1. getDeliveryTag()
  • 返回值:long
  • 描述:这个方法返回消息的投递标签。
  1. isRedeliver()
  • 返回值:boolean
  • 描述:这个方法返回一个布尔值,表示消息是否被重新投递。
  1. getExchange()
  • 返回值:String
  • 描述:这个方法返回消息最初被发送到的交换机名称。
  1. getRoutingKey()
  • 返回值:String
  • 描述:这个方法返回消息在发送时使用的路由键。
  1. toString()
  • 返回值:String
  • 描述:这个方法覆盖了 Object 类中的 toString 方法,用于返回 Envelope 对象的字符串表示形式,方便调试和日志记录。

使用场景:

当消费者从 RabbitMQ 接收消息时,每条消息都会附带一个 Envelope 对象。消费者可以使用 Envelope 对象中的方法来获取消息的元数据,并根据这些信息来决定如何处理消息。例如,消费者可以使用 getDeliveryTag 方法获取投递标签,以便在处理完消息后发送确认。

持久化

       我们上面已经了解如何保证任务不会丢失, 即使消费者连接丢失. 但是我们的任务依然会有丢失的风险, 例如RabbitMQ服务器崩掉.

       当RabbitMQ服务器退出或者崩溃的时候, 他将会清除队列和消息,  除非你指定它不清除. 我们需要做两件事情, 来保证即使是服务器崩溃也不会丢失数据.

       首先我们需要确保队列会在RabbitMQ节点重启之后存活, 要想做到这样, 就需要声明这个队列为持久化模式

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

       但是前面我们讲到, 我们应该避免对一个已经存在的队列重新定义, 因为他不会生效, RabbitMQ是不允许使用不同的参数(durable, autoDelete,exclusive等)重新定义一个已经存在的queue的. 即使这个语句本身是正确的. 如果你这样做将会返回一个错误信息.

       你可以声明一个不同名称的queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

       设置了以上的信息之后, 就可以保证此时这个队列将不会在RabbitMQ重启的时候丢失了, 但是这并不意味着RabbitMQ重启之后, 消息不会丢失, 因为你仅仅只是持久化了queue, 而不是消息, 现在我们需要将我们的消息同时也标记为持久化模式.

        如何将消息体设置为durable? 我们思考一下, 首先消息是从producer那边publish过来的, 那么我们可不可以从basicPublish这个方法中找线索?? 还真被你找到了, 如下:

basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)

        我们在推送消息的时候, 可以设置一个属性AMQP.BasicProperties props, 这个属性定义如下:

public static class BasicProperties extends AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String, Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
        
        // 方法体 ... 省略
 
}

这个BasicProperties类继承自AMQBasicProperties,它扩展了AMQP协议中消息属性的基础定义。AMQP(高级消息队列协议)是一个开放、可靠、面向消息的中间件协议,它支持多种消息传递模式,包括发布/订阅、点对点、请求/响应等。下面是对该类中一些属性和方法的基本解释:

属性:

  1. contentType
  • 用途:表示消息体的MIME类型,例如text/plainapplication/json。这有助于接收方知道如何解析消息内容。
  1. contentEncoding
  • 用途:表示消息内容使用的字符编码,如UTF-8
  1. headers
  • 用途:一个自定义的键值对集合,允许发送方和接收方传递额外的信息。
  1. deliveryMode
  • 用途:定义消息的持久性。通常有两个值:1表示非持久(消息不存储在服务器上),2表示持久(消息存储在服务器上,直到被消费)。
  1. priority
  • 用途:消息的优先级,用于在多个消息等待消费时决定先处理哪个消息。
  1. correlationId
  • 用途:用于将回复与请求关联起来,通常用于RPC(远程过程调用)模式。
  1. replyTo
  • 用途:用于指定一个队列名,用于接收对这条消息的回复。这在RPC场景中特别有用。
  1. expiration
  • 用途:定义消息的生存时间(TTL,Time-To-Live)。如果在这段时间内消息没有被消费,它将被丢弃。
  1. messageId
  • 用途:为消息提供一个全局唯一的标识符。
  1. timestamp
  • 用途:表示消息创建或发送的时间。
  1. type
  • 用途:表示消息的类型或名称,用于在多个不同类型的消息中进行区分。
  1. userId
  • 用途:创建或发送消息的用户ID。
  1. appId
  • 用途:标识创建消息的应用程序的名称。
  1. clusterId
  • 用途:表示消息来自的RabbitMQ集群的ID。

方法:

通常,该类还会包含一些用于获取和设置这些属性的getter和setter方法,以及可能的其他方法用于序列化、反序列化或比较属性等。具体的方法实现取决于这个类的完整源代码。

使用场景:

这些属性通常用于确保消息的正确路由、处理和持久化。例如,发送方可能会设置replyTocorrelationId以接收RPC回复;或者设置priority来确保某些关键消息优先被处理。接收方则会使用这些属性来正确地处理或路由消息。

       其中有一个deliveryMode, 这个表示消息的持久化 .

       所以我们第一个想到的就是通过构建一个BasicProperties对象,然后设置里面的属性,然后传入给basicPublish, 如下:

import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.Channel;  
import java.nio.charset.StandardCharsets;  
import java.util.HashMap;  
import java.util.Map;  
  
// ... 其他代码 ...  
  
// 创建消息属性  
Map<String, Object> headers = new HashMap<>();  
headers.put("custom-header", "some-value");  
  
BasicProperties properties = new BasicProperties.Builder()  
    .contentType("text/plain")  
    .contentEncoding("UTF-8")  
    .headers(headers)  
    .deliveryMode(2) // 设置为持久化消息  
    .priority(1)  
    .correlationId("my-correlation-id")  
    .replyTo("my-reply-queue")  
    .expiration("60000") // 消息将在60秒后过期  
    .messageId("my-message-id")  
    .timestamp(new java.util.Date())  
    .type("my-message-type")  
    .userId("my-user-id")  
    .appId("my-app-id")  
    .clusterId("my-cluster-id")  
    .build();  
  
// 获取RabbitMQ的Channel  
Channel channel = connection.createChannel();  
  
// 发布消息到指定的交换机和路由键,并带上属性  
String exchange = "my-exchange";  
String routingKey = "my.routing.key";  
String messageBody = "Hello, RabbitMQ!";  
channel.basicPublish(exchange, routingKey, properties, messageBody.getBytes(StandardCharsets.UTF_8));  
  
// ... 其他代码 ...

      除此之外, 官方还提供了第二种方法, 你可以不用build一个BasicProperties,而是直接使用封装好的AMQP.BasicProperties实例对象MessageProperties来直接传入:

import com.rabbitmq.client.MessageProperties;
 
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

       下面是MessageProperties的原码:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
 
package com.rabbitmq.client;
 
import java.util.Date;
import java.util.Map;
 
public class MessageProperties {
    public static final AMQP.BasicProperties MINIMAL_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final AMQP.BasicProperties BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final AMQP.BasicProperties PERSISTENT_BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final AMQP.BasicProperties TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
 
    public MessageProperties() {
    }
}

       此处的消息持久化为最后一个PERSISTENT_TEXT_PLAIN, 使用的构造方法为:

public BasicProperties(String contentType, String contentEncoding, Map<String, Object> headers, Integer deliveryMode, Integer priority, String correlationId, String replyTo, String expiration, String messageId, Date timestamp, String type, String userId, String appId, String clusterId) {
            this.contentType = contentType;
            this.contentEncoding = contentEncoding;
            this.headers = headers == null ? null : Collections.unmodifiableMap(new HashMap(headers));
            this.deliveryMode = deliveryMode;
            this.priority = priority;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.expiration = expiration;
            this.messageId = messageId;
            this.timestamp = timestamp;
            this.type = type;
            this.userId = userId;
            this.appId = appId;
            this.clusterId = clusterId;
        }

       关于消息持久化的说明:

       将一个消息设置为持久化, 并不能完全保证消息不会丢失. 尽管它告诉RabbitMQ将message保存在硬硬盘, 当RabbitMQ已经接收一个消息, 但是还没有被保存的时候, 仍然会有一段很短的时间窗口, 这段时间窗口如果发生事故, 也可能导致消息丢失.

       如果你想保证message的durable的正确性和有效性, 你可以参考Publish and confirm模式 : Consumer Acknowledgements and Publisher Confirms | RabbitMQ

发布订阅

       这个部分我们将做一些完全不一样的事情 -- 我们将会把一个消息发送给多个消费者, 这个模式就被称为发布订阅模式.        

       为了用图解寿命这个模式, 我们将会建立一个简单的日志系统, 他将会包含两个项目, 第一个会发送日志消息, 第二个会接受然后打印这些消息.

       在这个日志系统中, 每一个接受者的副本都会收到消息, 因此我们可以启动一个接受者, 也可以称为消费者, and将这些log消息导向硬盘, 与此同时, 我们将会跑起另外一个消费者并且看到这些日志打印到屏幕上.

交换机

       其实一个消息并不是直接传递给队列的, 而是指定交换机, 然后由交换机传递给对应的队列.

       我们之前所构造的例子中, 包含这三个部分:

  • 一个生产者来生产消息, 然后发给队列
  • 一个队列, 这个队列来转发消息给消费者
  • 一个消费者, 消费者接受并处理来自队列的消息

       RabbitMQ的核心消息模式, 是生产者永远都不会直接给队列发送任何消息, 事实上大多数情况下, 生产者会并不知道它生产的消息将会被发送到哪个队列.

       相反, 生产者仅仅只能发送消息给交换机, 一个交换机是一个很简单的实现, 一方面它接受来自生产者的消息, 另外一方面,它将这些消息转发给队列. 交换机必定确切的知道它收到消息之后, 这个消息将会被发送到哪个队列. 比如说它是否会被添加到一个指定的队列, 或者是是其他的队列. 亦或是将其丢弃. 不管是哪种, 这些规则都是由交换机的类型决定

       首先创建一个交换机:

       然后给这个交换机绑定一个队列, 如下:

       可以看到这个test交换机绑定了一个test队列, 绑定之后指定routingKey, 后期producer发送消息的时候可以通过exchangeName来指定交换机, 然后通过routingKey来指定要传入哪个队列.

        那我可以将两个交换机绑定的队列, 并且将其指定的routingKey的值设置为一样的吗?

       一个交换机确实可以绑定两个队列,并且这两个绑定队列的routingKey可以设置为一样。但是,这样做的话,当消息使用这个特定的routingKey发送到交换机时,交换机会将消息路由到这两个队列中,实现消息的广播效果。

       在实际应用中,是否使用相同的routingKey取决于你的业务需求。如果你希望消息被发送到多个队列进行处理,那么可以设置相同的routingKey。但如果你希望根据不同的routingKey将消息路由到不同的队列,以实现更细粒度的控制,那么就应该为每个队列设置不同的routingKey。

       此外,需要注意的是,routingKey的匹配规则还受到交换机类型的影响。例如,在Direct Exchange中,routingKey必须与队列的绑定键完全匹配;而在Topic Exchange中,routingKey可以与绑定键进行模式匹配。因此,在设置routingKey时,还需要考虑你使用的交换机类型。

        交换机的类型:

  • Direct Exchange(直连交换机)
  • 特点:消息会传送给绑定键(BindingKey)与消息的路由键(RoutingKey)完全匹配的那个队列。
  • 工作原理:在发送消息时,需要指定一个RoutingKey。当消息到达交换机时,交换机会查找与这个RoutingKey完全匹配的BindingKey,并将消息转发给对应的队列。如果找不到匹配的队列,消息则会被丢弃
  • 应用场景:适用于需要精确匹配RoutingKey的场景,如简单的请求-响应模型或者路由到特定服务或处理流程的队列。

        在这个设置中,我们可以看到direct exchange X绑定了两个队列。第一个队列用绑定密钥橙色绑定,第二个队列有两个绑定,一个绑定密钥黑色,另一个绑定密钥绿色。

       在这样的设置中,发布到交换机的带有路由关键字橙色的消息将被路由到队列Q1。路由关键字为黑色或绿色的消息将发送到Q2。所有其他消息都将被丢弃。

       多次绑定:

       使用同一个RoutingKey绑定多个队列是完全合法的, 例如上图, 我们可以给c1和c2这两个队列绑定同一个direct类型的交换机, 并且使用同一个RoutingKey : black. 上图这个案例中, 这个direct交换机的作用就类似于一个fanout交换机.

  • Topic Exchange(主题交换机)
  • 特点:与Direct类型的交换器类似,也是将消息路由到RoutingKey与BindingKey匹配的队列中,但它支持模糊匹配。
  • 工作原理:BindingKey可以包含通配符(如.*),使得RoutingKey可以与多个BindingKey匹配。这样,一个消息可以被路由到多个队列中。
  • 应用场景:适用于需要将消息发送到一组相关的队列的场景,如基于主题或模式的消息发布和订阅。
  • 使用: topic类型的交换机, 其RoutingKey不能乱写, 必须满足一定的要求, 它必须要求是一个单词列表, 单词之间谁用 点号分开. 例如 stock.usr.notice
  • routingKey的匹配规则: * 可以代表一个单词, 使用#代表多个单词, 例如 *.test可以匹配 a.test和b.test, test.#可以匹配test.a.b和test.a.c

  • Headers Exchange(头交换机)
  • 特点:不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
  • 工作原理:在绑定队列和交换器时,会制定一组键值对。当发送消息到交换器时,RabbitMQ会获取到该消息的headers,并对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果匹配,则消息会被路由到该队列中。
  • 应用场景:适用于需要根据消息内容中的特定属性进行路由的场景,提供了更灵活的消息路由机制。
  • Fanout Exchange(扇型交换机)
  • 特点:发布/订阅的广播模式,它会将发送到该交换机的消息发送到所有与该交换机绑定的队列中。
  • 工作原理:当一个消息发送到扇形交换机时,交换机会将消息分别发送给所有绑定到该交换机上的队列,无论它们的RoutingKey或BindingKey是什么。
  • 应用场景:适用于需要将消息广播到多个队列的场景,如通知系统或需要多个服务或组件同时处理同一消息的情况。

       

默认交换机的类型

       创建交换机, 可以通过RabbitMQ提供的web插件来生成 :

       可以通过java client来生成:

channel.exchangeDeclare("logs", "fanout");

       其声明如下:

        现在我们就可以通过这个交换机来推送消息:

channel.basicPublish( "logs", "", null, message.getBytes());

       但是有人可能会想起来, 这和我们之前写的不一样, 我们之前没有指定这个交换机name啊, 或者是指定了一个空字符串, 如下:

channel.basicPublish("", "hello", null, message.getBytes());

       为什么它还是能够指定到hello这个队列??

       那是因为:

       The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

       你指定的空串代表它的交换机为默认交换机, 默认交换机是队列在创建的时候, 已经和队列进行绑定了, 这样保证每个队列能有一个初始化的默认的交换机. 如果你指定的是默认交换机, 那么这个routingKey就为你指定的队列名字了.

       并且你不能显示的去让队列绑定默认交换机, 也不能让队列和默认交换机解绑, 当然, 默认交换机也不能被删除.

案例

       实现一个fanout交换机, 实现一个生产者,  两个队列, 两个队列bind到这个fanout交换机, 创建两个消费者, 分别接受两个队列的消息.

实现一个生产者, 可以不断地输入数据 :

package fanoutExchangeTest;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
 
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
 
public class Producer {
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
 
        // declaring an exchange named logs and its type is fanout
        channel.exchangeDeclare("logs","fanout");
 
        // bind queue
        channel.queueBind("queue1","logs", "logsToQueue1");
        channel.queueBind("queue2","logs", "logsToQueue2");
 
        // declaring two queues the one named queue1 and the other one named queue2
//        channel.queueDeclare("queue1",true,true,false,null);
//        channel.queueDeclare("queue2",true,true,false,null);
        // manage message
        ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
        // publish and confirm
        channel.confirmSelect();
 
        // callback : success
        ConfirmCallback success = (sequenceNumber,multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(sequenceNumber,true);
                longStringConcurrentNavigableMap.clear();
            } else {
                map.remove(sequenceNumber);
            }
        };
        // callback : fail
        ConfirmCallback fail = (sequenceNumber,multiple) -> {
            String body = map.get(sequenceNumber);
            System.err.format(
                    "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                    body, sequenceNumber, multiple
            );
            success.handle(sequenceNumber,multiple);
        };
 
        // add non - sycn listener
        channel.addConfirmListener(success,fail);
 
        // publis code
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            channel.basicPublish("logs", "testFanout",null, msg.getBytes());
            channel.waitForConfirmsOrDie(3000L);
        }
 
    }
}

创建两个消费者:

package fanoutExchangeTest;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
 
import java.io.IOException;
 
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare("queue1",false,false,false,null);
 
        channel.basicConsume("queue1", false, (s, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
 
            System.out.println(" [Consumer1] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, s -> {
            System.out.println("nothing");
        });
    }
}
package fanoutExchangeTest;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
 
import java.io.IOException;
 
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare("queue2",false,false,false,null);
 
        channel.basicConsume("queue2", false, (s, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
 
            System.out.println(" [Consumer2] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, s -> {
            System.out.println("nothing");
        });
    }
}

       首先启动两个消费者, 然后启动生产者, 随后输入数据, 输出:

临时队列

        有时候我们需要一些流动性, 变化性很强的数据, 就可以创建临时队列, 他有如下特性:

  • 匿名性:临时队列通常没有明确的名称,而是由RabbitMQ服务器在创建时自动分配一个唯一的名称。这使得它们非常适合于一次性使用或短暂存在的场景。
  • 自动删除:当最后一个消费者断开连接时,临时队列会自动被删除。这种特性使得队列的管理变得简单,因为您不需要手动跟踪和删除不再使用的队列。
  • 非持久化:临时队列通常也是非持久化的,这意味着它们不会存储在磁盘上,因此当RabbitMQ服务器重启时,这些队列及其内容会丢失。
  • 使用场景:临时队列在RPC(远程过程调用)场景中特别有用,其中客户端发送一个请求并等待一个响应。在这种情况下,客户端可以创建一个临时队列来接收响应,一旦响应被接收,队列就可以被自动删除。
  • 创建方式:在代码中,您可以使用RabbitMQ的客户端库来创建临时队列。例如,在RabbitMQ的Java客户端中,您可以通过不指定队列名称,并设置某些参数来创建一个临时队列。当您声明一个队列但不提供名称时,RabbitMQ会自动为您生成一个唯一的名称。
  • 注意事项:虽然临时队列提供了便利性和简化管理的好处,但您也应该意识到它们的局限性。由于它们是非持久化的,并且会在最后一个消费者断开连接时自动删除,因此不适合用于需要长期保存数据或需要在多个会话之间共享数据的场景。

下面是如何进行获取一个临时队列:

String queueName = channel.queueDeclare().getQueue();

死信队列

       死信队列(Dead-Letter Queue,DLQ)是一种特殊的队列,用于存放无法被正常处理的消息。这些消息可能由于各种原因,如消息被拒绝、消息过期、队列达到最大长度、消息格式错误或处理过程中抛出异常等,无法被消费者正常消费。通过将这些无法处理的消息放入死信队列,可以防止它们阻塞正常的消息处理流程,同时也方便进行后续的问题排查和处理。

       死信队列在消息中间件中是一个重要的概念,它增强了消息的可靠性,有效避免了因消息处理失败而引起的数据丢失和系统异常。此外,死信队列中的消息可以进行特殊处理,如记录日志、统计失败次数、发送告警通知等,有助于监控系统的健康状况,并对处理失败的消息进行进一步的分析和处理。

       值得注意的是,死信队列通常需要手动维护,而不是自动清空,因为死信消息往往需要人工分析和处理。在实际应用中,可以通过查询、导出和重新发送进入死信队列的死信消息,按需管理死信消息,避免消息漏处理。

  消费者C1代码:

package DeadQueue;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
 
import java.sql.SQLOutput;
import java.util.HashMap;
import java.util.Map;
 
/**
 * 消费者1
 */
public class Consumer1 {
    // 有两个交换机
    public static final String normal_exchange = "normal_exchange";
 
    // 死信交换机
    public static final String dead_exchange = "dead_exchange";
 
    // 普通队列
    public static final String normal_queue = "normal_queue";
 
    // 死信队列
    public static final String dead_queue = "dead_queue";
 
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
 
        // 声明两个交换机: 死信交换机和普通交换机
        channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT);
 
        Map<String, Object> map = new HashMap<>();
        // 过期时间
        map.put("x-message-ttl",10000);
        // 正常队列设置死信交换机
        map.put("x-dead-letter-exchange",dead_exchange);
        // 设置死信消息的RoutingKey
        map.put("x-dead-letter-routing-key", "lisi");
 
        // 声明两个队列
        channel.queueDeclare(normal_queue,false,false,false,map); // 声明将死信发送给死信交换机
        channel.queueDeclare(dead_queue,false,false,false,null);
 
        // 绑定交换机和队列
        // 绑定普通队列和消费者1
        channel.queueBind(normal_queue,normal_exchange,"zhangsan");
        channel.queueBind(dead_queue,dead_exchange,"lisi");
 
        DeliverCallback deliverCallback = (tag,msg) -> {
            System.out.println("consumer1接收到消息: " + new String(msg.getBody(),"UTF-8"));
        };
        channel.basicConsume(normal_queue,true, deliverCallback, tag-> {});
    }
}

消费者C2代码:

package DeadQueue;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 消费者1
 */
public class Consumer2 {
    // 死信队列
    public static final String dead_queue = "dead_queue";
 
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
 
 
        DeliverCallback deliverCallback = (tag,msg) -> {
            System.out.println("consumer2接收到消息: " + new String(msg.getBody(),"UTF-8"));
        };
        channel.basicConsume(dead_queue,true, deliverCallback, tag-> {});
    }
}

生产者代码:

package DeadQueue;
 
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
 
public class Producer {
    // 定义一个普通交换机即可
    public static final String normal_exchange = "normal_exchange";
 
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        // 死信消息
        for (int i = 0; i < 10; i++) {
            String msg = "info" + i;
            channel.basicPublish(normal_exchange,"zhangsan", null/* 这里消息的过期时间已经在队列声明的时候设置*/, msg.getBytes());
        }
    }
}

首先启动消费者C1, 让其创建相关队列和交换机, 随后关闭消费者C1模拟其崩溃, 然后开启生产者, 发现normal队列里面产生了10条无法被消费消息;

随后开启消费者C2, 来消费死信队列的消息:

        当然, 一个消息被放入死信队列当然不止 设置过期时间这一种, 还可以设置队列最大长度, 当普通队列的长度到达最大值的时候, 这个时候额外的消息会被放入死信队列

        Map<String, Object> props = new HashMap<>();
        // 过期时间
        // props.put("x-message-ttl",10000);
        // 设置最大长度为6
        props.put("x-max-length",6);

        当然你也可以主动拒绝消息, 而不是被动的触发转发给死信队列.

如何设置主动拒绝?

        // 其他代码
        DeliverCallback deliverCallback = (tag,msg) -> {
            String getMsg = new String(msg.getBody(), StandardCharsets.UTF_8);
            if (getMsg.equals("info")) {
                System.out.println("消息:" + getMsg + " 被拒绝");
 
                // 拒绝策略需要开启手动应答
                // 第二个参数设置为false表示 不会重新将此消息返回原来的队列.
                channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);
            } else {
                System.out.println("consumer1接收到消息: " + getMsg);
            }
        };
        channel.basicConsume(normal_queue,false, deliverCallback, tag-> {});

延迟队列

        有些时候, 我们并不是需要立即就将消息拿出来处理, 而是需要等待特定的时间, 然后再对它进行处理, 延迟队列就实现了这一点.

       RabbitMQ中的延迟队列(Delay Queue)是一种特殊的队列,其中的消息不会立即被消费,而是会被延迟一段时间后才进行处理。这种队列主

要用于那些需要在未来某个时间点被处理的消息,比如定时任务订单超时未支付自动取消等场景。

       延迟队列的底层原理通常是由消息过期时间(TTL)和交换机组成。生产者发送消息到交换机时,会设置消息的过期时间。当这个时间到达后,消息会从交换机发送到真正的队列中,等待消费者进行消费。

       RabbitMQ中的延迟队列提供了一种自动化的方式,可以延迟处理特定的业务逻辑,而无需进行额外的预处理操作。

       需要注意的是,RabbitMQ本身并不直接支持延迟队列,但可以通过一些插件或者特定的消息属性和交换机类型来实现延迟队列的功能。例如,可以使用RabbitMQ的延迟消息插件(rabbitmq-delayed-message-exchange)来实现延迟队列。

RabbitMQ Tutorial by Java(3)https://developer.aliyun.com/article/1517446

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
38 3
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
31 0
rabbitmq基础教程(ui,java,springamqp)
|
2月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
113 0
|
3月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
182 0
|
4月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
176 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
5月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ
|
6月前
|
消息中间件 Java
Java一分钟之-RabbitMQ:AMQP协议实现
【6月更文挑战第11天】RabbitMQ是基于AMQP协议的开源消息队列服务,支持多种消息模式。本文介绍了RabbitMQ的核心概念:生产者、消费者、交换器、队列和绑定,以及常见问题和解决方案。例如,通过设置消息持久化和确认机制防止消息丢失,配置死信队列处理不可消费消息,以及妥善管理资源防止泄漏。还提供了Java代码示例,帮助读者理解和使用RabbitMQ。通过理解这些基础和最佳实践,可以提升RabbitMQ在分布式系统中的可靠性和效率。
136 0
Java一分钟之-RabbitMQ:AMQP协议实现
|
5月前
|
消息中间件 负载均衡 Java
JAVA面试之MQ
JAVA面试之MQ
73 0
|
5月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ