Step By Step
1、界面组件拖拽与配置
2、Python 组件相关配置
3、运行测试
4、日志查看
一、界面组件拖拽与配置
- 1.1 Designer控制台创建工作流
- 1.2 分别拖拽读数据表组件和Python脚本组件到画布
二、组件相关配置
- 2.1 DataWorks数据开放控制台创建数据表并导入数据
脚本:
CREATE TABLE `lm_test_input_1` (
`value` bigint,
`output1` bigint
);
INSERT into table lm_test_input_1 values (1,2);
INSERT into table lm_test_input_1 values (2,4);
SELECT * FROM lm_test_input_1;
- 2.2 配置读数据表组件
- 2.3 配置Python脚本组件
main.py
from odps import ODPS
from pai_running.context import Context
context = Context()
# 获取组件第一个输入端口输入数据
input_port = context.input_artifacts.flatten()[0]
print("---input_port---",input_port)
print("---日志输出测试:---")
o = ODPS(
access_id=context.access_key_id,
secret_access_key=context.access_key_secret,
endpoint=input_port.endpoint,
project=input_port.project,
)
# 获取从上游输入的表名
input_table_name = input_port.table
print("---input_table_name---",input_table_name)
# 组件准备输出的表
output_table_name = "demo_output_table"
o.execute_sql(
f"drop table if exists {output_table_name};",
)
# 获取输入表的age列,导出到一张新表中
o.execute_sql(
f"create table {output_table_name} as select value from {input_table_name};"
)
# 通过以下调用,告知Workflow框架,当前组件输出了一张ODPS表
output_port = context.output_artifacts.flatten()[0]
output_port.write_table(
table="demo_output_table",
project=o.project,
endpoint=o.endpoint,
)
执行配置脚本
{
"_comments": [
"Python 组件将用户代码运行在公共资源组的 DLC 集群 (https://help.aliyun.com/document_detail/202277.html)",
"并且支持数据加载/保存本地文件系统,用户可以通过读写本地文件的方式,读写上下游的输入输出数据。",
"目前运行的任务通过一个 JSON 文件进行配置,从功能角度,配置项主要包括两部分内容",
"1. 数据载入/保存配置",
"1.1. inputDataTunnel: 每一项对应组件的一个输入端口, 将上游节点的输入数据(MaxComputeTable, OSS) 加载到本地目录中;",
"1.2. outputDataTunnel: 每一项对应组件的一个输出端口,指定将哪些本地文件上传保存到 OSS 中;",
"1.3. uploadConfig: 数据上传的OSS配置, 包括上传的OSS bucket 名称, endpoint,以及上传到OSS的根路径 path;",
"2. 运行负载配置(jobConfig),包含运行在 ServerLess DLC 的具体运行配置;",
"注意: 以下的配置项是一个样例说明,请根据实际组件运行的场景修改使用。",
"注意: 用户的代码执行的日志输出,可以通过点击组件输出的 DLC 的任务URL,去DLC的控制台查看"
],
"inputDataTunnel": [
],
"outputDataTunnel": [
],
"uploadConfig": {
"endpoint": "oss-<oss bucket region>.aliyuncs.com",
"bucket": "<oss bucket name>",
"path": "python_example/",
"_comments": [
"数据上传配置项, 目前数据上传功能只支持上传到 OSS。",
"如果单独的 outputDataTunnel 中没有数据上传配置(没有.uploadConfig 字段),则会使用全局的uploadDataTunnelConfig配置",
"note:每一个DataTunnel 指定的文件/目录,最终的上传的路径为 uploadConfig.path/{run_id}/{node_id}/{output_tunnel_name}/"
]
},
"jobConfig": {
"name": "example1",
"jobType": "generalJob",
"taskSpec": {
"instanceType": "ecs.c6.large",
"imageUri": "registry.cn-hangzhou.aliyuncs.com/paiflow-public/python3:v1.0.0"
},
"_comments": [
"DLC的任务配置项,包括",
"name: 运行在 DLC 的任务名称前缀",
"jobType: 任务类型,目前默认为GeneralJob,无需修改,表示是一个单节点任务(后续会支持多节点的分布式任务)",
"taskSpec: 任务worker节点配置,其中 .instanceType 表示worker使用的 ECS 实例类型; .imageUri 是worker使用的镜像",
"目前worker 支持使用官方镜像 (https://help.aliyun.com/document_detail/202834.htm) ,以及自定义镜像, 如果使用自定义镜像,请确保镜像能够支持公开访问。"
]
}
}
三、运行测试
四、日志查看
由于Python脚本需要依赖于PAI-DLC作为底层计算引擎,实际是在DLC集群创建docker运行的,所以查看Python脚本的详细日志需要到DLC控制台查看。