高性能消息中间件 RabbitMQ(二)

简介: 高性能消息中间件 RabbitMQ

2.3 账户管理

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.

1、创建账户

# 创建账户
rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)

2、给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

3、给用户授权

# "/"表示虚拟机
# zj表示用户名
# ".*" ".*" ".*" 表示完整权限
rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"

4、通过管控台访问rabbitmq即可。

2.4 管控台

2.5 Docker安装

1、关闭RabbitMQ服务

rabbitmqctl stop

2、在Centos7中安装docker

# 安装Docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
# 启动docker
systemctl start docker

3、拉取镜像

docker pull rabbitmq

4、启动MQ

docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

三、RabbitMQ工作模式

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)

3.1 简单模式

简介

特点

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

1、启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2、创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

3、编写生产者

package com.zj.mq.Simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("simpleQueue",false,false,false,null);
        //5.发送消息
        String msg ="hello rabbitMQ";
        /*
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simpleQueue",null,msg.getBytes());
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("OK");
    }
}

4.编写消费者

package com.zj.mq.Simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

消费者随时在监听队列只要队列有消息就会被消费。

3.2 工作队列模式

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

1、编写生产者,并产生大量消息

package com.zj.mq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("WorkQueue",false,false,false,null);
        //5.发送大量消息
        for (int i = 0; i < 100; i++) {
            /*
             * 参数1:交换机名,""表示默认交换机
             * 参数2:路由键,简单模式就是队列名
             * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
             * 参数4:要传递的消息字节数组
             */
            channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
        }
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
    }
}

2.编写消费者

编写三个消费者,他们都监听的是一个队列。

package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者2接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer3 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者3接受消息为:"+message);
            }
        });
    }
}
消费者1接受消息为:这是第0个消息
消费者1接受消息为:这是第3个消息
消费者1接受消息为:这是第6个消息
消费者1接受消息为:这是第9个消息
消费者1接受消息为:这是第12个消息
消费者1接受消息为:这是第15个消息
消费者1接受消息为:这是第18个消息
消费者1接受消息为:这是第21个消息
……
消费者2接受消息为:这是第1个消息
消费者2接受消息为:这是第4个消息
消费者2接受消息为:这是第7个消息
消费者2接受消息为:这是第10个消息
消费者2接受消息为:这是第13个消息
消费者2接受消息为:这是第16个消息
消费者2接受消息为:这是第19个消息
消费者2接受消息为:这是第22个消息
消费者2接受消息为:这是第25个消息
……
消费者3接受消息为:这是第2个消息
消费者3接受消息为:这是第5个消息
消费者3接受消息为:这是第8个消息
消费者3接受消息为:这是第11个消息
消费者3接受消息为:这是第14个消息
消费者3接受消息为:这是第17个消息
消费者3接受消息为:这是第20个消息
消费者3接受消息为:这是第23个消息
消费者3接受消息为:这是第26个消息
消费者3接受消息为:这是第29个消息
消费者3接受消息为:这是第32个消息


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
230 3
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
1月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
147 11
|
2月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
2月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。