RabbitMQ Tutorial by Java(3)

简介: RabbitMQ Tutorial by Java

RabbitMQ Tutorial by Java(2)https://developer.aliyun.com/article/1517445

SpringBoot集成RabbitMQ

如何声明

       首先创建SpringBoot项目, 然后引入依赖:

<!--        rabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

       写入配置文件:

spring:
  rabbitmq:
    host: your-rabbitMQ-host
    port: your-port
    username: username
    password: password

       声明配置类:

package com.example.rabbitmqtest.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
    // 确定交换机名
    public static final String EXCHANGE_NAME = "TEST_EXCHANGE";
    // 确定队列名
    public static final String QUEUE_NAME = "QUEUE_NAME";
 
    // 确定RoutingKey
    public static final String ROUTING_KEY = "ROUTING_KEY";
 
    // 声明交换机
    @Bean(EXCHANGE_NAME)
    public DirectExchange getExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
 
    // 声明队列
    @Bean(QUEUE_NAME)
    public Queue getQueue() {
        return new Queue(QUEUE_NAME);
    }
 
    // 绑定交换机和队列
    @Bean
    public Binding queueBindingExchange(
            @Qualifier(QUEUE_NAME) Queue queue,
            @Qualifier(EXCHANGE_NAME) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

建立controller接口接收消息, 并将消息以生产者的身份发送给交换机:

@ResponseBody
@Controller
@RequestMapping("/your-path")
public class SendMsgController {
    // 使用这个模板类来来对RabbitMQ进行操作
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    // 添加路径
    @GetMapping("/child-path/{message}")
    public void sendMSG(@PathVariable String message) {
        System.out.println("当前系统的时间:{" +  new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
        // 发送消息
        rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message);
    }
}

声明消费者:

package com.example.rabbitmqtest.consumer;
 
import com.example.rabbitmqtest.config.TTLQueueConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
import java.nio.charset.StandardCharsets;
import java.util.Date;
 
/**
 * 队列ttl 的消费者
 */
 
@Component
public class DeadLetterQueueConsumer {
    // 接收消息, 添加监听器, 监听对应queue中的消息, 可以包含多个queue, 多个queue之间使用逗号隔开
    @RabbitListener(queues = {"queue1","queue2","queue3"})
    public void recieveD(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("当前系统的时间:{" +  new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}");
 
    }
}
发布确认

此时如果消息 发布出去, 但是由于某种原因接受失败:

       如果生产者发的, 交换机没有回应, 那么就应该调用回调接口来确认消息是否发送失败. 如果消费者发的消息没有发送过去, 那么就会触发回调接口, 让消息被缓存在内存中.

    @FunctionalInterface
    public interface ConfirmCallback {
        void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
    }

参数:

  • var1 是表示什么内容发送失败了.
  • var2 表示内容是否发送成功, 如果为true, 则表示发送成哥, 反之则为失败.
  • var3 表示发送失败的原因是什么

实现类:

package com.example.rabbitmqtest.config;
 
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
 
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
 
    /**
     * 交换机确认回调方法
     * @param correlationData 保存回调函数的消息的id以及其相关信息
     * @param b 交换机收到消息为true, 否则为false
     * @param s 失败的原因
     *
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if(b) {
            System.out.println("success: 已接收到的消息的id>" + (correlationData!=null ? correlationData.getId() : "null"));
        } else {
            System.out.println("false:" + s);
        }
    }
}

   @PostConstruct注解标记的方法会在依赖注入完成后,自动被Spring框架调用。因此,在init方法中,您可以安全地访问rabbitTemplate字段,并将其确认回调设置为当前实例(this)。这样,当RabbitMQ确认消息时,就会调用MyCallBack中的confirm方法。

关联给生产者:

@ResponseBody
@Controller
@RequestMapping("/your-path")
public class SendMsgController {
    // 使用这个模板类来来对RabbitMQ进行操作
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    // 添加路径
    @GetMapping("/child-path/{message}")
    public void sendMSG(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData();
        System.out.println("当前系统的时间:{" +  new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
        // 发送消息
        rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message,correlationData );
    }
}

配置文件 启动确认机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated

       同时, 不只是correlated, 还有:

⚫ NONE

       禁用发布确认模式,是默认值

⚫ CORRELATED

       发布消息成功到交换器后会触发回调方法

⚫ SIMPLE

       单个确认

消息回退

        CallBack接口, 如果RoutingKey是正确的, 可以路由到对应的队列, 那么当消息正确被接受的时候, RabbitMQ调用回调并返回true, 失败则返回false, 但是如果指定的RoutingKey不存在, 那么消息就会被直接丢弃. 但是生产者是不知道的, 所以 我们应该想办法处理这个被丢弃的消息.

package com.example.rabbitmqtest.config;
 
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
 
 
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    @PostConstruct
    private void init() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
    }
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("被回退的消息内容为: " + new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8));
 
    }
}

        在上述代码中, 首先需要让一个类实现RabbitTemplate.ReturnsCallback这个接口, 然后重写其中的returnedMessage方法, 如下, 消息中接受一个ReturnedMessage类型的参数, 它的具体源码如下:

package org.springframework.amqp.core;
 
public class ReturnedMessage {
    private final Message message;
    private final int replyCode;
    private final String replyText;
    private final String exchange;
    private final String routingKey;
 
    // 下面是一些构造方法 和 其对应的get方法, 以及toString方法, 这里省略
}

        这个类是Spring AMQP中的一个类, 用于封装RabbitMQ返回给生产者的消息信息. 当RabbitMQ的交换机无法将路由器路由到任何队列的时候, 它就会将消息返回给生产者. 并附带一些额外的信息. 例如上图所示的源码. 下面是对该类中的每一个成员变量的一个解释:

  1. Message message:这是原始的消息对象,包含了消息的体和其他属性,比如消息头、内容类型等。
  2. int replyCode:这是RabbitMQ返回的回复码。它是一个整数,通常用于指示返回的原因或状态。具体的值可能依赖于RabbitMQ的配置或文档。
  3. String replyText:这是RabbitMQ返回的回复文本。它通常是一个描述性的字符串,用于解释为什么消息被返回。
  4. String exchange:这是发送消息时使用的交换器名称。当消息返回时,这个交换器名称可以帮助你定位问题,了解是哪个交换器未能正确路由消息。
  5. String routingKey:这是发送消息时使用的路由键。路由键用于决定消息应该被路由到哪个队列。当消息返回时,路由键可以帮助你理解为何消息没有被路由到预期的队列。
备份交换机

       上述讲述了, 如何接收到被回退的消息, 然后手动做出相关处理, 但是如果消息内容多了,复杂了,手动处理的成本也会很高, 所以这个时候, 我们就需要另外一种方法来自动处理没有被正确路由并被返回的消息.

       可以使用备份交换机,在正常交换机无法正确路由某个消息的时候, 这个交换机就会将这个消息发送给备用交换机, 备用交换机连接着用来处理这些被回退的消息的队列和消费者(你也可以理解为一种监察者, 也就是warning consumer or backup consumer).

修改配置类
package com.example.rabbitmqtest.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue"; // 声明确认队列
 
    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
 
    //声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
 
    //声明备份Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
 
    //声明确认Exchange交换机的备份交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        ExchangeBuilder exchangeBuilder =
                ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                        .durable(true)
                        // 设置备份交换机
                        .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return (DirectExchange) exchangeBuilder.build();
    }
 
    // 声明警告队列
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
 
    // 声明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }
 
    // 声明备份队列
    @Bean("backQueue")
    public Queue backQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
 
    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }
}

然后添加一个报警消费者即可

@Component 
@Slf4j  // 这里你也可以不使用slf4j
public class WarningConsumer { 
    public static final String WARNING_QUEUE_NAME = "warning.queue"; 
    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) { 
        String msg = new String(message.getBody()); 
        log.error("报警发现不可路由消息:{}", msg); 
    } 
}

队列 TTL

实现

思路图

       创建两个队列 QA和QB, QA的消息过期时间设置为10s, QB为40s,  然后创建一个直接交换机X, 和死信交换机Y, 也是direct类型, 然后创建一个死信队列QD, c为死信消费者.

按照上面的模式, 进行书写配置类:

package com.example.rabbitmqtest.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
 
/**
 * 声明一个ttl队列
 */
@Configuration
public class TTLQueueConfig {
    // 普通交换机X
    public static final String X_EXCHANGE = "X";
    // 死信交换机Y
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 
    // 普通队列QA, QB
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    // 死信队列QD
    public static final String DEAD_LETTER_QUEUE_QD = "QD";
 
    // 创建普通交换机
    @Bean(X_EXCHANGE)
    public DirectExchange x_exchage() {
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean(Y_DEAD_LETTER_EXCHANGE)
    public DirectExchange Y_DEAD_LETTER_EXCHANGE() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    // 声明普通队列
    // QA
    @Bean(QUEUE_A)
    public Queue queue_a() {
        Map<String, Object> map = new HashMap<>();
        // 设置死信交换机
        map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key", "YD");
 
        // 设置ttl  单位是ms
        map.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    // QB
    @Bean(QUEUE_B)
    public Queue queue_b() {
        Map<String, Object> map = new HashMap<>();
        // 设置死信交换机
        map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key", "YD");
 
        // 设置ttl  单位是ms
        map.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
    }
 
    // 死信队列
    @Bean(DEAD_LETTER_QUEUE_QD)
    public Queue queue_d () {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_QD).build();
    }
 
    // 绑定交换机
    // QA QB 绑定普通交换机X
    @Bean // X板顶QA, RoutingKey为XA
    public Binding queueABindingToX(
            @Qualifier(QUEUE_A) Queue QA,
            @Qualifier(X_EXCHANGE) DirectExchange X) {
        return BindingBuilder.bind(QA).to(X).with("XA");
    }
    @Bean // X板顶QB, RoutingKey为XB
    public Binding queueBBindingToX(
            @Qualifier(QUEUE_B) Queue QB,
            @Qualifier(X_EXCHANGE) DirectExchange X) {
        return BindingBuilder.bind(QB).to(X).with("XB");
    }
 
 
}
生产者

首先我们需要接收来自接口的消息, 然后将消息通过交换机X发送给队列QB和QA :

/**
 * 发送延迟消息
 *
 * 发送消息至 localhost .....  /ttl/sendMSG/test
 */
 
@ResponseBody
@Controller
@RequestMapping("/ttl")
public class SendMsgController {
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    @GetMapping("/sendMSG/{message}")
    public void sendMSG(@PathVariable String message) {
        System.out.println("当前系统的时间:{" +  new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
 
        rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XA","消息来则ttl为10s的队列" + message);
        rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XB","消息来则ttl为40s的队列" + message);
    }
}
消费者
package com.example.rabbitmqtest.consumer;
 
import com.example.rabbitmqtest.config.TTLQueueConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
import java.nio.charset.StandardCharsets;
import java.util.Date;
 
/**
 * 队列ttl 的消费者
 */
 
@Component
public class DeadLetterQueueConsumer {
    // 接收消息
    @RabbitListener(queues = {TTLQueueConfig.DEAD_LETTER_QUEUE_QD})
    public void recieveD(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("当前系统的时间:{" +  new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}");
 
    }
}

展示:

查看后端:

延迟队列优化

       如图:

       此处新增了一个Queue, 名为QC, 该队列不需要设置ttl时间.

存在的问题

        我们首先发送一条过期时间为20000ms的消息给QC, 然后发送一个2000ms的消息给QC,会发现:

       消息2虽然过期时间段, 但是它并没有优先发送给死信队列, 反而是过期时间长的消息1先发送给死信队列处理..

       因为RabbitMQ只会检查第一个消息是否过期, 如果过期则丢弃到死信队列. 然后再去检查第二个消息.

优先级队列

场景

        想象一下, 一个队列可以存储转发很多个消息来让特定的消费者消费,但是在复杂的情况中,消息也是分优先级的,比如说, 加入天猫是我创建的后台也是我写的, 那么它里面有很多商户,在收到顾客的消费请求的时候会创建很多订单, 但是不同的商家订单是不一样的. 我们必须对这些商家做出区分, 比如我们优先处理那些订单量大的商家的订单,给他们做一个优先处理.

       考虑到使用redis来做消息队列,但是redis只能使用一个list作为一个简单的消息队列.并不能实现一个优先级的场景, 所以考虑到我们RabbitMQ的优先级队列.

使用web插件添加优先级队列

        登录到web插件:

       添加新队列:

        这里的10意味着,队列接受的消息的优先级的范围是0~10,包括0和10也就是[0,10],你发送消息的时候可以将发送到该优先级队列的消息设置一个0~10的优先级.

       RabbitMQ会确保具有更高级别的消息优先于较低等级的消息被消费,如果有多个消费者,并且他们都是空闲的,那么具有最高优先级的消息将被传递给其中一个消费者.

       但是需要主义的是,如果你不设置x-max-priority,那么默认的最大优先级就是0,这就意味着所有的消息都具有相同的优先级,那么就和普通队列一样了.设置优先级的时候确保在优先级范围内,否则RabbitMQ会拒绝该消息.

使用java代码声明优先级队列
package PriorityQueue;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
import java.nio.charset.StandardCharsets;
 
public class Producer {
    public static void main(String[] args) throws Exception{
        // 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 工厂的ip, 链接rabbit队列
        connectionFactory.setHost("106.14.165.91");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("***");
 
        Connection connection = connectionFactory.newConnection();
 
        Channel channel = connection.createChannel();
        
        // 构造消息
        for (int i = 0; i <= 10; i++) {
            String msg = "消息:" + i;
            if (i % 2 == 0) {
                // 设置优先级的属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i).build();
                channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes(StandardCharsets.UTF_8));
            } else {
                channel.basicPublish("exchangeName","routingKey",null,msg.getBytes(StandardCharsets.UTF_8));
            }
        }
        System.out.println("消息发送完毕");
    }
}
消费者
package PriorityQueue;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
 
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
 
public class Consumer {
    public static void main(String[] args) throws Exception{
        // 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 工厂的ip, 链接rabbit队列
        connectionFactory.setHost("106.14.165.91");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("***");
 
        Connection connection = connectionFactory.newConnection();
 
        Channel channel = connection.createChannel();
 
        // 设置优先级属性
        Map<String,Object> params = new HashMap<>();
        params.put("x-max-priority",10);
        // 声明一个优先级队列
        channel.queueDeclare("queueName",true,false,false,params);
 
        System.out.println("启动消费者");
        // 确认回调函数
        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String recieveMsg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到消息:" + recieveMsg);
        };
 
        channel.basicConsume("queueName",deliverCallback,(consumerTag)-> {
            System.out.println("消费者无法消费此消息时被调用");
        });
    }
}

惰性队列

       懒惰队列是一个classic类型的队列, 但是这个队列是以lazy模式运行的. 当你将某个队列设置为lazy模式, 那么在队列里面的消息就会被尽早的存如硬盘, 这些存入硬盘的消息会在他们被消费者请求消费的时候加载到内存中.

       这样设计的原因就是希望队列能支持一个更多数量的消息的存储. 消费者如果由于各种原因下线,那么队列中的元素就会堆积在内存, 导致内存溢出, 此时需要处理这些消息, 懒惰队列, 也可以称之为惰性队列, 它的处理方法就是将其持久化到硬盘, 然后要使用或者消费的时候就拿出来.

       对比消息的持久化, broker会将消息写入磁盘的时候, 也会给内存中进行一个备份, RabbitMQ释放内存的时候, 将消息持久化到硬盘是一个比较消耗资源的操作, 同时也会阻塞队列, 进而无法接受新的消息.

队列的两种模式:

  • default 默认模式
  • lazy  惰性模式

如何创建一个lazy queue?

  • channel.queueDeclare的时候传入对应的参数
1. Map<String,Object> params = new HashMap<>();
2. params.put("x-queue-mode","lazy");
3. channel.queueDeclare("queueName",true,false,false,params);

       如果同时设置了这两种的话, 那么policy优先生效

内存对比

     在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB👍

集群

      最后一个环节就是集群, 之前学过redis的同学都知道为什么要建立一个集群. 我们一台主机上有一个RabbitMQ服务器, 但是如果这个服务器挂了宕机了(

  • 交不起电费了
  • 地震了电线断了
  • 火灾把机器烧了
  • 内存崩溃导致RabbitMQ崩溃了
  • 主机进水了
  • 网站被压测导致内存崩溃, 后台杀死了RabbitMQ进程

), 导致不可用了, 那么生产者的消息就会找不到服务器而被直接丢弃, 造成的影响是毁灭性的

       此外单台RabbitMQ的服务器可以满足1000 per s 的消息吞吐量, 但是如果服务器需要100000 per s的吞吐量, 那么要想要买多台服务器, 那就显得有点力不从心了. 但是我们可以在单机上部署多个RabbitMQ服务器. 也就是建立一个RabbitMQ集群.

       首先你需要是三个主机, 分别为为node1,node2和node3. 但是主机名并不一定就是node1~3, 需要你去手动修改hostName, 如下:

vim /etc/hostname

        然后配置各个节点服务器的hosts文件, 让其能够相互识别:

vim /etc/hosts

node1,node2,node3都要修改

       然后将node1上面的cookie给node2和node3:

scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie

scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie

        启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行以下命令) :

rabbitmq-server -detached

        在节点2执行:

rabbitmqctl stop_app

(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务)

rabbitmqctl reset

rabbitmqctl join_cluster rabbit@node1

rabbitmqctl start_app(只启动应用服务)

       节点3执行:

rabbitmqctl stop_app

rabbitmqctl reset

rabbitmqctl join_cluster rabbit@node2

rabbitmqctl start_app

        集群状态:

rabbitmqctl cluster_status

        需要重新设置用户:

创建账号

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

        解除集群节点(node2和node3机器分别执行)

rabbitmqctl stop_app

rabbitmqctl reset

rabbitmqctl start_app

rabbitmqctl cluster_status

rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
38 3
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
31 0
rabbitmq基础教程(ui,java,springamqp)
|
2月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
113 0
|
3月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
182 0
|
4月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
176 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
5月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ
|
6月前
|
消息中间件 Java
Java一分钟之-RabbitMQ:AMQP协议实现
【6月更文挑战第11天】RabbitMQ是基于AMQP协议的开源消息队列服务,支持多种消息模式。本文介绍了RabbitMQ的核心概念:生产者、消费者、交换器、队列和绑定,以及常见问题和解决方案。例如,通过设置消息持久化和确认机制防止消息丢失,配置死信队列处理不可消费消息,以及妥善管理资源防止泄漏。还提供了Java代码示例,帮助读者理解和使用RabbitMQ。通过理解这些基础和最佳实践,可以提升RabbitMQ在分布式系统中的可靠性和效率。
136 0
Java一分钟之-RabbitMQ:AMQP协议实现
|
5月前
|
消息中间件 负载均衡 Java
JAVA面试之MQ
JAVA面试之MQ
73 0
|
5月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ