精华推荐 | 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指

简介: 精华推荐 | 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指

前提背景


使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,所以在生产环境中会在Broker设置参数autoCreateTopicEnable = false。


那么如果此参数稍有偏差,或者没有提前手动创建topic,则会频繁出现No route info of this topic这个错误,那么接下来我们探索一下此问题的出现原因以及系统如何进行创建topic。




No route info of this topic


相信做过RocketMQ项目的小伙伴们,可能对No route info of this topic一点都不陌生,说明的含义起始就是无法解析或者路由这个topic,但是造成的原因有很多种。



没有配置NameServer服务

Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。



没有建立autoCreateTopicEnable=true且没有创建该topic

当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常



RocketMQ的客户端版本与服务端版本不一致


RocketMQ Java客户端调用No route info of this topic错误(原因版本不一致)。此时,即使启动broker的时候设置autoCreateTopicEnable=true也没有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0

image.png

RocketMQ 4.3.0版本的自动创建(autoCreateTopicEnable),客户端传递使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客户端传递的默认AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。


org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
复制代码

实际代码


> 4.4.0版本

image.png

<=4.3.0版本

image.png




方案1:要不就进行调整client客户端版本的version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>
复制代码


方案2:调整自动创建代码为AUTO_CREATE_TOPIC_KEY

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//设置自动创建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
复制代码


Topic之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题: 1、topic是怎么自动创建的? 2、topic自动创建过程中Broker、NameSrv如何协作配合的?




分析以下如何自动创建topic的源码流程


RocketMQ基本路由规则

image.png

  1. Broker在启动时向Nameserver注册存储在该服务器上的路由信息,并每隔30s向Nameserver发送心跳包,并更新路由信息。 Nameserver每隔10s扫描路由表,如果检测到Broker服务宕机,则移除对应的路由信息。


  1. 消息生产者每隔30s会从Nameserver重新拉取Topic的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向Nameserver拉取该主题的消息。


  1. 如果autoCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者得到的路由信息为:




默认Topic的路由信息是如何创建的?


Nameserver?broker?当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常


private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 如果获取到topic的路由信息,则发送,否则抛异常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
           ... ...
        }
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
复制代码

tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常;因此之前No route info of this topic的异常,就是Producer获取不到Topic的信息,导致发送失败。



先从topicPublishInfoTable缓存中获取


private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // topicPublishInfoTable是Producer本地缓存的topic信息表
    // Producer启动后,会添加默认的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未获取到,从NameSrv获取该topic的信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    // 获取到了,则返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 没获取到,再换种方式从NameSrv获取
        // 如果再获取不到,那后续就无法发送了
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
复制代码
  1. Producer本地topicPublishInfoTable变量中没有topic的信息,只缓存了TBW102。


  1. 尝试从NameSrv获取Topic的信息。获取失败,NameSrv中根本没有Topic,因为这个topic是Producer发送时设置的,没有同步到NameSrv。


  1. 再换种方式从NameSrv获取,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。




再从NameServer服务中进行获取


public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
}
复制代码
  1. 第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用Topic去获取
  2. 第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息
  3. 不管Broker配没配NameSrv地址,获取Topic的信息,必失败
  4. 获取TBW102信息:


  • 2.1 Broker配置了NameSrv地址,成功
  • 2.2 Broker没有配置NameSrv地址,失败



生产者首先向NameServer查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,RocketMQ会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer会向生产者返回默认主题的路由信息。


然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?


从NameServer中获取,注意这个isDefault=false,defaultMQProducer=null


温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProducer#defaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums默认值为4,故自动创建的主题,其队列数量默认为4。




获取消息对应的topic信息


发请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader),但是因为没有任何一个Broker有关于这个topic的信息,所以namesrv就会返回topic不存在,处理请求的代码在DefaultRequestProcessor的。


case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);
复制代码

也就是回应码ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回false。




从NameServer获取相关的Topic信息数据


updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }
复制代码



因为if条件不满足,所以获取默认的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
}
复制代码




默认的topic为"TBW102",这个时候如果namesrv中如果还是没有这个topic的信息的话,就会抛出异常No route info of this topic。 autoCreateTopicEnable=true的作用。




Broker启动流程自动创建topic


  • 在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息。


  • 当Broker启动时,TopicConfigManager初始化,这里会判断该标识,创建TBW102topic,并且在后续的心跳中把信息更新到namesrv中,这样在发消息的时候就不会抛出不存在的异常。
// MixAll.DEFAULT_TOPIC
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = MixAll.DEFAULT_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
复制代码

该topicConfigTable中所有的路由信息,会随着Broker向Nameserver发送心跳包中,Nameserver收到这些信息后,更新对应Topic的路由信息表。


BrokerConfig的defaultTopicQueueNum默认为8。两台Broker服务器都会运行上面的过程,故最终Nameserver中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。



TopicConfigManager构造方法


当从namesrv查出Topic相关的信息时,在topicRouteData2TopicPublishInfo设置消息队列数量 info.getMessageQueueList().add(mq);,调用updateTopicPublishInfo方法更新缓存topicPublishInfoTable

// Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
复制代码

然后if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。



当autoCreateTopicEnable=false时


  1. 创建topic的类UpdateTopicSubCommand(),设置相应的信息,最后调用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  2. 发消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
  3. 同步给其他Broker

this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
复制代码




Broker端收到消息后的处理流程


服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用super.msgCheck方法:


AbstractSendMessageProcessor#msgCheck


在Broker端,首先会使用TopicConfigManager根据topic查询路由信息,如果Broker端不存在该主题的路由配置(路由信息),此时如果Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在Broker创建新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。


在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另一方面会有一个定时任务,定时存储在broker端,具体路径为${ROCKET_HOME}/store/config/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。




TBW102是为何物?


TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic。

public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;
    // ...
    {
        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }
    // ...
}
复制代码

autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。




总结分析


  1. client本地首先没有缓存对应topic的路由信息,然后先去nameserver去查找,nameserver中也没有此topic的路由信息,然后返回给client。client接收到返回后再向nameserver请求topic为tbw102的路由信息。


  1. 如果有broker设置了autocreateTopic,则broker在启动的时候会在topicManager中创建对应的topicconfig通过心跳发送给nameserver,namerserver会将其保存。nameserver将之前保存的tbw102的路由信息返回给请求的client。


  1. client拿到了topic为tbw102的路由信息后返回,client根据返回的tbw102路由信息(里面包含所有设置了autocreateTopic为true的broker,默认每个broker会在client本地创建DefaultTopicQueueNums=4个读写队列选择,假设两个broker则会有8个队列让你选择)先缓存到本地的topicPublishInfoTable表中,key为此topic ,value为此topicRouteData,轮询选择一个队列进行发送。

image.png

根据选择到的队列对应的broker发送该topic消息。


broker在接收到此消息后会在msgcheck方法中调用createTopicInSendMessageMethod方法创建topicConfig信息塞进topicConfigTable表中,然后就跟发送已经创建的topic的流程一样发送消息了。

同时topicConfigTable会通过心跳将新的这个topicConfig信息发送给nameserver。


nameserver接收到后会更新topic的路由信息,如果之前接收到消息的broker没有全部覆盖到,因为broker会30S向nameserver发送一次心跳,心跳包里包含topicconfig,覆盖到的broker会将自动创建好的topicconfig信息发送给nameserver,从而在nameserver那边接收到后会注册这个新的topic信息,因为消费者每30S也会到nameserver去更新本地的topicrouteinfo,请求发送到nameserver得到了之前覆盖到的broker发送的心跳包更新后的最新topic路由信息,那么未被覆盖的broker就永远不会加入到这个负载均衡了,就会造成负载均衡达不到预期了,即所有能自动创建topic的broker不能全部都参与进来。




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
358 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
3月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
622 2
|
3月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
129 0
|
4月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
213 0
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
96 2
|
5月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
321 0
|
6月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
796 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67805 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
727 1
5张图带你理解 RocketMQ 顺序消息实现机制