机器学习分布式框架Ray

简介: Ray是UC Berkeley RISELab推出的一个高性能分布式执行框架,它比Spark更具计算优势,部署简单,支持机器学习和深度学习的分布式训练。Ray包括节点(head和worker)、本地调度器、object store、全局调度器(GCS),用于处理各种分布式计算任务。它支持超参数调优(Ray Tune)、梯度下降(Ray SGD)、推理服务(Ray SERVE)等。安装简单,可通过`pip install ray`。使用时,利用`@ray.remote`装饰器将函数转换为分布式任务,通过`.remote`提交并用`ray.get`获取结果。5月更文挑战第15天

机器学习分布式框架Ray

1.什么是Ray

分布式计算框架大家一定都耳熟能详,诸如离线计算的Hadoop(map-reduce),spark, 流式计算的strom,Flink等。相对而言,这些计算框架都依赖于其他大数据组件,安装部署也相对复杂。

在python中,之前有分享过的Celery可以提供分布式的计算。今天和大家分享另外一个开源的分布式计算框架Ray。Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,具有比Spark更优异的计算性能,而且部署和改造更简单,同时支持机器学习和深度学习的分布式训练,支持主流的深度学习框架(pytorch,tensorflow,keras等)

md-2021-09-07-17-46-27.png

2. Ray架构

Ray的架构参见最早发布的论文Ray: A Distributed Framework for Emerging AI Applications

md-2021-09-07-18-16-08.png

由上图可以Ray主要包括:

  • Node: 节点,主要是head和worker, head可以认为是Master,worker是执行任务的单元
    • 每个节点都有自己的本地调度器local scheduler
    • object store:一个内存对象存储,允许Node之间进行通信
  • scheduler: 有两个调度器,每个节点都有本地的调度器, 在提交任务时,Local Scheduler会判断是否需要提交给Global Scheduler分发给其他worker来执行。
  • GCS:全局状态控制记录了Ray中各种对象的状态信息,可以认为是meta数据,是Ray容错的保证

Ray适用于任何分布式计算的任务,包括分布式训练。笔者最近是用在大量的时间序列预测模型训练和在线预测上。

Ray目前库支持超参数调优Ray tune, 梯度下降Ray SGD,推理服务RaySERVE, 分布式数据Dataset以及分布式增强学习RLlib。还有其他第三方库,如下所示:

md-2021-09-07-19-12-34.png

3. 简单使用

3.1 安装部署

pip install --upgrade pip
# pip install ray
pip install ray == 1.6.0

# ImportError: cannot import name 'deep_mapping' from 'attr.validators'
# pip install attr == 19.1.0

3.2 单机使用

  • 简单例子
    Ray 通过@ray.remote装饰器使得函数变成可分布式调用的任务。通过函数名.remote方式进行提交任务,通过ray.get方式来获取任务返回值。单击情况下和多线程异步执行的方式类似。

      import time
      import ray
      ray.init(num_cpus = 4) # Specify this system has 4 CPUs.
    
      @ray.remote
      def do_some_work(x):
          time.sleep(1) # Replace this is with work you need to do.
          return x
    
      start = time.time()
      results = ray.get([do_some_work.remote(x) for x in range(4)])
      print("duration =", time.time() - start)
      print("results = ", results)
    
      # duration = 1.0107324123382568
      # results =  [0, 1, 2, 3]
    

    remote返回的对象的id 如ObjectRef(7f10737098927148ffffffff0100000001000000)。需要通过ray.get来获取实际的值, 需要注意的是ray.get是阻塞式的调用,不能[ray.get(do_some_work.remote(x)) for x in range(4)]

  • 注意小任务使用情况
    需要注意的是ray分布式计算在调度的时候需要发费额外的时间,如调度,进程间通信以及任务状态的更新等等,所以避免过小的任务。可以把小任务进行合并

      @ray.remote
      def tiny_work(x):
          time.sleep(0.0001) # Replace this is with work you need to do.
          return x
    
      start = time.time()
      result_ids = [tiny_work.remote(x) for x in range(100000)]
      results = ray.get(result_ids)
      print("duration =", time.time() - start)
    
  • ray.put
    ray.put() 把一个对象放到对象存储上,返回一个object id, 这个id可以在分布式机器上都可以调用,该操作为异步的。通过ray.get()可以是获取。

      num = ray.put(10)
      ray.get(num)
    
  • ray.wait
    如果任务返回多个结果,ray.get()会等所有结果都完成之后才会执行后续的操作。如果多个结果执行的耗时不同,此时短板在于最长的那个任务。

    这个时候可以采用ray.wait()方法,ray.wait()返回执行完毕的和未执行完毕的任务结果,执行完成的结果可以继续后续的操作

      import random
      @ray.remote
      def do_some_work(x):
          time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
          return x
    
      def process_incremental(sum, result):
          time.sleep(1) # Replace this with some processing code.
          return sum + result
    
      start = time.time()
      result_ids = [do_some_work.remote(x) for x in range(4)]
      sum = 0
      while len(result_ids):
          done_id, result_ids = ray.wait(result_ids)
          sum = process_incremental(sum, ray.get(done_id[0]))
      print("duration =", time.time() - start, "\nresult = ", sum)
    
      # duration = 5.270821809768677 
      # result =  6
    

    md-2021-09-07-10-24-14.png

2.3 集群部署

Ray的架构遵循master-slave的模式。Head Node 可以认为是Master,其他的Node为worker。在集群部署时,Head Node需要首先启动ray start --head, 其他机器依次启动worker,注意需要指定head Node的地址确定关系,ray start --address 10.8.xx.3:6379

关闭服务,需要每一台机器执行 ray.stop

md-2021-09-07-11-17-49.png

# To start a head node.
#ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --head --node-ip-address 10.8.xx.3 --port=6379


# To start a non-head node.
# ray start --address=<address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path}

md-2021-09-07-13-37-40.png

  • 提交任务
    任何一台worker机器都可以提交任务, 先通过init连接Head Node就可以remote起来了。
      import ray
      ray.init(10.8.xx.3:6379)
    

3. 不同任务的例子

  • 任务依赖
    任务之间存在依赖关系,Ray和Spark一样也是通过生成DAG图的方式来确定依赖关系,确定可以并行跑的任务。如下图所示zeros是可以并行跑的。

    md-2021-09-07-10-23-42.png

``` python
import numpy as np
# Define two remote functions. Invocations of these functions create tasks
# that are executed remotely.

@ray.remote
def multiply(x, y):
    return np.dot(x, y)

@ray.remote
def zeros(size):
    return np.zeros(size)

# Start two tasks in parallel. These immediately return futures and the
# tasks are executed in the background.
x_id = zeros.remote((100, 100))
y_id = zeros.remote((100, 100))

# Start a third task. This will not be scheduled until the first two
# tasks have completed.
z_id = multiply.remote(x_id, y_id)

# Get the result. This will block until the third task completes.
z = ray.get(z_id)
print(z)
```
  • 有状态任务
    上面提到的任务都是无状态的(除依赖外),即任务之间都是无关系的。Ray也是支持有状态的任务成为Actor。常是在python class上加@ray.remote,ray会跟踪每个class内部状态的不同状态。

      @ray.remote
      class Counter(object):
          def __init__(self):
              self.n = 0
    
          def increment(self):
              self.n += 1
    
          def read(self):
              return self.n
    
      counters = [Counter.remote() for i in range(4)]
    
      # 不断的执行可以每个counter计数不断增加
      [c.increment.remote() for c in counters]
      futures = [c.read.remote() for c in counters]
      print(ray.get(futures))
      # [1, 1, 1, 1]
      # [11, 11, 11, 11]
    
  • map-reduce 任务
    map-reduce任务其实可以其他分布式任务是一样的。主要是各种聚合操作。Map-Reduce常规操作如下
    md-2021-09-07-13-55-07.png

  - word count例子见:https://github.com/ray-project/ray/blob/master/doc/examples/streaming/streaming.py

这里举一个简单的例子:
``` python
@ray.remote
def map(obj, f):
    return f(obj)
@ray.remote
def sum_results(*elements):
    return np.sum(elements)

items = list(range(100))
map_func = lambda i : i*2
remote_elements = [map.remote(i, map_func) for i in items]

# simple reduce
remote_final_sum = sum_results.remote(*remote_elements)
result = ray.get(remote_final_sum)

# tree reduce
intermediate_results = [sum_results.remote(
    *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]
remote_final_sum = sum_results.remote(*intermediate_results)
result = ray.get(remote_final_sum)

```
![md-2021-09-07-19-34-45.png](https://ucc.alicdn.com/pic/developer-ecology/abb7gqinvjggw_0b8ad35cdc774ab78d681c2c782ff884.png)
  • 训练模型如pytorch
    官网提供了Best Practices: Ray with PyTorch, 主要是下载训练/测试数据和训练多个模型(感觉不是很实用)。训练多个模型,可以进行参数融合。

    参见 https://docs.ray.io/en/latest/using-ray-with-pytorch.html

4. 总结

本文分享了高效的Python分布式计算框架Ray,希望对你有帮助。总结如下:

  • Ray是UC Berkeley RISELab新推出的高性能分布式执行框架, Spark也是伯克利出品的
  • Ray架构关键:两个调度器, Head和worker节点,GCS全局状态控制保证计算容错
  • Ray应用简单:@ray.remote把任务变成分布式任务, x.remote提交任务, get/wait获取结果
  • 集群不是:ray start
  • Ray支持多种任务:有依赖DAG,有状态Actor以及深度学习支持
  • 不断丰富的库:RaySERVE, RaySGD, RayTune, Ray data,rllib
相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
目录
相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
2月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
23天前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
328 66
|
18天前
|
机器学习/深度学习 人工智能
Diff-Instruct:指导任意生成模型训练的通用框架,无需额外训练数据即可提升生成质量
Diff-Instruct 是一种从预训练扩散模型中迁移知识的通用框架,通过最小化积分Kullback-Leibler散度,指导其他生成模型的训练,提升生成性能。
47 11
Diff-Instruct:指导任意生成模型训练的通用框架,无需额外训练数据即可提升生成质量
|
17天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
56 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
5天前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
37 7
|
1月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
81 2
|
1月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
84 4
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
125 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
61 6