消息达到后实时推送机制|学习笔记

简介: 快速学习消息达到后实时推送机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息达到后实时推送机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12501


消息达到后实时推送机制


当开启长轮循之后会每隔五秒才去检查消息是否有新的消息到达。为了提高消息处理的一个实施性, RocketMQ 会在消息到达的时候也会去唤醒当前线程去做一次检查。那么消息到达之后,首先做的事情是进行消息的一个存储。this.reputMessageService.setReputFromoffset(maxPhysicalPosInLogicqueue);

this.reputNessageService.start();

在消息存储的这个类当中,有上面所示的两行代码,就是有这么一个 reputMessage Service。

然后进入到 start 的方法当中,代码如下:

public void start(){

Log.info(varl:"Try to start service thread;{} started:{} lastThread:{}"getServiceName (),started.get(), thread);

if(!started.compareAndSet( expect: false,update: true)) {

return;

}

stopped = false;

this.thread =new Thread(target:this, getServiceName());

this.thread.setDaemon(isDaemon);

this.thread.start();

}

进入这个 start 方法后,线程会被启动,然后会执行 run 方法,进入 run 方法,在run方法中,消息到达之后,在这里做doReput,然后我们看下面这段代码:

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

这个 if 是判断,如果现在这个角色不是从节点,而是主节点,说明现在接收到了消息,它会去调用 messingarrivinglistener arriving 方法,那么这个方法的作用是什么?

看下图所示:

image.png它有一个结构的实现类是 notifymessagelistener,在notifymessagelistener类中回到了拉取请求后的Service,代码如下:

public void arriving(string topic,int queueId,long logicOffset,long tagsCode,

long msgStoreTime, byte[] filterBitMap,Map<String,String> properties) {

this.pullRequestHoldservice.notifyMessageArriving(topic,queueId,logicoffset,tagsCode,

mSgStoreTime,filterBitHap,properties);

在这个类中就是通知当前消息,如果有新的消息到达之后又回到如下代码的判断中,

if(match){

try{This.brokerController.getPullMessageProcessor().executeRequest.ggetClientChannel(),

Request.getRequestCommand());

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

这个消息如果是感兴趣(match),就通知客户端去开始做一个响应。如果不是,以下代码判断超时时间。

if(system.currentTimeMillis()>=(request.getSuspendTimestamp()+request.getTimeoutMillis())){

try{This.brokerController.getPullMessageProcessor().executeRequestwhenWakeup(request.getClientChannel(),

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

所以为了提高消息处理的一个实施性,代码:this.reputMessageService.start();

在消息到达之后。看它的 run 方法:

打开 doreput,然后在下面代码的位置做一个到达之后的一个通知。

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

通知监听器,让他负责检查消息是不是感兴趣的,如果感兴趣,就直接返回。

image.png整个的这块就跟前面所讲的长轮询的机制合二为一。

相关文章
|
JavaScript 容器
Vue+Element UI
该博客文章介绍了如何在Vue中集成Element UI来构建后台管理系统的左侧菜单,包括使用`el-menu`、`el-submenu`和`el-menu-item`等组件,并通过Vue router动态构建菜单项及其路由设置。
|
12月前
|
存储 机器学习/深度学习 人工智能
《量子AI:突破量子比特稳定性与容错性的关键瓶颈》
量子计算的发展面临量子比特稳定性和容错性的关键挑战。量子纠错技术如表面码、Steane七量子比特颜色代码等,通过编码和解码提高可靠性。硬件设计选择超导或离子阱量子比特,结合低噪声器件减少干扰。量子噪声抑制技术优化环境,降低噪声影响。拓扑量子计算利用多体系统的拓扑性质实现天然容错。量子算法优化和AI技术助力,进一步提升抗干扰能力。尽管取得进展,但要实现大规模应用仍需克服诸多挑战。
297 13
|
运维 Prometheus 监控
运维自动化:提高IT效率的关键策略
在当今快速发展的IT领域,运维自动化已成为企业提升运营效率、降低错误率和成本的重要手段。随着云计算、大数据和人工智能技术的不断进步,实现运维流程的自动化不仅可行,而且变得日益重要。本文探讨了运维自动化的概念、关键技术及其在实际工作中的应用,旨在为IT专业人士提供一种高效管理和维护系统的方法。
|
机器学习/深度学习 人工智能 算法
为什么ChatGPT等AI大模型都是基于Python开发?
为什么ChatGPT等AI大模型都是基于Python开发?
465 0
|
前端开发
css用法 :is()、:where()和:has()的用法
【4月更文挑战第2天】 css用法 :is()、:where()和:has()的用法
299 12
|
SQL 负载均衡 网络协议
高性能数据访问中间件 OBProxy(四):一文讲透连接管理
引言上篇内容我讲到 OBProxy 的问题排查,将你在使用 OBProxy 时可能遇到的问题一一分析,并给出经过实践验证的解决方案。从本篇开始,我将介绍 OBProxy 在OceanBase分布式架构中的作用和原理,帮助你更透彻地了解OBProxy,实现“好用”和“用好”。同时,OBProxy 在上百家企业的持续运行,我积累了大量的工程实践经验,也将遇到的问题作为案例,伴随 OBProxy 的原理
789 100
高性能数据访问中间件 OBProxy(四):一文讲透连接管理
|
负载均衡 算法 网络协议
LVS、Nginx和HAProxy负载均衡器对比总结
LVS、Nginx和HAProxy负载均衡器对比总结
|
小程序
小程序一直未上架的原因及解决方案
小程序一直未上架的原因及解决方案
412 11
|
对象存储 CDN
阿里云账号怎么注册?手机号或扫码注册均可
阿里云账户注册支持多种方法,如手机号注册、账号注册、支付宝扫码注册、钉钉扫码注册和阿里云APP注册等,阿里云百科来详细说下阿里云账号注册方法:
1034 1
阿里云账号怎么注册?手机号或扫码注册均可
|
开发工具 git
Github Fork项目后如何与源主机代码保持更新同步
Github Fork项目后如何与源主机代码保持更新同步
956 0