消息发送2-消息路由查找|学习笔记

简介: 快速学习消息发送2-消息路由查找

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送2-消息路由查找】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12472


消息发送2-消息路由查找


查找路由

查找路由的目的是要知道当前的消息发给哪个broker,找到broker之后选择队列发送消息。

DefaultMQProducerlmpl中消息发送的全部业务逻辑:

private sendResult sendbefaultimpk

Message msg,

final communicationtode communicationMode,

final Sendcal1back sendcal1back,

final long timeout

)throws wQclientException,RemotingException,MQBrokerException,InterruptedException {

this.makesurestateOK(;

validators.checkMessage(msg, this.dofaultNQProducer);

final long invokeID = random.nextLong(;

long beginTimestampFirst - system.currentTimeMillis();

long beginTimestampPrev - beginTimestampFirst;

long endTimestamp - beginTimestampFirst;

//根据主题查找当前要发送的broker的信息,最后返回TopicpublishInfo

TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

if (topicPublishInfo != nul1 && topicPublishInfo.ok()) {

boolean callTimeout = false;

MessageQueue mq = null;

Exception exception = null;

sendResult sendResult = null;

int timesTotal = communicationMode =m CommunicationMode.sYNc ? 1 + this.defaultMQProducor

int times = 0;

string[brokerssent = new String[timesTotal];

for (; times < timesTotal; timesi) {

在TopicpublishInfo中,有messageQulueList,通过它进行负载均衡。

public class TopicPublishInfo i

private boolean orderTopic = false;

private boolean haveTopicRouterInfo = false;

private List<MessageQueue> messageQulueList = new ArrayList<~>( );

private volatile ThreadLocalIndex sendwhichQueue = new ThreadLocalIndex();

private TopicRouteData topicRouteData;

在DefaultMQProducerlmpl中如何进行路由查找:

image.png生产者为了提高发送的效率,不会每一次都去NameServer当中根据主题查找路由信息,而是在本地缓存路由表,如果没有查到,才会在NameServer中去查找,如果当前已经查到了TopicPublishInfo在当前的缓存表中查到了,就直接返回。

private TopicPublishInfo tryToFindTopicPublishInfo(final string topic){

//从缓存中获得主题的路由信息

TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

//路由信息为空,则从NameServer获取路由;不为空,直接返回topicPublishInfo

if (null == topicPublishInfo || !topicPublishInfo.ok()) {

this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());this.mQclientFactory.updateTopicRouteInfoFromNameserver(topic);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()){

return topicPublishInfo;

}else {

//如果未找到当前主题的路由信息,则用默认主题继续查找this.mQclientFactory.updateTopicRouteInfoFromNameServer(topic,isDefault: true,this.defaultNQProducer);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

return topicPublishInfo;

}

}

缓存表样式如下,key是topic,value是PublihInfo发送路由信息:

private final ConcurrentMap<string/*topic*/, TopicPublihInfo>topicPublishInfoTable= new concurrentHashMap<~>();

MQClientInstance是向外发送请求的类,从NameServer获取路由是调用MQClientInstance完成的,请求的代码过程:

//如果当前未指定主题,使用的就是默认主题

if (isDefault &&defaultMQProducer!= nul1) i

topicRouteData = this.mClientAPIImp1.getDefaultTopicRuteInfoFromNameServer(defaultMQProducer.getCreateTopic.

timeoutMillis: 1000* 3);

if (topicRouteData != null) {

for (QueueData data : topicRouteData.getQueueDatas()) {

int queueNums = Math.min(defaultwQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());

data.setReadQueueNums(queueNums ) ;

data.setwriteQueueNums(queueNums );

}

}

} else {

topicRouteData = this.mQclientAPIImpl.getTopicRouteInfoFromNameServer(topic,timeoutilli:1000 * 3);

注:一般发送消息会设置主题,所以运行getTopicRouteInfoFromNameServer,在MQClientAPIImpl中请求NameServer,获取当前主题所对应的路由信息。

完成调用之后,会返回topicRouteData主题路由数据,后判断topicRouteData:

//如果数据不为空,根据主题从本地缓存的表查询当前主题原先的路由信息,比较原来路由信息和当前路由信息是否一致

if (topicRouteData != null){

TopicRouteData old = this.topicRouteTable.get(topic);

boolean changed = topicRouteDataIsChange(old,topicRouteData);

if ( !changed) {

changed = this.isNeedupdateTopicRouteInfo(topic);

} else {

log.info( varl: "the topic[{]] route info changed,old[(}] ,new[()]", topic,old,topicRouteData);

}

//如果不一致,对本地路由表进行更新操作

比较之后会返回布尔值,如果是true代表变化了,变化之后在以下代码位置做更新操作,把本地路由表去做更新,这样下一次再去发送消息时,就可以直接从本地获取。

if ( changed) {

TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()){this.brokerAddrTable.put(bd.getBrokerName()bd.getBrokerAddrs( ));

}

//update pub info 将topicRouteData转换为topicPublishInfo

{

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

publishInfo.setHaveTopicRouterInfo(true);

Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator();

while (it.hasNext()){

Entry<string,MQProducerInner> entry = it.next();

MQProducerInner impl = entry.getvalue();

if ( impl != null) {

impl.updateTopicPublishInfo(topic, publishInfo);

}

如上图表,返回路由信息下面是判断是否需要更新本地路由表,其中类名为TopicRouteData,但整个方法叫TopicPublicInfo,最终是用这个类。

从NameServer中查出的类是TopicRouteData,最终在消息发送中用的类TopicPublicInfo,所以重要代码:TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);将topicRouteData中的数据取出放到TopicPublishInfo 里,这就是最终消息生产者在发送消息时从TopicPublishInfo去查找路由信息。

主要流程:在发送消息时,根据主题查找路由信息,producer内部维护了一个自己的路由表,首先会在它本地缓存中查找,如果缓存中没有信息,则请求NameServer,请求NameServer借助的是MQClientInstance。查找结束,返回路由信息,之后检查是否需要更新本地缓存路由,最后将topicRouteData转换为topicPublishInfo。

TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());这一步执行完成之后,producer查找路由就结束了。

相关文章
|
3天前
|
数据采集 人工智能 安全
|
13天前
|
云安全 监控 安全
|
5天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
1091 152
|
18天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1760 9
|
10天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
697 152
|
12天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
661 13
|
6天前
|
SQL 自然语言处理 调度
Agent Skills 的一次工程实践
**本文采用 Agent Skills 实现整体智能体**,开发框架采用 AgentScope,模型使用 **qwen3-max**。Agent Skills 是 Anthropic 新推出的一种有别于mcp server的一种开发方式,用于为 AI **引入可共享的专业技能**。经验封装到**可发现、可复用的能力单元**中,每个技能以文件夹形式存在,包含特定任务的指导性说明(SKILL.md 文件)、脚本代码和资源等 。大模型可以根据需要动态加载这些技能,从而扩展自身的功能。目前不少国内外的一些框架也开始支持此种的开发方式,详细介绍如下。
452 5