RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

image.png

RocketMQ使用教程相关系列 目录


目录

源代码说明

MessageExt的源码解析

继承于Message,Message源码解析

喝点毒鸡汤


源代码说明

MessageExt位于此包下:

package org.apache.rocketmq.common.message;

MessageExt的源码解析

参数说明在代码里

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
    private int queueId;
    //记录消息在Broker存盘大小
    private int storeSize;
    //记录在ConsumeQueue中的偏移
    private long queueOffset;
    //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
    private int sysFlag;
    //消息创建时间,在Producer发送消息时设置
    private long bornTimestamp;
    //记录发送该消息的producer地址
    private SocketAddress bornHost;
    //消息存储时间
    private long storeTimestamp;
    //记录存储该消息的Broker地址
    private SocketAddress storeHost;
    //消息Id
    private String msgId;
    //记录消息在Broker中存储偏移
    private long commitLogOffset;
    //消息内容CRC校验值
    private int bodyCRC;
    //消息重试消费次数
    private int reconsumeTimes;
    //这个参数没看懂,知道的大佬分享下
    private long preparedTransactionOffset;
//省略get set
    @Override
    public String toString() {
        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}

继承于Message,Message源码解析

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}1.public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}

喝点毒鸡汤

每天读点源码,进步一小步,成长一大步。

目录
相关文章
|
3月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
82 3
|
5月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
EMQ
|
7月前
|
运维 Linux 网络性能优化
MQTT 5.0 报文解析 05:DISCONNECT
在 MQTT 中,客户端和服务端可以在断开网络连接前向对端发送一个 DISCONNECT 报文,来指示连接关闭的原因。客户端发送的 DISCONNECT 报文还可以影响服务端在连接断开后的行为,例如是否发送遗嘱消息,是否更新会话过期间隔。
EMQ
163 0
MQTT 5.0 报文解析 05:DISCONNECT
|
2月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
|
2月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
204 2
|
6月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
49 0
|
2月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
47 0
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
76 2
EMQ
|
6月前
|
安全 开发工具 数据安全/隐私保护
MQTT 5.0 报文解析 06:AUTH
MQTT 5.0 引入了增强认证特性,它使 MQTT 除了简单密码认证和 Token 认证以外,还能够支持质询/响应风格的认证。为了实现这一点,它在原先 CONNECT 和 CONNACK 报文的基础上,又引入了 AUTH 报文来实现任意多次的认证数据交换,以支持各种不同类型的认证机制,例如 SCRAM、Kerberos 认证等等。
EMQ
303 8
MQTT 5.0 报文解析 06:AUTH
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
69 0

推荐镜像

更多