生产者启动流程|学习笔记

简介: 快速学习生产者启动流程

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)生产者启动流程】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12470


生产者启动流程

 

启动流程

image.png

生产者启动入口是DefaultMQProducer,它里面有个start方法:

public void start() throws MQclientException {

this.setProducerGroup(withNamespace(this.producerGroup));

this.defaultMQProducerImpl.start();

if (null != traceDispatcher) {

try {

traceDispatcher.start(this.getNamesrvAddr(), this.getAccesschannel());

catch (MQclientException e) {

log.warn( "trace dispatcher start failed ", e);

}

}

}

通过start方法启动生产者,首先是设置当前生产者的组名,真正的启动的业务逻辑是在defaultMQProducerImpl类中完成的,这个类在创建DefaultMQProducer时已经完成实例化

public DefaultwQProducer(final string producerGroup,RPCHook rpcHook, boolean enableMsgTrace

final string customizedTraceTopic) {

this.producerGroup = producerGroup;

defaultMQProducerImpl = new pefaultMOProducerImp

(defaultMQProducer: this,rpcHook);

真正的启动流程是在defaultMQProducerImpl的start方法中:

//检查生产者组是否满足要求

this.checkconfig(;

//更改当前instanceName为进程IDif(!this.defau7tMQProducer.getProducerGroup().equa1s(Nix411.CLTENT_INNER_PRODUCER_GROUP)){

this.defau1tMQProducer.changeInstanceNameToPID();

}

//获得MQ客户端实例(获取MQ客户端管理器,通过MQClientManger获取客户端示例)

this.mQclientFactory =

MQc1ientManager.getInstance(). getAndcreateMQc1ientInstance(this.defaultNQProducer,rpcHook);

在check.config中完成生产者组名校验工作:

private void checkconfig() throws MQclientException {validators.checkGroup(this.defaultMQProducer.getProducerGroup());

//组名不能为空,组名不能和默认组名相同

if (null == this.defaultMQProducer.getProducerGroup()) {

throw new MQclientException("producerGroup is null" , null);

}

If (this.defaultMQProducer.getProducerGroup( ) .equal

s(NixAll.DEFAULT_PRODUCER_GROUP)){

throw new MCclientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GRO!

null);

}

}

说明:

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */,MQClientInstance> factoryTable = new

ConcurrentHashMap<String,MQClientlnstance>();

同一个clientld只会创建一个MQClientInstance。

MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道;包括和MQ进行交互时主要通过MQClientInstance向外发送请求。

代码:MQClientManager#getAndCreateMQClientInstan

Ce

pub7ic MoclientInstance getAndcreateMQclientInstance(final clientconfig clientconfig, RPCHook rpcHook) {

//构建客户端ID

string c1ientId = clientconfig.bui1dMQc1ientId(;

//根据客户端ID或者客户端实例

MQclientInstance instance = this.factoryTable.get(clientId);

//实例如果为空就创建新的实例,并添加到实例表中

if (nu17 == instance) {

instance =

new MQclientInstance(clientconfig.cloneclientconfig(),

this.factoryIndexGenerator.getAndIncrement(,clientId,rpcHook);

MQclientInstance prev = this.factoryTab1e.putIfAbsent(clientId,instance);

if (prev != null) {

instance = prev;

1og.warn("Returned Previous MQclientInstance for clientId:[i]",clientId);

}else {

log.info("created new MoclientInstance for clientId:[0]",clientId);

}

}

return instance;

}

拿到客户端实例之后,将当前生产者注册到客户端实例中,注册完成之后,客户端实例就启动了,下面是启动过程。

//注册当前生产者到MQClientInstance管理中,方便后续调用网路请求

boolean registerOK = mClientFactory.registerProducer

(this.defaultMQProducer.getProducerGroup()producer.this);

if ( !registeroK) {

this.servicestate = servicestate.CREATE_JUST;

throw new MQclientException("The producer group["+ this.defaultMQProducer.getProducerGroup()

+ "] has been created before, specify another name please."+FAQUrl.suggestTodo(FAQUr1.GRoup_NAME_DUPLICATEnull);this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

//启动生产者

if (startFactory) {

mQclientFactory . start();

}

总结:真正的producer启动过程是在DefaultMQProducerImpl的start方法中完成的,首先是检查生产者组是否符合要求,然后将实例名称改为进程ID、获取MQ客户端实例,得到客户端实例后将当前生产者注册到客户端实例中去启动生产者。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
SQL 存储 搜索推荐
SQL游标的原理与在数据库操作中的应用
SQL游标的原理与在数据库操作中的应用
|
监控 Shell 数据处理
Python执行Shell并获取结果的全面指南
Python执行Shell并获取结果的全面指南
920 1
|
SQL druid Java
解决 ‘The last packet successfully received from the server was xxx milliseconds ago‘ 问题
解决 ‘The last packet successfully received from the server was xxx milliseconds ago‘ 问题
7631 0
|
运维 Prometheus 监控
java异常 | 处理规范、全局异常、Error处理
java异常 | 处理规范、全局异常、Error处理
|
JavaScript 前端开发 Linux
pipx — 在隔离环境中安装和运行 Python 应用程序
pipx — 在隔离环境中安装和运行 Python 应用程序
598 1
|
弹性计算 算法 Java
一文说清linux system load averages
深入浅出阐释linux system load averages的语义,算法和计算流程,并分享了实际load飙高问题的排查经验和心得。
一文说清linux system load averages
|
网络协议 Windows
纯IPv4环境访问IPv6网站
在纯IPv4环境中访问IPv6网站,可以通过Teredo协议。适用于Windows 10 19043.928版。操作包括:检查Teredo状态、设置为不可用或企业客户端、指定服务器(如teredo.iks-jena.de)、配置端口(可选),然后验证通过ping IPv6地址(如6.ipw.cn)来确认功能是否正常。
8231 0
|
小程序
APP备案:遇到的问题汇总指南!
APP备案是2023年7月工信部要求的一项基础性工作,会影响所有APP运营者,到2024年4月1日前如果APP没有备案,将会无法上架,所以本文告诉你在APP备案过程中遇到的一些问题,希望大家能顺利快速完成备案。
1840 7
|
XML Java 数据格式
掌握Spring Environment:配置管理的关键
掌握Spring Environment:配置管理的关键
624 1
|
Web App开发 前端开发 程序员
【 selenium】selenium4新版本使用指南
【 selenium】selenium4新版本使用指南
2564 1