消息发送3-选择队列|学习笔记

简介: 快速学习消息发送3-选择队列

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送3-选择队列】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12473


消息发送3-选择队列


选择队列

第二步查找路由已经完成,确定路由就是确定要给哪个broker发送消息,broker中有需要messageQueue,所以第三步是查找当前路由中给哪个messageQueue发送信息

DefaultMQProducerImpl代码:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

selectOneMessageQueue传递了两个参数:topicPublishInfo, lastBrokerName。

lastBrokerName是上一次发送的broker,作用是负载均衡。

点击selectOneMessageQueue,进入MQFaultStrategy代码:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){

if (this.sendLatencyFaultEnable) {

注:sendLatencyFaultEnable判断的是一个布尔类型的参数,默认是FALSE。在进行消息发送时,如果消息发送失败,它有一个默认延迟发送的机制,这功能默认是关闭的。

在默认FALSE的情况下,

return tpInfo.selectoneMessageQueue( lastBrokerName);

点进TopicPublishInfo:

public MessageQueue selectoneMessageQueue() {

//维护索引index

int index = this.sendwhichQueue.getAndIncrement();

//使用索引对messageQueueList取模

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos =0 ;

//利用pos获取队列

例:假如有一个索引,如果第一次为0,使用0对整个MessageQueue长度进行取模,得到0,返回的是第一个,做完之后要Increment。第二次进行取模,1对整个取模取的是第二个。以此类推,索引加到2,2对长度4取模选择的是第二个;如果索引是3,3对整个长度取模取的是第三个。

image.pngreturn this.messageQueueList.get(pos);

}

如果broker延迟机制默认FALSE,则是依次从broker队列取出selectoneMessageQueue并返回,在TopicPublishInfo中依次利用索引对selectoneMessageQueue长度进行取模;

如果lastBrokerNmae不为空(默认不启用broker故障延迟机制):

public MessageQueue selectoneMessageQueue(final string lastBrokerName) {

//第一次选择队列

if (lastBrokerName == null) {

return selectoneMessageQueue(;

} else {

// sendwhichQueue

int index = this.sendwhichQueue. getAndIncrement(;

//遍历消息队列集合

for (int i = 0; i < this.messageQueueList.size(); i++) {

// sendwhichQueue自增后取模

int pos = Math.abs(index++) % this.messageQueueList.size(;

if (pos < 0)

pos = 0;

//规避上次Broker队列,如果二者不同,则进行返回

MessageQueue mq = this.messageQueueList.get(pos);

if ( ! mq. getBrokerName().equals(lastBrokerName)) {

注:这行代码很重要,找到MessageQueue的broker如果和当前lastBrokerName一样,就不进行返回,如果不一样,返回。

return mq;

}

}

//如果以上情况都不满足,返回sendwhichQueue取模后的队列

Return selectOneMessageQueue();

}

public MessageQueue selectoneMessageQueue() {

int index = this.sendwhichQueue.getAndIncrement();

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos =0 ;

return this.messageQueueList.get(pos) ;

}

如果将sendLatencyFaultEnable默认FALSE改为TRUE:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){

try {

//根据索引在TopicPIblishInfo中选择一个队列

int index = tpInfo.getsendwhichQueue().getAndIncrement();

for (int i = o; i < tpInfo.getlMessageQueueList().size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < o)

pos = 0;

//队列选择后,校验当前broker是否合法

MessageQueue mq = tpInfo.getMessageQueueList().get(pos ) ;

if (latencyFaultTolerance.isAvailable(mq.getBrokerName( ))) {

//可用

if (null == lastBrokerName || mq.getBrokerName( ).equals(lastBrokerName))

return mq;

}

}

如何通过latencyFaultTolerance进行校验:

代码:LatencyFaultToleranceImpl

@override

public boolean isAvailable(final string name) {

final Faultitem faultItem = this.faultItemTable.get(name) ;if(faultItem != null) {

return faultItem.isAvailable();

}

return true;

}

在LatencyFaultToleranceImpl类中,维护了所有失败的broker表,如果根据name找到faultItem,代表当前不可用;如果没有找到,代表可用,直接取出返回;

如果不可用:

//从LatencyFaultToleranceImpl中的缓存表中取出一个相对较好的broker去发送

final string notBestBroker = latencyFaultTolerance.pickOneAtLeast();

//获得broker的写队列集合

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

//获得一个队列,指定broker和队列ID并返回

final MessageQueue mq = tpInfo.selectoneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName( notBestBroker);

mq.setQueueId(tpInfo.getsendwhichQueue( ).getAndIncrement() % writeQueueNums);

}

return mq;

}

如果打开broker失败延迟机制,过程是先取出,如果broker是可用的,直接返回;如果不可用,代表曾经对broker发送失败,解决策略是从faultItemTable中取出一个相对来说比较好的broker,然后取出写队列,封装一个MessageQueue并返回。

如果没有打开broker失败延迟的机制,则按照索引对messageQueue进行取模,最终再进行返回。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
人工智能 Ubuntu IDE
【Python】基础:环境配置与基础语法
本文介绍了Python编程语言及其环境配置方法。Python由Guido van Rossum于1991年创建,以其简洁、易学和强大的功能著称。文章详细讲解了Python的主要特点、Windows和Ubuntu下的安装配置步骤、基础语法、控制流、函数、文件操作、模块使用及面向对象编程等内容,帮助读者快速入门Python编程。
368 4
|
3月前
|
人工智能 JSON API
淘宝/天猫:使用物流查询API实时显示包裹位置,减少客服咨询量
在电商竞争激烈的环境下,淘宝、天猫通过集成物流查询API,实现实时追踪包裹位置,显著减少用户咨询量。本文解析其原理、实现步骤与效益,展示如何以技术手段提升用户体验、降低客服压力,助力平台高效运营。(238字)
321 0
|
Java Linux Android开发
移动应用开发与操作系统的交互:深入理解Android和iOS
在数字时代,移动应用成为我们日常生活的一部分。本文将深入探讨移动应用开发的核心概念、移动操作系统的工作原理以及它们如何相互作用。我们将通过实际代码示例,展示如何在Android和iOS平台上创建一个简单的“Hello World”应用,并解释其背后的技术原理。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和知识。
|
传感器 物联网 测试技术
基于物联网技术的养老院管理系统设计与实现_kai
基于物联网技术的养老院管理系统设计与实现_kai
基于物联网技术的养老院管理系统设计与实现_kai
提升个人工作技能
提升个人工作技能
1242 6
|
C++
JNI Log 日志输出
JNI Log 日志输出
329 1
|
SQL Java 数据库连接
18个 SpringBoot项目中遇到的BUG,你试试 上
18个 SpringBoot项目中遇到的BUG,你试试 上
546 0
18个 SpringBoot项目中遇到的BUG,你试试   上
|
前端开发 数据库
R语言基于Bootstrap的线性回归预测置信区间估计方法
R语言基于Bootstrap的线性回归预测置信区间估计方法
|
存储 监控 算法
探索 Java JVM:深入了解虚拟机的工作原理与优化
Java 虚拟机(JVM)是 Java 语言的核心组成部分,它在代码编译和运行过程中发挥着重要作用。理解 JVM 的工作原理和优化策略对于开发高效、稳定的 Java 应用至关重要。本文将深入探讨 JVM 的工作原理、主要组成部分和性能优化策略,帮助您更好地理解 JVM 在 Java 开发中的关键地位。
|
人工智能 算法 开发工具
【视觉智能AI场景解决方案——AI智慧运动】
  随着全民健身热潮的提升,智慧健身运动随着数字化新技术的进步,以及在运动健身领域的应用逐渐趋于成熟,智能运动健身将为传统运动健身提供更多新的方向和玩法,满足不同项目爱好者的健身需求。随着AI运动健身技术的进一步普及与应用,基于ai的智慧健身运动技术未来可打造的场景化空间会越来越多,体育运动与科技娱乐,智慧健身运动在线上体育行业未来会创新运动场景,丰富运动体验,提升竞技娱乐性,推动全民健身走向新的高度。
1658 3
【视觉智能AI场景解决方案——AI智慧运动】