直接使用
请打开基于如何在DLC上提交ElasticBatch任务,并点击右上角 “ 在DSW中打开” 。
1. ElasticBatch介绍
ElasticBatch 是一种分布式离线弹性批量推理作业类型。ElasticBatch作业具有以下特点
- 任务等待时间大大降低, 部分worker有资源即可运行;
- 支持自动检测慢机并启动backup worker替换,避免任务长尾或者Hang;
- 支持数据分片全局动态分发,让快节点可以处理更多数据;
- 支持任务早停,数据全部处理完成后,未启动的worker不再启动,避免增加任务结束时间;
- 支持容错处理,偶发失败worker会被重新拉起执行;
- 支持多种IO数据源,比如OSS、NAS以及MaxCompute Table等;
- 支持使用各种深度学习框架推理模型,比如PyTorch、TensorFlow以及OneFlow等;
ElasticBatch job包含AIMaster和Worker两类节点,AIMaster负责job的全局管控,包括弹性扩缩容、数据分片全局动态分发、慢机检测以及容错处理。Worker是工作节点,从AIMaster获取一个数据分片后,进行数据读取、预处理、推理模型预测、数据后处理以及数据写回,之后重复上述流程,直到无法获取新的数据分片。
2. ElasticBatch任务提交方式
ElasticBatch任务可以通过DLC-Web或DLC Python SDK提交。
ElasticBatch任务在提交时建议打开容错监控,避免任务运行过程中出现的偶发异常导致任务失败。
ElasticBatch任务推荐的容错监控配置如下所示
--job-execution-mode=Async --fault-tolerant-policy=OnFailure --max-num-of-same-error=20
2.1 在DLC-Web提交
在DLC-Web创建任务时,任务类型选择ElasticBatch即可创建ElasticBatch任务,节点镜像可以根据您的推理模型来选择,比如使用PyTorch模型,那么您可以选择PyTorch镜像作为任务运行环境。
2.2 使用DLC Python SDK提交#
如果您的环境未安装DLC Python SDK,那么可以通过以下命令安装
!pip install alibabacloud-pai-dlc20201203 -U -q
下面是DLC Python SDK提交ElasticBatch任务示例,示例选择TF镜像作为离线推理运行环境,设置了3个运行worker并开启了容错监控。
import time from alibabacloud_pai_dlc20201203.client import Client from alibabacloud_pai_dlc20201203.models import CreateJobRequest, JobSpec, JobSettings from alibabacloud_tea_openapi.models import Config workspace_id = "***已有的AI工作空间ID***" region_id = "cn-hangzhou" # Region,可以是cn-hangzhou,cn-shanghai,cn-shenzhen等 config = Config( access_key_id="***你的access_key_id***", access_key_secret="***你的access_key_secret***", region_id=region_id, endpoint= "pai-dlc.{}.aliyuncs.com".format(region_id)) dlc_client = Client(config) docker_image = "registry.{}.aliyuncs.com/pai-dlc/tensorflow-training:1.15-cpu-py36-ubuntu18.04".format(region_id) job_spec = [JobSpec( type = "Worker", pod_count = 3, image = docker_image, ecs_spec = "ecs.c6.large",)] settings = JobSettings( enable_error_monitoring_in_aimaster = True, error_monitoring_args = "--job-execution-mode=Async --fault-tolerant-policy=OnFailure --max-num-of-same-error=20") create_job_req = CreateJobRequest( display_name = "ElasticBatchJobDemo", job_type = "ElasticBatchJob", workspace_id = workspace_id, job_specs = job_spec, user_command = "sleep 100", settings = settings, ) create_job_resp = dlc_client.create_job(create_job_req) job_id = create_job_resp.body.job_id while True: job = dlc_client.get_job(job_id).body print('job is {}'.format(job.status)) if job.status in ('Succeeded', 'Failed', 'Stopped'): break time.sleep(10)
3. ElasticBatch SDK接口说明
您可以使用ElasticBatch SDK编写具体的离线推理代码,由于不同的数据源读写方式差别很大,ElasticBatch提供了针对MaxCompute Table以及POSIX File这两种数据源的SDK接口,下面给出具体接口说明以及使用示例。
3.1 MaxCompute Table数据源#
针对MaxCompute Table数据源,ElasticBatch基于PAIIO以及COMMON_IO做了二次封装,提供了简洁易用的数据输入输出接口。
3.1.2 接口说明
from aimaster import inference as elastic_inference class MaxComputeTableConfig: def __init__(self, input_table, slice_size=4096, output_table=None): """定义MaxComputeTable配置. 参数: input_table: 输入表; slice_size: 分片大小; output_table: 输出表; """ class MaxComputeTableClient: def __init__(self, config): """定义MaxComputeTable Client, 使用该client可以创建输入dataset以及写表writer. 参数: config: 上述定义的MaxComputeTableConfig对象; """ def create_torch_dataset(self, table_path=None, batch_size=1, selected_cols=None, excluded_cols=None, num_threads=1, capacity=1024, transform_fn=None): """创建PyTorch dataset, 内部是调用COMMON_IO实现,您可以参看COMMON_IO使用手册了解细节. 参数: table_path: 带有分片信息的表路径; batch_size: 每次读取batch_size行数据进行处理; selected_cols: 取的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与excluded_cols不能同时使用; excluded_cols: 排除的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与selected_cols不能同时使用; num_threads: 内部数据预取线程数; capacity: 内部数据预取总量,单位行数; transform_fn: 一元函数,对输入的数据进行处理; """ def create_tf_dataset(self, record_defaults, selected_cols=None, excluded_cols=None, num_threads=1, capacity=1024): """创建TF dataset, 内部调用PAIIO TableRecordDataset实现,参数详情同TableRecordDataset. 参数: record_defaults: 用于读出列的数据类型转换及列为空时的默认值; selected_cols: 取的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与excluded_cols不能同时使用; excluded_cols: 排除的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与selected_cols不能同时使用; num_threads: 内部数据预取线程数; capacity: 内部数据预取总量,单位行数; 返回值: 两个返回值(table_dataset, table_path_placeholder), 其中table_path_placeholder是table_dataset输入占位符 """ def create_table_writer(self, buffer_size=0, filter_value=None): """创建写表的writer. 参数: buffer_size: 写缓存大小,如果该值大于0,写操作会放到子进程中进行; filter_value: 需要过滤的值,写的数据中含有该值会被丢弃; 返回值: 返回一个writer对象,使用方式同COMMON_IO的writer; """
3.1.2 使用示例
- TensorFlow模型推理示例
import aimaster from aimaster import inference as elastic_inference import tensorflow as tf import time input_table = "odps://project/tables/test_input" output_table = "odps://project/tables/test_output" # 定义data client, 通过client创建dataset以及writer mc_config = elastic_inference.MaxComputeTableConfig(input_table, slice_size=1024, output_table=output_table) mc_client = elastic_inference.MaxComputeTableClient(mc_config) # 创建一个dataset, 用于读取表数据 table_dataset, table_path_placeholder = mc_client.create_tf_dataset(record_defaults=("",)) iterator = table_dataset.batch(128).make_initializable_iterator() table_data = iterator.get_next() # 创建一个writer, 用于将模型预测结果写表 writer = mc_client.create_table_writer() with tf.Session() as sess: while True: try: # 获取一个表数据分片, E.g. odps://project/tables/test_input?start=0&end=1024 table_path = mc_client.get_next_slice() except aimaster.data.OutOfRangeException: print("All table data have been processed") break # 处理当前拿到的表数据分片, 只有当前分片数据全部被处理完毕后才可以去获取下一个分片 sess.run(iterator.initializer, feed_dict={table_path_placeholder:table_path}) while True: try: # 读取表数据 values = sess.run(table_data) # 这里模拟模型预测,实际使用时替换成真实的TF模型 time.sleep(5) # 将预测结果写表 writer.write(values, col_indices=(0,)) except tf.errors.OutOfRangeError: # 表示该分片数据已经处理完成 break # 关闭writer writer.close()
- PyTorch模型推理示例
import aimaster from aimaster import inference as elastic_inference import torch def get_next_dataLoader(mc_client): # 获取一个表数据分片, E.g. odps://algo_platform/tables/table_test?start=0&end=1024 table_path = mc_client.get_next_slice() # 创建一个table dataset table_dataset = mc_client.create_torch_dataset(table_path, batch_size=1, num_threads=2, capacity=512) # 创建一个dataloader dataloader = torch.utils.data.DataLoader(table_dataset, batch_size=128, num_workers=3) return dataloader def predict(values): # 这里模拟模型预测,实际使用时替换成真实的torch模型 result = [(v,) for v in values[0][0]] return result input_table = "odps://project/tables/test_input" output_table = "odps://project/tables/test_output" # 定义data client, 通过client创建dataset以及writer mc_config = elastic_inference.MaxComputeTableConfig(input_table, slice_size=1024, output_table=output_table) mc_client = elastic_inference.MaxComputeTableClient(mc_config) # 创建一个writer, 用于写预测结果 writer = mc_client.create_table_writer() while True: try: # 获取一个table dataLoader,如果没有分片要处理,抛aimaster.data.OutOfRangeException异常 dataloader = get_next_dataLoader(mc_client) except aimaster.data.OutOfRangeException: print("All table data have been processed") break for data in dataloader: result = predict(data) writer.write(result, col_indices=(0,)) # 关闭writer writer.close()
3.2 POSIX File数据源
如果您的数据存在OSS或NAS上,由于OSS或NAS数据源会被挂载到实例的POD中,在POD中可以直接使用POSIX接口进行访问,这里给出ElasticBatch SDK中有关POSIX File弹性推理接口。
3.2.1 接口说明
from aimaster import inference as elastic_inference class PosixFileConfig: def __init__(self, input_dir, output_dir, slice_size, auto_create_output_dir=True, input_file_meta_path=""): """定义PosixFile配置. 参数: input_dir: 输入文件目录 output_dir: 输出文件目录 slice_size: 分片大小, 单位是文件个数 auto_create_output_dir: 是否自动创建输出目录 input_file_meta_path: 输入文件的meta信息文件路径; 如果为空,AIMaster会从input_dir中list file,这一步可能会很耗时; Meta文件要求Json格式,必须包含FileCount以及FileList两个key, 示例如下 { "FileCount": 2, "FIleDir": "/root/data", "FileList": [ "image_1.txt", "image_2.txt", ] } """ class PosixFileClient: def __init__(self, config): """定义PosixFile Client. 参数: config: 上述定义的PosixFileConfig对象; """ def get_next_slice(self): """获取一个数据分片. 返回值: 包含多个文件路径的list, 比如 [path1, path2, ..., path1024]; 异常: 如果没有分片可以获取将抛出 aimaster.data.OutOfRangeException 异常; """ def commit_slice(self): """提交一个分片,表示此分片已经处理完成. """
3.2.2 使用示例
- 简单示例
import aimaster from aimaster import inference as elastic_inference import time data_config = elastic_inference.PosixFileConfig( input_dir="/root/data/input_files/", slice_size=1024, output_dir="/root/data/output_files/", auto_create_output_dir=True) pf_client = elastic_inference.PosixFileClient(data_config) while True: try: # 获取一个slice,类型为list,包含slice_size个file path file_slice = pf_client.get_next_slice() except aimaster.data.OutOfRangeException: print("All file data have been processed") break # 这里模拟预测以及输出 time.sleep(10) # 提交slice,表示当前slice数据已完成处理 pf_client.commit_slice()
- PyTorch inception_v3模型推理示例
# Code adapted from https://pytorch.org/hub/pytorch_vision_inception_v3/ import os import sys import torch from PIL import Image from torchvision import models from torchvision import transforms import aimaster from aimaster import inference as elastic_inference class DataInputor(object): def __init__(self, pf_client): self.pf_client = pf_client self.preprocess = self.get_data_preprocess() def get_next_dataloader(self): filepath_slice = self.pf_client.get_next_slice() for filepath in filepath_slice: input_image = Image.open(filepath) try: input_tensor = self.preprocess(input_image) except: print("Invalid file data {}".format(filepath)) continue input_batch = input_tensor.unsqueeze(0) if torch.cuda.is_available(): input_batch = input_batch.to('cuda') yield input_batch def get_data_preprocess(self): return transforms.Compose([ transforms.Resize(299), transforms.CenterCrop(299), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),]) class DataWriter(object): def __init__(self, pf_client): self.pf_client = pf_client self.pre_slice_id = self.pf_client.get_slice_id() self.output_dir = self.pf_client.config.output_dir self.writer = None def write(self, data): new_slice_id = self.pf_client.get_slice_id() if self.pre_slice_id != new_slice_id: self._create_new_writer() self.pre_slice_id = new_slice_id top5_prob, top5_catid = data for i in range(top5_prob.size(0)): self.writer.write("{}: {}\n".format(self.categories[top5_catid[i]], top5_prob[i].item())) def close(self): if all([self.pre_slice_id, self.writer]): self.writer.close() self.pf_clinet.commit_slice(self.pre_slice_id) def _create_new_writer(self, new_slice_id): self.close() out_file = os.path.join(self.output_dir, new_slice_id) self.writer = open(out_file, "w") class InceptionV3Classification(object): def __init__(self, model_path): self.model_path = model_path self.model = models.resnet50(pretrained=False) self.model.load_state_dict(torch.load(self.model_path)) self.model.eval() if torch.cuda.is_available(): self.model.to('cuda') # wget https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt with open("imagenet_classes.txt", "r") as f: self.categories = [s.strip() for s in f.readlines()] def predict(self, input_batch): with torch.no_grad(): output = self.model(input_batch) probabilities = torch.nn.functional.softmax(output[0], dim=0) top5_prob, top5_catid = torch.topk(probabilities, 5) return (top5_prob, top5_catid) data_config = elastic_inference.PosixFileConfig( input_dir='/root/data/cifar10/imagenet-large', slice_size=128, output_dir='/root/data/cifar10/output', auto_create_output_dir=True) pf_client = elastic_inference.PosixFileClient(data_config) data_inputor = DataInputor(pf_client) predictor = InceptionV3Classification(model_path='/root/data/model/resnet50-19c8e357.pth') data_writer = DataWriter(pf_client) while True: try: dataloader = data_inputor.get_next_dataloader() for input_data in dataloader: predict_result = predictor.predict(input_data) data_writer.write(predict_result) except aimaster.data.OutOfRangeException: data_writer.close() print("All file data have been processed.") break