开发者学堂课程【分布式系统开发调度技术:任务调度】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/367/detail/4368
任务调度
内容介绍:
一、海量数据的并发处理
二、MapReduce 的例子
三、飞天任务调度系统
四、任务调度系统的技术要点
一、海量数据的并发处理
写程序时,当数据量增加时怎么办?
常见的处理手法是写一个多线程的程序,能够让更多的 CPU 做这个计算,或者写多进程的程序,但单台物理机器的内存是有上限的,当需要处理的数据量超过单台物理极限时,对海量数据如何进行并发的处理,即是分布式任务调度所需要解决的问题。
面对海量数据分布处理的问题,工程师 Jeffrey Dean 在2004年OSDI一篇文章中将函数式编程中的 Map Reduce 思想借鉴到了海量数据处理,基本思想是将输入数据进行简单的切分之后做 map 操作做一个映射得到一个中间态结果,再进行reduce规约,从而得到最终结果。Map这个动作的设计非常关键,它将取决定的输入数据如何进行中间态的转换。所以将这一个海量的数据处理过程,通过 map,reduce 进行分解,reduce 进行规约之后,可以很容易的实现并发。
当将业务的场景进行 map,reduce 建模时,map 动作是非常关键的,因为它将取决输入数据如何进行中间态的表征,朝着最终解的方向进行转换,而通常 reduce是比较简单的,将 map 的结果进行规约即可。
二、MapReduce的例子
1.统计图书馆中所有属于自然科学的书本
图书馆中对书的组织是按照书架组织的,那是否可以写一个程序,以书架为单位进行扫描。这样所有的书架可以并发扫描,每本书有一个属性,属于人文科学或者自然科学,进行简单统计,最后将所有书架的执行结果进行一次规约,就可以得到最终想要的结果。
2.统计全部学生的成绩的平均分
如果写一个单机程序,可以写一个 for 循环为每一个学生,再写第二个循环for每个课程,进行统计,最后求出每个学生的平均分,但对于全校学生,如何用分布式的程序 MapReduce 方式来实现这个过程可以思考一下。
3.系统的日志分析
如一个网站通常会有很多日志,如用户的浏览日志,系统的硬件监控的日志等。则可以通过MapReduce的方式来分析日志的数量。通过不同的日志之间的相关性,是否可以分析出在流量达到高峰时,硬件的报警就会增多。这个过程是常见的MapReduce的过程,可以考虑一下如何建模。
4.网页搜索中的 PageRank
三、飞天任务调度系统
在飞天 Fuxi 系统中,参与任务调度的角色主要有 App Master 和 App Worker 以及App Worker 运行的 instance。
App Master 职责:
1>向 FuxiMaster 发起资源请求
2>将资源分配结果下发给对应的集群节点上,拉起 App Worker 进程
3>负责监控 Instance 运行期间,如果发生故障,如何进行重试
App Worker 职责:
响应 App Master 发来的任务请求
进行运算,将结果写到输出文件上
四、任务调度系统的技术要点
1.数据 Locality
当 instance 在处理的时候是需要从存储系统中读取数据的,更希望 instance 能从本地读取数据,因为如果从远程去拉取数据,除了本磁盘的读取之外,还经过一次网络,其性能会大打折扣,所以任务调度的核心关键是如何做到每一个 instanc e的数据的本地化。
如下表,有三个 instance 要跑,分别是1号,2号,3号,集群中有三个
节点node1、2、3,由于每个 instance 的输入数据是存储在分布式文件系统当中的,所以首先可以通过文件接口来获取每一个 instance,的数据在集群中的分布情况。
如1号 instance 在1号节点上有50%的数据,在2号节点上有30%的数据,在3号节点上有20%的数据,也可以直接把1号 instance 分配到1号节点上,所以将每个instance 直接分配到它所在数据最多的节点上,初步的分配结果是将1号 instance分配在节点1上,将2号 instance 分配在节点2上将3号 instance 也分配在节点2上,数据本地化最高。
但是3号节点空着而2号节点上分配了两个 instance,如果2号和3号 instance 是CPU密集型的,那2号节点CPU会变得非常繁忙,会出现大量的CPU争抢。
三台机器并没有最大化的利用,对调查结果进行改进,将3号instance分配到3号节点,数据本地化将有很大的折扣,若将2号 instance 改签到3号节点,它的数据本地化只牺牲了10%,这样的结果有一个优化空间,就是将2号 instance 重新分配到3号节点上,而3号 instance 留在2号节点上,虽然数据本地化相对与2号 instance牺牲的10%,但是三台物理机器全部的利用了,不会出现CPU争抢的情况,所以整体运行时间将会是提高的。
这是数据本地化调度时所需要考虑的细节,不是一味的追求数据的本地化,而要全盘考虑资源的负载是否均衡。
2.数据 Shuffle
数据的 shuffle 就是数据的传递,在典型的 CPU 框架下,map 的输出结果是需要传递给 reduce 进行规约动作。
一般意义上存在三种形态的数据 Shuffle,如下图,左边的是一对一模式,每个Map 的输出只会发送给某一个 Reduce,尽管这种场景在实际生产中并不常见但也是一种可以被实现的形态。
最常见的是中间的1:N,每一个Map的结果会被发送给所有的reduce,通常因为map 的输出结果是一些 key value 对序列,对每一个 key 可以求一个哈希,哈希到reduce 中的某一路。
第三种是最一般的M:N的模式,每一个Map根据自己的业务形态可以分配给不同的reduce。
作为一个调度系统,如何满足不同的业务场景的需求,Fuxi 系统是将 Shuffle 的过程抽象成用户只需要实现一个 partitioner,重载一个哈希函数就可以实现自己独有的Shuffle 的需求,是1:N或者是M:N的方式。
另一方面,map的输出结果,通常所谓的发出去,实际上是写到分布式存储系统,这个系统当文件时,需要进行命名,必须遵照一定的命名规则,才能让 reduce 的instance 能够访问到它的数据,所以命名的规则的过程以及 Shuffle 的管理 Fux 系统是将它封装到一个所作所谓的 streamline 的库中,用户在使用时,只需要进行简单的配置,是哪一种模式就可以,减轻了用户的负担。
3.Instance重试和Backup Instance
在大规模的计算中,错误是常见的,无论是硬件的错误,如磁盘、内存也可能出现硬件的故障。更多的是软件故障,如程序故障,有可能进程crash或者整个机器会宕机。
如何进行错误的处理,常见的方法是进行错误的重试,将这个 instance 重新调度到另外一个 Worker 上进行执行,还有一种情况是,这个 instance 并没有结束,但是由于硬件的性能退化,可能运行的非常慢,这个在分布式系统中常见称之为常萎,因为一般意义上数据切分之后,每个 instance 的应付的数据量差不多,但是由于 instance 被分配的硬件节点不同,而每个节点的处理速度可能不一样,如何处理这种问题,阿里云 Fuxi 采用了了一种 backup
instance,也就是备份instance 的方式,它的处理方法是当发现一个 instance出现常萎时,会将这个 instance 重新分配到另外一个 app worker 上执行同时运行这一片数据,谁先到达就取谁的结果。但这个机制的触发需要条件:
instance运行时间超过了其他 instance 的平均运行时间
instance的数据处理速度低于其他的 instance平均值
这两者的区别,第一个是 instance完成时间是需要其他的 instance 已经成功的跑完,才可以统计它的运行时间,而第二个是当其他instance还没有结束的时候,是否可以提前发现某些instance 处理速度已经开始变慢,此时就可以开始启动备份的instance。
已完成的 instance 比例
在任务启动之初进行判断可能会产生误判。所以通过 instance 重试和备份 instance 来处理 instance 出现错误及常委的情况。