RabbitMQ工作模式2 Work queues工作队列模式

简介: RabbitMQ工作模式2 Work queues工作队列模式

RabbitMQ工作模式2  Work queues工作队列模式

RabbitMQ一共有六种工作模式,上面写的生产者是最简单的简单模式,工作模式就是消息的分发的方式

不同的工作模式指的是消息和路由的策略的不同


image.png


Work queues工作队列模式

模式说明 一个生产者发消息到队列里面,这个队列对应着两个消费者,两个消费者从同一个队列获取消息,是竞争的关系(竞争关系,从同一个队列中获取消息,同时也可以分担压力)

特点:与入门的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

应用场景:对于任务过重或任务较多的情况使用工作队列可以提高任务处理的速度


image.png


代码实现

一个生产者

package com.wyh.producer;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ生产者 工作队列模式
 * @author: 魏一鹤
 * @createDate: 2022-03-23 22:40
 **/
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *  producer主要用来发送消息
 *
 *
**/
public class WorkQueues_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
        /**
         *  1 String queue, 队列名称
         *  2 boolean durable,   是否持久化,当MQ重启之后还在 持久化到数据库中
         *  3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
         *  4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
         *  5 Map<String, Object> arguments  参数信息
         *
        **/
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        AMQP.Queue.DeclareOk queue = channel.queueDeclare("work_queues", true, false, false, null);
//6 发送消息 它的参数比较多 下面一一说明
        //发送的消息队列  用来发送给消费者 一般发送的消息都是字节
        for (int i = 1; i <= 10; i++) {
            String body="hello rabbitmq work_queus!"+i;
//   1 String exchange,交换机的名称,简单模式下交换机会使用默认的 ""
            //   2 String routingKey, 路由名称
            //   3 BasicProperties props, 配置信息
            //   4 byte[] body  真实的发送的消息数据
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
//7 释放资源
        channel.close();
        connection.close();
    }
}


image.png

两个消费者(竞争关系,从同一个队列中获取消息,同时也可以分担压力)

消费者1

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 工作队列模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class WorkQueues_consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
        /**
         *  1 String queue, 队列名称
         *  2 boolean durable,   是否持久化,当MQ重启之后还在 持久化到数据库中
         *  3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
         *  4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
         *  5 Map<String, Object> arguments  参数信息
         *
         **/
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        AMQP.Queue.DeclareOk queue = channel.queueDeclare("work_queues", true, false, false, null);
//6 接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("body = " + new String(body));
            }
/**   打印的信息
             consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
             exchange =
             routingKey = hello_world
             properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
             body = hello RabbitMQ!
             consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
             exchange =
             routingKey = hello_world
             properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
             body = hello RabbitMQ 222!
            **/
        };
        channel.basicConsume("work_queues",true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}

消费者2

首先把两个消费者跑起来,发现有了队列,而且是两个消费者

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 工作队列模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class WorkQueues_consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
        /**
         *  1 String queue, 队列名称
         *  2 boolean durable,   是否持久化,当MQ重启之后还在 持久化到数据库中
         *  3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
         *  4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
         *  5 Map<String, Object> arguments  参数信息
         *
         **/
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        AMQP.Queue.DeclareOk queue = channel.queueDeclare("work_queues", true, false, false, null);
//6 接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("body = " + new String(body));
            }
/**   打印的信息
             consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
             exchange =
             routingKey = hello_world
             properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
             body = hello RabbitMQ!
             consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
             exchange =
             routingKey = hello_world
             properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
             body = hello RabbitMQ 222!
            **/
        };
        channel.basicConsume("work_queues",true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}


这时再运行生产者,查看工作台,发现生产处理的消息被两个消费者分别消费了


image.png


消费者不一定非要配置两个,配置多个也是可以的


工作队列模式小结

1 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

2 WorkQueues对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,例如:短信服务部署多个,只需要有一个节点发送成功即可

3 消费者不一定非要部署一个,两个以上也是可以的,会分别分配队列消息资源

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
71 1
|
3月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
266 6
|
4月前
|
消息中间件 JSON Java
|
4月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
182 2
|
4月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
117 0
|
6月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
6月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
110 0
说说RabbitMQ延迟队列实现原理?
|
6月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
193 1
|
7月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
136 1