一文带大家快速掌握RabbitMQ!(二)

简介: 一文带大家快速掌握RabbitMQ!

一文带大家快速掌握RabbitMQ!(一)https://developer.aliyun.com/article/1624433


入门使用

引入RabbitMQ依赖:

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>3.6.5</version>
</dependency>

创建一个生产者

public class Producer {
    private static final String QUEUE_NAME = "test01";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂并配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        // 设置虚拟机
        connectionFactory.setVirtualHost("/test");
        // 2. 通过连接工厂建立连接
        Connection connection = connectionFactory.newConnection();
        // 3. 通过connection创建Channel
        Channel channel = connection.createChannel();
        // 4. 通过Channel发送数据 (exchange, routingKey, props, body)
        // 不指定Exchange时, 交换机默认是AMQP default, 此时就看RoutingKey
        // RoutingKey要等于队列名才能被路由, 否则消息会被删除
        for (int i = 0; i < 5; i++) {
            String msg = "Learn For RabbitMQ-" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("Send message : " + msg);
        }
        // 5.关闭连接
        channel.close();
        connection.close();
    }
}

创建一个消费者

public class Consumer {
    private static final String QUEUE_NAME = "test01";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂并配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        // 设置虚拟机
        connectionFactory.setVirtualHost("/test");
        // 2. 通过连接工厂建立连接
        Connection connection = connectionFactory.newConnection();
        // 3. 通过connection创建Channel
        Channel channel = connection.createChannel();
        // 4. 声明队列 (queue, durable, exclusive, autoDelete, args)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 5. 创建消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             * 获取消息 (监听到有消息时调用)
             * @param consumerTag 消费者标签, 在监听队列时可以设置autoAck为false,即手动确认(避免消息的丢失), 消息唯一性处理
             * @param envelope 信封
             * @param properties 消息的属性
             * @param body 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Received message : " + msg);
            }
        };
        // 6. 设置Channel, 监听队列(String queue, boolean autoAck,Consumer callback)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

参数:

  • queue:队列名称
  • durable:持久化,true 即使服务重启也不会被删除
  • exclusive:独占,true 队列只能使用一个连接,连接断开队列删除
  • autoDelete:自动删除,true 脱离了Exchange(连接断开),即队列没有Exchange关联时,自动删除
  • arguments:扩展参数
  • autoAck:是否自动签收(回执)

不指定Exchange时,交换机默认是AMQP default,此时就看RoutingKey,RoutingKey要等于队列名才能被路由,否则消息会被删除

交换机属性

Name:交换机名称

Type:交换机类型—— direct、topic、fanout、header

Durability:是否需要持久化,true为持久化

Auto Delete:当最后一个绑定到Exchange上的队列删除后,即Exchange上没有队列绑定,自动删除该Exhcange

Internal:当前Exchange是否用于RabbitMQ内部使用,大多数使用默认False

Arguments:扩展参数,用于扩展AMQP协议定制化使用

Direct Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.DIRECT, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系:
 channel.queueBind("queueName", "exchangeName", "routingKey");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 channel.basicPublish("exchangeName", "routingKey", null, "msg".getBytes());

Topic Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.TOPIC, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系:
 channel.queueBind("queueName", "exchangeName", "routingKey.#");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 channel.basicPublish("exchangeName", "routingKey.hi", null, "msg".getBytes());
 channel.basicPublish("exchangeName", "routingKey.save", null, "msg".getBytes());
 channel.basicPublish("exchangeName", "routingKey.save.hi", null, "msg".getBytes());

因为使用了模糊匹配的"#",可以匹配到发送的三条消息。因此可以收到三条消息

Fanout Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.FANOUT, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系: 
 //(不设置routingKey, 这里为空)
 channel.queueBind("queueName", "exchangeName", "");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 // 同样routingKey为空 (也可以是任意字符串, 因为fanout并不依据routingKey)
 channel.basicPublish("exchangeName", "", null, "msg".getBytes());

高级特性

可靠性投递

什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点成功接收
  • 发送端收到MQ节点(Broker)确认应答(已收到)
  • 完善消息进行补偿机制

可靠性投递的方案一

消息落库(持久化至数据库),对消息状态进行打标,如若消息未响应,进行轮询操作

1.把业务消息落库,再生成一条消息落库到消息DB用来记录(譬如消息刚创建,正在发送中 status: 0)

缺点:对数据库进行两次持久化

2.生产端发送消息。

3.Broker端收到后,应答至生产端。Confirm Listener异步监听Broker的应答。

4.应答表明消息投递成功后,去消息DB中抓取到指定的消息记录,更新状态,如status: 1

5.如在3中出现网络不稳定等情况,导致Listener未收到消息成功确认的应答。

那么消息数据库中的status就还是0,而Broker可能是接收到消息的状态。

因此设定一个规则(定时任务),例如消息在落库5分钟后(超时)还是0的状态,就把该条记录抽取出来。

6.重新投递

7.限制一个重试的次数,譬如3次,如果大于3次,即为投递失败,更新status的值

用人工补偿机制去查询消息失败的原因

高并发场景消息的延迟投递,做二次确认,回调检查

Upstream service:生产端

Downstream service:消费端

1:业务消息落库后,发送消息至Broker。

2:紧接着发送第二条延迟(设置延迟时间)检查的消息。

3:消费端监听指定的队列接收到消息进行处理

4:处理完后,生成一条响应消息发送到Broker。

5:由Callback服务去监听该响应消息,收到该响应消息后持久化至消息DB(记录成功状态)。

6:到了延迟时间,延迟发送的消息也被Callback服务的监听器监听到后,去检查消息DB。如果未查询到成功的状态,Callback服务需要做补偿,发起RPC通讯,让生产端重新发送。生产端通过接收到的命令中所带的id去数据库查询该业务消息,再重新发送,即跳转到1。

该方案减少了对数据库的存储,保证了性能

消费端幂等性

避免消息的重复消费

消费端实现幂等性,接收到多条相同的消息,但不会重复消费,即收到多条一样的消息。

方案:

1.唯一ID + 指纹码机制

  • 唯一ID + 指纹码(业务规则、时间戳等拼接)机制,利用数据库主键去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码 未查询到就insert,如有说明已处理过该消息,返回失败
  • 优点:实现简单
  • 缺点:高并发下有数据库写入的性能瓶颈
  • 解决方案:根据ID进行分库分表、算法路由

2.利用Redis的原子性

需要考虑的问题:

  • 是否要落库数据库,如落库,数据库和缓存如何做到数据的一致性
  • 不落库,数据存储在缓存中,如何设置定时同步的策略(可靠性保障)

Confirm确认消息

生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。

生产者进行接收应答,用来确认这条消息是否正常发送到Broker是消息可靠性投递的核心保障。

确认机制的流程图

发送消息与监听应答的消息是异步操作。

确认消息的实现

  1. 在channel开启确认模式:channel.confirmSelect();
  2. 在channel添加监听:channel.addConfirmListener(ConfirmListener listener); 返回监听成功和失败的结果,对具体结果进行相应的处理(重新发送、记录日志等待后续处理等)

具体代码:

public class ConfirmProducer {
    private static final String EXCHANGE_NAME = "confirm_exchange";
    private static final String ROUTING_KEY = "confirm.key";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 指定消息的投递模式: 确认模式
        channel.confirmSelect();
        // 发送消息
        String msg = "Send message of confirm demo";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
        // 添加确认监听
        channel.addConfirmListener(new ConfirmListener() {
            // 成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Ack ========");
            }
            // 失败
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Nack ========");
            }
        });
    }
}


一文带大家快速掌握RabbitMQ!(三)https://developer.aliyun.com/article/1624435

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 SQL Java
Rabbitmq
Rabbitmq
49 1
|
2天前
|
消息中间件 存储 网络协议
一文带大家快速掌握RabbitMQ!(一)
一文带大家快速掌握RabbitMQ!
|
5月前
|
消息中间件 大数据 Java
RabbitMQ
RabbitMQ
90 1
|
5月前
|
消息中间件 Java 中间件
一、RabbitMQ介绍
一、RabbitMQ介绍
116 0
|
12月前
|
消息中间件 存储 数据库
RabbitMQ特殊应用
RabbitMQ特殊应用
56 0
|
消息中间件 存储 缓存
RabbitMQ到底为什么要使用它?
在多服务体系架构中,必然存在着多个服务之间的调用关系,当用户提交了订单,订单服务会调用支付服务执行用户的金钱操作,执行完毕之后紧接着调用商品服务对商家的商品信息(库存、成交量、收入等)进行更新,执行完毕之后又调用物流服务
|
消息中间件 存储 缓存
初识RabbitMQ
初识RabbitMQ
106 1
|
消息中间件 存储
RabbitMq
RabbitMq
114 0
|
消息中间件 存储 JSON
关于RabbitMQ
MQ是一种应用程序键一步通讯的技术,MQ是消息队列的缩写(Message Queue) 在MQ中,消息由一个应用程序发送到一个称为队列的中间件中,接着被中间件存储,并最终被另一个或多个消费者应用程序读取和处理; MQ组成:消息——生产者——队列——中间件——消费者!
73 0
|
消息中间件 存储 Java
RabbitMq使用
RabbitMq使用
144 0