【DSW Gallery】如何使用DLC进行TensorFlow 1.x 分布式训练

本文涉及的产品
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,5000CU*H 3个月
交互式建模 PAI-DSW,5000CU*H 3个月
简介: Tensorflow1.x DEMO

直接使用

请打开如何使用DLC进行TensorFlow 1.x 分布式训练,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Tensorflow1.x的分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以TensorFlow分布式训练为例,讲述如何使用DLC进行TensorFlow分布式训练。一般来说,分布式TensorFlow的使用者需要关心下面3件事情:

  1. 寻找足够运行训练的资源。在数据并行的情况下,有下列两种模式: a. PS/Worker模式 b. AllReduce模式 两种模式各有优点,PS/Worker模式为例,展示如何使用DLC进行TensorFlow的数据并行模式的分布式训练。
  2. 安装和配置支撑程序运算的软件和应用
  3. 根据分布式TensorFlow的设计,需要配置ClusterSpec。这个json格式的ClusterSpec是用来描述整个分布式训练集群的架构,比如需要使用两个Worker和PS。一般而言,我们需要在模型代码中去手动的配置ClusterSpec中的worker和ps的属性,这样告诉我们的训练框架他们的彼此的位置,然后在训练的过程中实现weight的归集和分发。

针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。

  1. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  2. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  3. 最后一点, 针对不同版本的TensorFlow,需要有不同的处理方式。对于TensorFlow 1.x版本而言,我们将配置ClusterSpec的工作自动化的完成,用户可以从模型代码中获取TF_CONFIG环境变量,然后按照实际的模型逻辑的需求进行适配.

Tensorflow 1.x的单机训练代码,需要加入如下的逻辑来构建分布式训练

# 从环境变量TF_CONFIG中读取json格式的配置信息
tf_config_json = os.environ.get("TF_CONFIG", "{}")
# 反序列化为python对象
tf_config = json.loads(tf_config_json)
# 拿到ClusterSpec的内容
cluster_spec = tf_config.get("cluster", {})
cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
# 获取角色类型和id, 比如这里的job_name 是 "worker" and task_id 是 0
task = tf_config.get("task", {})
job_name = task["type"]
task_id = task["index"]
# 创建TensorFlow Training Server对象
server_def = tf.train.ServerDef(
    cluster=cluster_spec_object.as_cluster_def(),
    protocol="grpc",
    job_name=job_name,
    task_index=task_id)
server = tf.train.Server(server_def)
# 如果job_name为ps,则调用server.join()
if job_name == 'ps':
    server.join()
# 检查当前进程是否是master, 如果是master,就需要负责创建session和保存summary。
is_chief = (job_name == 'master')
# 其余的代码就可以按照正常的规范去写了
with tf.compat.v1.train.MonitoredTrainingSession(
    master=server.target,
    ......

Sample代码(PS-Worker模式):

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import sys
import ast
import json
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
FLAGS = None
def train():
  tf_config_json = os.environ.get("TF_CONFIG", "{}")
  tf_config = json.loads(tf_config_json)
  task = tf_config.get("task", {})
  cluster_spec = tf_config.get("cluster", {})
  cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
  job_name = task["type"]
  task_id = task["index"]
  print("job_name",job_name)
  print("task_id: ",task_id)
  server_def = tf.train.ServerDef(
      cluster=cluster_spec_object.as_cluster_def(),
      protocol="grpc",
      job_name=job_name,
      task_index=task_id)
  server = tf.distribute.Server(server_def)
  print("server: ",server.target)
  is_chief = (job_name == 'master')
  if is_chief:
        print("Worker %d: Initializing session..." % task_id)
        tf.reset_default_graph()
  else:
        print("Worker %d: Waiting for session to be initialized..." % task_id)
  batch_size = 5000  #  As big as will fit on my gpu
  learning_rate = 0.016 #  Fast learning
  training_epochs = 5
  n_hidden = 2000
  logs_path = "/tmp/mnist/2"
# load mnist data set
  mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
  if job_name == "ps":
      server.join()
  elif job_name == "worker":
    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_id, cluster=cluster_spec)):
        # count the number of updates
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.get_variable('W1',
                                 shape=(784, n_hidden),
                                 initializer=tf.contrib.layers.xavier_initializer())
            W2 = tf.get_variable('W2',
                                 shape=(n_hidden, 10),
                                 initializer=tf.contrib.layers.xavier_initializer())
        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([n_hidden]))
            b2 = tf.Variable(tf.zeros([10]))
        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            logits = tf.add(tf.matmul(a2, W2), b2)
            dropout_logits = tf.nn.dropout(logits, 0.3)
            softmax_logits = tf.nn.softmax(logits)
        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=dropout_logits, labels=y_)
            loss = tf.reduce_mean(cross_entropy)
        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            # grad_op = tf.train.AdamOptimizer(learning_rate=learning_rate)
            grad_op = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
            train_op = grad_op.minimize(loss, global_step=global_step)
        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(softmax_logits, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        # create a summary for our cost and accuracy
        # tf.scalar_summary("cost", loss)
        # tf.scalar_summary("accuracy", accuracy)
        tf.compat.v1.summary.scalar("cost", loss)
        tf.compat.v1.summary.scalar("accuracy", accuracy)
        # merge all summaries into a single "operation" which we can execute in a session
        # summary_op = tf.merge_all_summaries()
        summary_op = tf.compat.v1.summary.merge_all()
        # init_op = tf.initialize_all_variables()
        print("Variables initialized ...")
    # sv = tf.train.Supervisor(is_chief=(task_id == 0), global_step=global_step, init_op=init_op)
    config = tf.compat.v1.ConfigProto(device_filters=[
        '/job:ps', '/job:worker/task:%d' % task_id])
    begin_time = time.time()
    frequency = 100
    # with sv.prepare_or_wait_for_session(server.target) as sess:
    with tf.compat.v1.train.MonitoredTrainingSession(
        master=server.target,
        config=config,
        is_chief=(task_id == 0 and (
                job_name == 'worker'))) as sess:
        # create log writer object (this will log on every machine)
        # writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())
        while not sess.should_stop():
            # perform training cycles
            start_time = time.time()
            for epoch in range(training_epochs):
                # number of batches in one epoch
                batch_count = int(mnist.train.num_examples / batch_size)
                count = 0
                for i in range(batch_count):
                    batch_x, batch_y = mnist.train.next_batch(batch_size)
                    # perform the operations we defined earlier on batch
                    _, cost, summary, step, train_accuracy = sess.run([train_op, loss, summary_op, global_step, accuracy],
                        feed_dict={x: batch_x, y_: batch_y})
                    # writer.add_summary(summary, step)
                    print("step: ",step,"--train_accuracy: ",train_accuracy)
                    count += 1
                    if count % frequency == 0 or i + 1 == batch_count:
                        elapsed_time = time.time() - start_time
                        start_time = time.time()
                        print("Step: %d," % (step + 1), " Epoch: %2d," % (epoch + 1),
                            " Batch: %3d of %3d," % (i + 1, batch_count), " Cost: %.4f," % cost,
                            " Train acc %2.2f" % (train_accuracy * 100),
                            " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                        count = 0
            print("Test-Accuracy: %2.2f" % (sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}) *100))
            print("Total Time: %3.2fs" % float(time.time() - begin_time))
            print("Final Cost: %.4f" % cost)
            break
    # sess.close()
    print("done")
def main(_):
  train()
if __name__ == '__main__':
  tf.compat.v1.app.run(main=main, argv=[sys.argv[0]])

如何在DLC上面提交一个分布式的训练任务

1. 创建新的工作空间

image.png

2. 进入工作空间,新增代码配置

也可以参考: https://help.aliyun.com/document_detail/202277.html

创建新的代码配置: 如下图,用户在创建代码配置的时候,需要输入下列信息:

  1. 代码配置的名称
  2. 代码仓库的Git地址
  3. 如果用用户名密码或者Token,也请输入
  4. 本地存储目录,这个参数定义了代码被clone到容器中之后,保存的路径,默认是 /root/code/ 输入上述信息之后,保存代码配置
  5. image.png

3. 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

下面以阿里云存储的数据集为例,展示如何创建数据集。 目前支持两种阿里云存储:

NAS:

这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  6. 数据存储类型:本例选择OSS
  7. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统
  8. image.png

4. 提交JOB

本文以PS/Worker模式的分布式训练为例,展示如何使用DLC创建一个TensorFlow分布式训练任务。

image.png

上面图中的配置是JOB的通用配置,下面选择“进阶模式”,配置分部署训练相关的规格信息:

image.png


相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
29天前
|
机器学习/深度学习 自然语言处理 监控
利用 PyTorch Lightning 搭建一个文本分类模型
利用 PyTorch Lightning 搭建一个文本分类模型
50 8
利用 PyTorch Lightning 搭建一个文本分类模型
|
机器学习/深度学习 数据挖掘 PyTorch
视觉神经网络模型优秀开源工作:PyTorch Image Models(timm)库(上)
视觉神经网络模型优秀开源工作:PyTorch Image Models(timm)库(上)
|
3月前
|
机器学习/深度学习 API TensorFlow
深入解析TensorFlow 2.x中的Keras API:快速搭建深度学习模型的实战指南
【8月更文挑战第31天】本文通过搭建手写数字识别模型的实例,详细介绍了如何利用TensorFlow 2.x中的Keras API简化深度学习模型构建流程。从环境搭建到数据准备,再到模型训练与评估,展示了Keras API的强大功能与易用性,适合初学者快速上手。通过简单的代码,即可完成卷积神经网络的构建与训练,显著降低了深度学习的技术门槛。无论是新手还是专业人士,都能从中受益,高效实现模型开发。
26 0
|
4月前
|
TensorFlow 算法框架/工具 C++
构建NLP 开发问题之如何将模型导出为 ONNX、TensorRT 或 Tensorflow 格式以便部署
构建NLP 开发问题之如何将模型导出为 ONNX、TensorRT 或 Tensorflow 格式以便部署
|
4月前
|
机器学习/深度学习 PyTorch TensorFlow
PAI DLC与其他深度学习框架如TensorFlow或PyTorch的异同
PAI DLC与其他深度学习框架如TensorFlow或PyTorch的异同
|
12月前
|
PyTorch 算法框架/工具
ModelScope是一个基于PyTorch的模型管理平台
ModelScope是一个基于PyTorch的模型管理平台
307 3
|
6月前
|
机器学习/深度学习 Kubernetes TensorFlow
基于ASK+TFJob快速完成分布式Tensorflow训练任务
本文介绍如何使用TFJob在ASK+ECI场景下,快速完成基于GPU的TensorFlow分布式训练任务。
252 0
基于ASK+TFJob快速完成分布式Tensorflow训练任务
|
机器学习/深度学习 人工智能 TensorFlow
ModelScope使用之模型部署
ModelScope是阿里巴巴打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!本文演示如何将模型部署到阿里云的EAS,对外提供服务。
15814 0
ModelScope使用之模型部署
|
存储 机器学习/深度学习 Cloud Native
|
存储 机器学习/深度学习 Kubernetes
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务
本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务

热门文章

最新文章