Giraph源码分析(四)—— Master 如何检查Worker启动成功

简介: 本文的目的说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。环境在单机上(机器名:giraphx)启动了2个workers。Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。

本文的目的

说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。

环境

在单机上(机器名:giraphx)启动了2个workers。

Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。每个Worker启动成功后,会向Master汇报自身的健康状况,那么Master是如何检测Workers是否都成功启动了?

Master在ZooKeeper上创两个目录,_workerHealthyDir和 _workerUnhealthyDir,分别用来记录Healthy Workers和UnHealthy Workers。

主要在BspServiceMaster类中的getAllWorkerInfos()方法来完成,其调用关系如下,注意下getAllWorkerInfos()到MasterThread.run()方法调用关系,比较难找。

创建的两个目录如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir

每个Worker在setup()中,调用registerHealth()方法来注册自身的状态。

若自身是Healthy的,则在_workerHealthyDir目录下添加子节点 /wokerInfo.getHostNameId(),否则在_workerUnhealthyDir目录下添加。wokerInfo.getHostNameId()为:Hostname+“_”+TaskId。 Task1和Task2 (Task 0是master) 创建的子节点如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2

Master 在checkWorkers()方法中,在While死循环中(实际有超时限制),通过调用getAllWorkerInfos()方法来获取_workerHealthyDir目录下的子节点,然后比较子节点数目是否达到maxWorkers(启动job时定义的,-w参数)。

若小于maxWorkers,则继续调用getAllWorkerInfos()方法进行下一轮检测;若等于maxWorker,退出While循环,然后返回healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] 。

问题:由于在分布式环境中,每个Worker和Maste都是并行运行,彼此不知道对方的运行情况。上述第3步骤中,若还有子节点还没有创建,就一直在while死循环中调用来检测getAllWorkerInfos()方法检测,效率比较低下,当然也比较笨!

Giraph借用ZooKeeper来高效的进行检测。设计理念如下:

  1. master在获取子节点时,注册Watcher(为注册器,用于触发相应事件)。

若某个task创建了子节点后,就会触发Watcher事件。

若子节点数目小于maxWorkers,就调用 workerHealthRegistrationChanged的await()方法释放当前线程的锁,陷入等待状态。不会进行无用的检测。

说明:workerHealthRegistrationChanged为PredicateLock类型(implements BspEvent接口),PredicateLock里面使用可重入锁 ReentrantLock和Condition进行线程的控制。

当某个task创建了子节点后,触发Watcher事件。

调用BspService中的public final void Process(WatchedEvent event)事件,该方法根据事件的路径来激活相应的BspEvent事件。此处对应的是:

实验运行如下:

s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy )

这样就会激活master线程,开始下一轮检测。

子节点数目等于maxWorkers时,就停止。

总结:每创建一个子节点时,才会进行一次检测,效率较高!

相关文章
|
2月前
|
NoSQL Java 编译器
c++开发redis module问题之保证Redis在fork时没有处于inflight状态的命令,如何解决
c++开发redis module问题之保证Redis在fork时没有处于inflight状态的命令,如何解决
|
2月前
|
网络协议 小程序 物联网
Gateway-Worker启动失败或者启动无法正常使用的几种方法
Workerman是一款开源高性能异步PHP socket即时通讯框架。支持高并发,超高稳定性,被广泛的用于手机app、移动通讯,微信小程序,手游服务端、网络游戏、PHP聊天室、硬件通讯、智能家居、车联网、物联网等领域的开发。 支持TCP长连接,支持Websocket、HTTP等协议,支持自定义协议
67 3
|
4月前
|
NoSQL Redis 数据库
Redis server启动后会做哪些操作?
Redis server启动后会做哪些操作?
|
10月前
解决es集群启动完成后报master_not_discovered_exception
解决es集群启动完成后报master_not_discovered_exception
982 0
|
设计模式 分布式计算 Scala
Spark Master 和 Worker 项目需求 | 学习笔记
快速学习 Spark Master 和 Worker 项目需求
Spark Master 和 Worker 项目需求 | 学习笔记
|
分布式计算 安全 Scala
Master 检测心跳并删除超时的 Worker | 学习笔记
快速学习 Master 检测心跳并删除超时的 Worker
Master 检测心跳并删除超时的 Worker | 学习笔记
|
网络协议 Linux Scala
指定 Master 与 Worker 的启动参数 | 学习笔记
快速学习指定 Master 与 Worker 的启动参数
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
305 0
|
SQL XML 存储
MSSQL - 应用案例 - Event Notification + Service Broker构建死锁自动收集系统
--- title: MSSQL - 应用案例 - Event Notification + Service Broker构建死锁自动收集系统 author: 风移 --- # 摘要 这篇文章介绍SQL Server的一个典型的应用案例,即如何利用Event Notification与Service Broker技术相结合来实现死锁信息自动收集系统。通过这个系统,我们可以全面把控SQL
4699 0