RocketMQ Tag 详解!

简介: 本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。

你好,我是猿java。

Tag 是 RocketMQ 提供的一种消息过滤机制,允许生产者在发送消息时指定一个或多个标签,消费者则可以根据这些标签来选择性地消费消息。这篇文章,我们将详细介绍 RocketMQ 中 Tag 的原理、源码分析以及示例。

Tag 的原理

在 RocketMQ 中,Tag 主要用于消息过滤。每个消息可以携带一个 Tag,消费者可以根据 Tag 来订阅特定的消息,从而实现消息的过滤和分类处理。

消息发送阶段

生产者在发送消息时,可以指定一个 Tag。这个 Tag 会被附加到消息的元数据中,并存储在 RocketMQ 的消息存储系统中。

消息存储阶段

消息被存储在 RocketMQ 的 Broker 中,消息的元数据(包括 Tag)也会被存储。

消息消费阶段

消费者在订阅消息时,可以指定要消费的 Tag。Broker 会根据消费者订阅的 Tag,将符合条件的消息投递给消费者。

源码分析

为了更好的理解 Tag的原理,我们通过 RocketMQ 中Tag 相关的几个主要代码片段进行演示。

生产者发送消息时的代码

// 创建消息实例,并指定Topic和Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 发送消息
SendResult sendResult = producer.send(msg);

Message 类中,Tag 是通过构造函数传递的,并存储在 Message 对象的 tags 字段中。

消费者订阅消息时的代码

// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

// 订阅Topic,并指定Tag
consumer.subscribe("TopicTest", "TagA");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
   
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
        for (MessageExt msg : msgs) {
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 启动消费者
consumer.start();

DefaultMQPushConsumer 类中,通过 subscribe 方法指定要订阅的 Topic 和 Tag,RocketMQ 内部会根据订阅的 Tag 进行消息过滤。

示例

下面是一个完整的示例,演示如何使用 RocketMQ 的 Tag 功能。

生产者代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 启动生产者
        producer.start();

        // 发送消息
        for (int i = 0; i < 10; i++) {
   
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

消费者代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅Topic,并指定Tag
        consumer.subscribe("TopicTest", "TagA");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
                for (MessageExt msg : msgs) {
   
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

尽管 RocketMQ 的 Tag 功能在消息过滤和分类处理方面提供了极大的便利,但也有其优缺点。下面详细分析一下:

优点

简单易用

Tag 的使用非常简单,生产者只需在发送消息时指定 Tag,消费者在订阅消息时指定相应的 Tag 即可。

高效过滤

通过 Tag 进行消息过滤,减少了消费者处理不相关消息的开销,从而提高了系统的性能。

灵活性高

支持一个 Topic 下多个 Tag,使得消息的分类和过滤更加灵活。

低延迟

Tag 过滤是在 Broker 端进行的,不会显著增加消息传递的延迟。

减少网络带宽

消费者只会接收到自己感兴趣的消息,减少了不必要的网络传输,从而节省了带宽。

缺点

单一维度过滤

Tag 只能提供单一维度的消息过滤,无法进行更复杂的多维度过滤。如果需要多维度过滤,需要结合其他机制(如消息属性)来实现。

有限的灵活性

Tag 的数量和种类在设计阶段需要规划好,灵活性有限。如果后期需要添加新的 Tag,可能需要重新设计和部署。

不支持复杂逻辑

Tag 过滤支持的逻辑较为简单,只能进行基于字符串匹配的过滤,无法支持复杂的过滤逻辑。

管理复杂性

随着系统规模的增大,Tag 的管理和维护可能变得复杂,尤其是在多个应用共享同一个 Topic 的情况下。

潜在的性能瓶颈

虽然 Tag 过滤在大多数场景下性能良好,但在极端情况下(如大量不同 Tag 的消息和高并发消费),可能会带来性能瓶颈。

适用场景

日志和监控

不同类型的日志和监控数据可以通过 Tag 进行分类和过滤。

电商系统

不同类型的订单、商品信息等可以通过 Tag 进行分类和过滤,消费者只处理自己感兴趣的消息。

金融系统

不同类型的交易、通知等可以通过 Tag 进行分类和过滤,提高系统的处理效率。

社交平台

不同类型的消息(如评论、点赞、私信等)可以通过 Tag 进行分类和过滤,提供更精准的消息推送。

总结

本文分析了 RocketMQ 的 Tag 功能,它在消息过滤和分类处理方面提供了极大的便利,适用于各种需要高效、低延迟消息传递的场景。然而,它也有一些局限性,如单一维度过滤、管理复杂性等。

在实际应用中,需要根据具体需求和系统设计,合理使用 Tag 功能,结合其他机制来实现更复杂的消息过滤和处理。

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
SQL 存储 消息中间件
RocketMQ的TAG过滤和SQL过滤机制
写作目的 项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息。
357 0
RocketMQ的TAG过滤和SQL过滤机制
|
存储 消息中间件 文件存储
RocketMQ中msg&tag的生命周期
RocketMQ中msg&tag的生命周期
96 0
RocketMQ中msg&tag的生命周期
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
721 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
|
消息中间件 存储 缓存
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
|
消息中间件 负载均衡 Java
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
|
消息中间件 负载均衡 Java
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
1313 0
|
消息中间件 RocketMQ 测试技术
rocketMq - tag不一致造成的假象
概述     这篇文章是以同事在实际工作中遇到的问题作为分析的切入点,加深自己对mq的掌握,践行“干中学”的团队理念。     当自己差不多把基本概念都掌握的差不多的时候,必须需要实际的案例或者实践来提深自己的深度,这个时候just do it 变得很重要,所以我喜欢不停的被人挑战,截止目前帮人解答的问题包括:client端消息堆积问题、批量消息拉取问题中遇到的神奇的数字32、以及本篇的tag不一致造成的假象,也就说会有3篇文章输出。
1520 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
30 4