路由注册之发送心跳包|学习笔记

简介: 快速学习路由注册之发送心跳包

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)路由注册之发送心跳包】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12465


路由注册之发送心跳包

 

路由注册

思考:路由注册由谁注册?

应该由broker完成路由注册。

打开broker启动的代码,在broker启动之后就会开始向NameServer注册路由信息。

image.png

image.png如上图,首先启动入口有个main方法,在main方法里首先是创建BrokerController,创建BrokerController的方式和创建namesrvcontroller相似,先创建brokerconfig,然后再创建nettyserverconfig和nettyclientconfig。对于生产者来说,broker需要处理生产者的请求,nettyserverconfig是作为服务端存在的;而对于NameServer,broker要上报数据给NameServer进行心跳检测,所以就要分别创建brokerconfig、nettyserverconfig及nettyclientconfig

final Brokerconfig brokerconfig = new Brokerconfig();

final NettyServerconfig nettyServerconfig = new Nettyserverconfig();

final NettyclienEconfig nettyclientconfig = new Nettyclientconfig();nettyclientconfig.setuseTLS(Boolean.parseBoolean(System.getproperty(TLs_ENABLE,

string.valueof(Tlssystemconfig.tlsMode == TlsMode.ENFORCING))));

nettyServerconfig.setListenPort(10911);

//nettyServerconfig.setListenPort(10911)是配置nettyServerconfig的端口号,生产者是通过10911这个端口提交信息到broker。

final Messagestoreconfig messagestoreConfig = new MessagestoreConfig();

//填充配置类

if (BrokerRole.SLAVE == messagestoreconfig.getBrokerRole()) {

int ratio = messagestoreconfig.getAccessMessageInMemory

MaxRatio() - 10;

messagestoreconfig.setAccessMessageInMemoryMaxRatio(ratio);

}

if (commndLine.hasoption( 'c')) {

//-c解析配置文件填充配置类

strig file = commandLine.getoptionvalue( 'c' );

if (file != nul1) {

configFile = file;

Inputstream in = new BufferedInputstream(new FileInputstre

am(file));

properties = new Properties();

properties. load(in);

properties2systemEnv( properties);

MixAll.properties20bject(properties, brokerConfig);

MixAll.properties20bject(properties,nettyserverConfig);

MixAll.properties2object(properties,nettyclientconfig);

MixAll.properties20bject(properties,messagestoreconfig);

BrokerPathconfigHelper.setBrokerconfigpath(file);

in.close();

}

}

以下代码是从brokerconfig中拿到NameServerAddr,因为broker要上报心跳信息到NameServer,所以在解析配置文件完成后,brokerConfig就可以得到NameServerAddr。通过这个代码看到,如果NameServer是搭建一个集群,需要以分号拼接。

string namesrvAddr = brokerconfig.getNamesrvldr();

if ( null != namesrvAddr) {

try {

string[] addrArray = namesrvAddr.split( regex: "; ");

for ( string addr : addrArray) {

Remotingutil.string2socketAddress( addr);

}

}catch (Exception e) {

system.out.printf(

"The Name Server Address[%s] illegal,please set it as follows,\"127.0.0.1:9876;192

namesrvAddr);

System.exit( status: -3);

}

}

//BrokerController创建完成

final Brokercontroller controller = new BrokerController(

brokerconfig,

nettyServerconfig,

nettyclientconfig,

messagestoreconfig);

// remember all configs to prevent discard

controller.getconfiguration( ).registerconfig(properties);

//对BrokerController进行初始化

boolean initResult = controller.initialize();

if ( !initResult) {

controller.shutdown( );

system.exit( status: -3);

}

在完成初始化里,完成的是对BrokerController属性的赋值,包括进行messageStore等等。初始化完成之后,controller就可以返回,返回到start方法,最终通过start方法进行启动。

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

private volatile boolean hasshutdown = false;

private AtomicInteger shutdownTimes = new AtomicInteger( initialValue: 0);

@override

public void run() {

synchronized (this) {

Log.info("shutdown hook was invoked,{}" , this.shutdownTimes.incrementAndGet());

if ( ! this.hasshutdown) {

this.hasshutdown = true;

long beginTime = system.currentTimeMillis();

controller.shutdown();

long consumingTimeTotal = system.currentTimeMillis() - beginTime;

log.info("shutdown hook over,consuming total time(ms): { }" , consumingTimeTotal)

}

}

}

},name: "shutdownHook" ) );

这段代码是JVM中注册钩子函数,保证JVM在退出时,进行资源释放。

之后是进行启动,启动是调用Controller里面的start方法,启动之后要开始进行注册路由信息,注册完成之后创建线程池

this.registerBrokerAll( checkOrderConfig: true,oneway:. false,forceRegister: true);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@override

public void run( ) {

try {

BrokerController.this.registerBrokerAll( checkOrderConfig: true,oneway:. false,brokercoConfig.isForceRegister()

}catch (Throwable e) {

log.error( "registerBrokerAll Exception", e);

}

}

},initialDelay: 1000 * 10,Math.max(10000,Math.min(brokerCo

nfig.getRegisterNameServerPeriod(),6000)),TimeUnit.

getRegisterNameServerPeriod()默认是30s,因为以上代码的存在,broker在启动之后每隔30s要上传细条信息给NameServer。

在registerBrokerAll注册代码中,有一个重要方法doRegisterBrokerAll:

doRegisterBrokerAll(checkorderconfig,oneway,topicconfigwrapper);

通过这个方法,调用了brokerOuterAPI,通过registerBrokerAll方法向外发送请求:

private void doRegisterBrokerAl1(boolean checkorderconfig, boolean oneway,

Topicconfigserializewrapper topicconfigwrapper) {

List<RegisterBrokerResult> registerBrokerResultList = this.bpokerouterAPI.registerBrokerAll(

this. brokerconfig.getBrokerclusterName(),

this.getBrokerAddr(),

this.brokerconfig.getBrokerName(),

this. brokerconfig-getBrokerId(),

this.getHAServerAddr(),

topicconfigwrapper,

this.filterServerManager.buildNewFilterServerList(),

oneway ,

this.brokerconfig.getRegisterBrokerTimeoutMills(),

this.brokerconfig.iscompressedRegister());

//封装请求头

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();

requestHeader.setBrokerAddr(brokerAddr);

requestHeader.setBrokerid(brokerId);

requestHeader. setBrokerName (brokerName);

requestHeader.setc1usterName(c1usterName);

requestHeader.setHaserverAddr(haserverAddr);

requestHeader. setcompressed(compressed);

//封装请求体

RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicconfigserializewrapper(topicconfigwrapper);

requestBody . setFi1terserverList(fi1teserverList);

final byte[] body = requestBody.encode(compressed);

final int bodycrc32 = uti1A17.crc32(body);

requestHeader.setBodycrc32(bodycrc32);

final countDownLatch countDownLatch = new countDownLatch(nameServerAddressList.size());

for (fina1 string namesrvAddr : nameserverAddressList) {

brokerouterExecutor.execute(new Runnab1e( ){

在registerBrokerAll中创建了请求header、封装请求体,

请求体创建之后,在 NameServerAddressList处遍历NameServer的地址,分别给每一个NameServer注册当前broker信息。

for (final string namesrvAddr : nameServerAddressList) {

brokerouterExecutor.execute( new Runnable( ) i

@override

public void run() {

try {

RegisterBrokerResult result = registprBroker(namesr

vAddr , oneway, timeoutMills,requestHeader, ody) ;

if ( result != null) {

registerBrokerResultList.add( result);

}

registerBroker通过remotingClient向NameServer发送请求,然后上报心跳数据,如果是oneway方式表示不接受返回值;如果是其他方式,以同步方式上报数据,会获得响应,之后会解析注册结果。

RemotingCommand response = this.remotingclient.invokeSync(namesrvAddr, request,timeoutNills);

assert response != null;

switch (response.getcode()) {

case Responsecode.success: {

RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

RegisterBrokerResult result = new RegisterBrokerResult();

result.setMasterAddr(responseHeader.getMasterAddr());

result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != nul1) {

result.setKvTable(KVTable.decode(response.getBody(),KVTable.class));

}

return result;

}

default:

break;

}

小结:路由注册的入口在BrokerStartup里,在BrokerStartup里首先是创建BrokerController,BrokerController的创建需要brokerconfig、nettyserverconfig和nettyclientconfig配置类,并且设置端口号10911,然后获取namesrvAddr。Controller创建之后就要进行对其初始化,完成controller内部的赋值。然后再注册钩子函数,释放资源。整个controller创建完成之后进行启动,在启动时会调用controller的startup,在controller启动的位置会完成路由信息注册,broker会每隔30s上报心跳信息到NameServer。

相关文章
|
消息中间件 RocketMQ
RocketMQ报错:MQClientException:no route info of this topic的解决
RocketMQ报错:MQClientException:no route info of this topic的解决
760 0
|
Windows
ts-node : 无法加载文件 C:\Users\Dell\AppData\Roaming\npm\ts-node.ps1,因为在此系统上禁止运行脚本。有关详细信息
ts-node : 无法加载文件 C:\Users\Dell\AppData\Roaming\npm\ts-node.ps1,因为在此系统上禁止运行脚本。有关详细信息
490 0
|
9月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
1103 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
7月前
|
人工智能
我说魔,你说搭-魔搭AI视频宣传片挑战赛
当大家都喊魔塔的时候,我们决定搞个事情...有人管咱们叫"魔塔"?
270 4
|
11月前
|
人工智能 运维 监控
评测报告:AI驱动的操作系统服务套件体验
评测报告:AI驱动的操作系统服务套件体验
204 3
|
存储 网络安全 开发工具
【随笔】Gitee -- Gitee仓库创建 & SSH公钥生成 (二)
【随笔】Gitee -- Gitee仓库创建 & SSH公钥生成 (二)
|
Android开发
通译灵码,果然不凡
我是一名全栈开发工程师,使用通义灵码个人版IDEA/DevEcoStudio/Android Studio进行程序开发。通过安装灵码插件,开发效率提升了约40%。安装步骤:进入Setting->插件->搜索“lingma”->点击安装。
588 3
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
1706 0
|
消息中间件 监控 网络安全
在RocketMQ中,生产者提交数据导致连接不上问题
【6月更文挑战第19天】在RocketMQ中,生产者提交数据导致连接不上问题
512 1
百度搜索:蓝易云【uniapp中uni-popup的用法】
通过按照以上步骤,你可以在UniApp中使用uni-popup组件来实现弹出层的功能。请根据你的具体需求,使用uni-popup提供的属性和方法来定制和控制弹出层的样式和行为。更详细的用法和配置可以参考UniApp官方文档中uni-popup的相关部分。
391 0