Direct Use
Please open "How to Use DLC for TensorFlow 1.x Distributed Training" and click "Open in DSW" in the upper-right corner.
How to Use DLC for TensorFlow 1.x Distributed Training
DLC, a cloud-native deep learning training platform, supports distributed training. This article uses TensorFlow distributed training as an example to show how to run it on DLC. In general, users of distributed TensorFlow need to take care of the following three things:
- Finding enough resources to run the training. For data parallelism there are two modes: a. PS/Worker mode b. AllReduce mode. Both have their own advantages; this article takes the PS/Worker mode as an example to show how to run data-parallel distributed TensorFlow training on DLC.
- Installing and configuring the software and applications needed to run the program
- Configuring a ClusterSpec, as required by the design of distributed TensorFlow. This JSON-formatted ClusterSpec describes the topology of the entire distributed training cluster, for example two Workers and one PS. Normally, the worker and ps entries of the ClusterSpec have to be configured manually in the model code so that the training framework knows where each node is and can aggregate and distribute weights during training (a minimal sketch follows this list).
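For reference, below is a minimal sketch of what that manual configuration looks like when building a ClusterSpec by hand for one PS and two Workers; the host names and ports are made up for illustration:

```python
import tensorflow as tf

# Hypothetical addresses -- in a real cluster these would be the actual
# host:port endpoints of each node.
cluster_spec = tf.train.ClusterSpec({
    "ps": ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
})

# Each process then starts a server for its own role and index, e.g. worker 0:
server = tf.distribute.Server(cluster_spec, job_name="worker", task_index=0)
```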
DLC provides convenient, cloud-native solutions for each of these three points:
- As a cloud-native deep learning training platform, DLC builds on Kubernetes scheduling and cloud resources to schedule CPUs/GPUs on demand and efficiently
- The second point is essentially the runtime environment, a scenario that Docker fits perfectly
- For the last point, different TensorFlow versions need different handling. For TensorFlow 1.x, DLC configures the ClusterSpec automatically; the model code only needs to read the TF_CONFIG environment variable and adapt it to the needs of the actual model logic (see the sketch after this list).
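TF_CONFIG is a JSON document with a "cluster" section and a "task" section, matching what the sample code below reads. A hypothetical value for Worker 0 in a 1-PS/2-Worker job might look like the sketch below; the host names are placeholders, the real ones are filled in by DLC:

```python
# Hypothetical TF_CONFIG content for worker 0 of a job with 1 PS and 2 Workers.
# On DLC the variable is injected automatically; the host names below are
# placeholders, not the real service names.
example_tf_config = {
    "cluster": {
        "ps": ["ps0:2222"],
        "worker": ["worker0:2222", "worker1:2222"],
    },
    "task": {"type": "worker", "index": 0},
}
```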
To turn TensorFlow 1.x single-node training code into distributed training code, add logic like the following:
```python
# Read the JSON-formatted configuration from the TF_CONFIG environment variable
tf_config_json = os.environ.get("TF_CONFIG", "{}")
# Deserialize it into a Python object
tf_config = json.loads(tf_config_json)

# Get the ClusterSpec content
cluster_spec = tf_config.get("cluster", {})
cluster_spec_object = tf.train.ClusterSpec(cluster_spec)

# Get the role type and index, e.g. here job_name is "worker" and task_id is 0
task = tf_config.get("task", {})
job_name = task["type"]
task_id = task["index"]

# Create the TensorFlow training Server object
server_def = tf.train.ServerDef(
    cluster=cluster_spec_object.as_cluster_def(),
    protocol="grpc",
    job_name=job_name,
    task_index=task_id)
server = tf.train.Server(server_def)

# If job_name is ps, call server.join()
if job_name == 'ps':
    server.join()

# Check whether the current process is the master; the master is responsible
# for creating the session and saving summaries.
is_chief = (job_name == 'master')

# The rest of the code can be written in the usual way
with tf.compat.v1.train.MonitoredTrainingSession(
        master=server.target,
        ......
```
Sample code (PS-Worker mode):
```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os
import sys
import ast
import json
import time

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

FLAGS = None


def train():
    tf_config_json = os.environ.get("TF_CONFIG", "{}")
    tf_config = json.loads(tf_config_json)
    task = tf_config.get("task", {})
    cluster_spec = tf_config.get("cluster", {})
    cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
    job_name = task["type"]
    task_id = task["index"]
    print("job_name", job_name)
    print("task_id: ", task_id)

    server_def = tf.train.ServerDef(
        cluster=cluster_spec_object.as_cluster_def(),
        protocol="grpc",
        job_name=job_name,
        task_index=task_id)
    server = tf.distribute.Server(server_def)
    print("server: ", server.target)

    is_chief = (job_name == 'master')
    if is_chief:
        print("Worker %d: Initializing session..." % task_id)
        tf.reset_default_graph()
    else:
        print("Worker %d: Waiting for session to be initialized..." % task_id)

    batch_size = 5000  # As big as will fit on my gpu
    learning_rate = 0.016  # Fast learning
    training_epochs = 5
    n_hidden = 2000
    logs_path = "/tmp/mnist/2"

    # load mnist data set
    mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

    if job_name == "ps":
        server.join()
    elif job_name == "worker":
        # Between-graph replication
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_id,
                cluster=cluster_spec)):

            # count the number of updates
            global_step = tf.get_variable('global_step', [],
                                          initializer=tf.constant_initializer(0),
                                          trainable=False)

            # input images
            with tf.name_scope('input'):
                # None -> batch size can be any size, 784 -> flattened mnist image
                x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
                # target 10 output classes
                y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

            # model parameters will change during training so we use tf.Variable
            tf.set_random_seed(1)
            with tf.name_scope("weights"):
                W1 = tf.get_variable('W1', shape=(784, n_hidden),
                                     initializer=tf.contrib.layers.xavier_initializer())
                W2 = tf.get_variable('W2', shape=(n_hidden, 10),
                                     initializer=tf.contrib.layers.xavier_initializer())

            # bias
            with tf.name_scope("biases"):
                b1 = tf.Variable(tf.zeros([n_hidden]))
                b2 = tf.Variable(tf.zeros([10]))

            # implement model
            with tf.name_scope("softmax"):
                # y is our prediction
                z2 = tf.add(tf.matmul(x, W1), b1)
                a2 = tf.nn.sigmoid(z2)
                logits = tf.add(tf.matmul(a2, W2), b2)
                dropout_logits = tf.nn.dropout(logits, 0.3)
                softmax_logits = tf.nn.softmax(logits)

            # specify cost function
            with tf.name_scope('cross_entropy'):
                # this is our cost
                cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
                    logits=dropout_logits, labels=y_)
                loss = tf.reduce_mean(cross_entropy)

            # specify optimizer
            with tf.name_scope('train'):
                # optimizer is an "operation" which we can execute in a session
                # grad_op = tf.train.AdamOptimizer(learning_rate=learning_rate)
                grad_op = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
                train_op = grad_op.minimize(loss, global_step=global_step)

            with tf.name_scope('Accuracy'):
                # accuracy
                correct_prediction = tf.equal(tf.argmax(softmax_logits, 1), tf.argmax(y_, 1))
                accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

            # create a summary for our cost and accuracy
            # tf.scalar_summary("cost", loss)
            # tf.scalar_summary("accuracy", accuracy)
            tf.compat.v1.summary.scalar("cost", loss)
            tf.compat.v1.summary.scalar("accuracy", accuracy)

            # merge all summaries into a single "operation" which we can execute in a session
            # summary_op = tf.merge_all_summaries()
            summary_op = tf.compat.v1.summary.merge_all()
            # init_op = tf.initialize_all_variables()
            print("Variables initialized ...")

        # sv = tf.train.Supervisor(is_chief=(task_id == 0), global_step=global_step, init_op=init_op)
        config = tf.compat.v1.ConfigProto(device_filters=[
            '/job:ps', '/job:worker/task:%d' % task_id])

        begin_time = time.time()
        frequency = 100
        # with sv.prepare_or_wait_for_session(server.target) as sess:
        with tf.compat.v1.train.MonitoredTrainingSession(
                master=server.target,
                config=config,
                is_chief=(task_id == 0 and (job_name == 'worker'))) as sess:
            # create log writer object (this will log on every machine)
            # writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())

            while not sess.should_stop():
                # perform training cycles
                start_time = time.time()
                for epoch in range(training_epochs):
                    # number of batches in one epoch
                    batch_count = int(mnist.train.num_examples / batch_size)
                    count = 0
                    for i in range(batch_count):
                        batch_x, batch_y = mnist.train.next_batch(batch_size)
                        # perform the operations we defined earlier on batch
                        _, cost, summary, step, train_accuracy = sess.run(
                            [train_op, loss, summary_op, global_step, accuracy],
                            feed_dict={x: batch_x, y_: batch_y})
                        # writer.add_summary(summary, step)
                        print("step: ", step, "--train_accuracy: ", train_accuracy)
                        count += 1
                        if count % frequency == 0 or i + 1 == batch_count:
                            elapsed_time = time.time() - start_time
                            start_time = time.time()
                            print("Step: %d," % (step + 1),
                                  " Epoch: %2d," % (epoch + 1),
                                  " Batch: %3d of %3d," % (i + 1, batch_count),
                                  " Cost: %.4f," % cost,
                                  " Train acc %2.2f" % (train_accuracy * 100),
                                  " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                            count = 0

                print("Test-Accuracy: %2.2f" % (sess.run(
                    accuracy,
                    feed_dict={x: mnist.test.images, y_: mnist.test.labels}) * 100))
                print("Total Time: %3.2fs" % float(time.time() - begin_time))
                print("Final Cost: %.4f" % cost)
                break
            # sess.close()
        print("done")


def main(_):
    train()


if __name__ == '__main__':
    tf.compat.v1.app.run(main=main, argv=[sys.argv[0]])
```
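To try the sample outside DLC, one can hand-craft a TF_CONFIG value for each local process. Below is a minimal launcher sketch; the script name mnist_dist.py and the localhost ports are assumptions for illustration, and on DLC the platform injects TF_CONFIG for each node automatically:

```python
# Optional local smoke test: launch one PS and two Workers of the sample
# script with hand-written TF_CONFIG values.
import json
import os
import subprocess

cluster = {"ps": ["localhost:2222"],
           "worker": ["localhost:2223", "localhost:2224"]}

procs = {}
for job_name, hosts in cluster.items():
    for index in range(len(hosts)):
        env = dict(os.environ)
        env["TF_CONFIG"] = json.dumps({
            "cluster": cluster,
            "task": {"type": job_name, "index": index},
        })
        # "mnist_dist.py" is a placeholder file name for the sample above.
        procs[(job_name, index)] = subprocess.Popen(
            ["python", "mnist_dist.py"], env=env)

# Workers exit when training finishes; the PS blocks in server.join(),
# so terminate it once all workers are done.
for (job_name, index), proc in procs.items():
    if job_name == "worker":
        proc.wait()
for (job_name, index), proc in procs.items():
    if job_name == "ps":
        proc.terminate()
```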
How to Submit a Distributed Training Job on DLC
1. Create a new workspace
2. Enter the workspace and add a code configuration
You can also refer to: https://help.aliyun.com/document_detail/202277.html
Create a new code configuration: as shown in the figure below, you need to enter the following information when creating a code configuration:
- Name of the code configuration
- Git URL of the code repository
- Username/password or token, if the repository requires one
- Local storage directory: the path under which the code is saved after it is cloned into the container; the default is /root/code/
After entering the information above, save the code configuration. The sketch below illustrates how the cloned code path can be referenced.
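As a rough illustration only (the repository folder name mnist-sample and the script name mnist_dist.py are placeholders, not values from this document), the training job can reference the code under the local storage directory:

```python
# Hypothetical check inside the training container: the repository configured
# above ends up under the local storage directory (default /root/code/).
# "mnist-sample" and "mnist_dist.py" are placeholder names.
import os

code_root = "/root/code"
entry_script = os.path.join(code_root, "mnist-sample", "mnist_dist.py")
print("entry script exists:", os.path.exists(entry_script))
```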
3. Create a dataset configuration
The workspace currently supports the following four types of datasets (as shown in the figure below). Alibaba Cloud storage is recommended, since it guarantees both performance and reliability.
The following uses an Alibaba Cloud storage dataset as an example to show how to create one. Two kinds of Alibaba Cloud storage are currently supported (a sketch of reading from the mounted path follows the parameter lists):
NAS:
There are three parameters here:
- Data storage type: NAS in this example
- ID of the NAS file system to mount; a list of all NAS file systems under the current account is shown here
- Mount path: the directory of the file system to mount; in this example the root directory of the file system is mounted
OSS:
- Data storage type: OSS in this example
- Path: the OSS path to mount; click the button highlighted in red to choose from all OSS buckets under the current account
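Once the dataset is mounted into the training containers, the code can read it like a local directory. A minimal sketch, assuming a hypothetical mount path of /mnt/data (use whatever mount path you configure for the dataset):

```python
# Minimal sketch: reading training data from the dataset mount point.
# "/mnt/data" is an assumed mount path, not a value defined by this document.
import os

data_dir = "/mnt/data"
print("files under the mounted dataset:", os.listdir(data_dir))

# e.g. point the MNIST loader from the sample code at the mounted directory:
# mnist = input_data.read_data_sets(os.path.join(data_dir, "MNIST_data"), one_hot=True)
```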
4. Submit the job
This article uses PS/Worker-mode distributed training as an example to show how to create a TensorFlow distributed training job with DLC.
The configuration shown in the figure above is the general job configuration. Next, switch to "Advanced Mode" to configure the node specifications for distributed training (PS and Worker counts and resources).
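At runtime, the PS/Worker layout configured here should be reflected in the "cluster" section of the TF_CONFIG injected by DLC. A minimal sketch of reading the resulting node counts, following the same structure as the sample code above:

```python
# Minimal sketch: inspect the cluster layout the job was launched with,
# e.g. to shard data or scale hyperparameters by the number of Workers.
import json
import os

tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
cluster = tf_config.get("cluster", {})
print("number of PS nodes:", len(cluster.get("ps", [])))
print("number of Worker nodes:", len(cluster.get("worker", [])))
```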