简述
Ploomber 是为数据科学和机器学习构建可靠数据工作流的最简单方法。 当你以标准形式提供您的源代码,Ploomber 会自动为您构建工作流。 任务可以是 Python 函数、Jupyter Notebook、Python/R/shell 脚本和 SQL 脚本中的任何内容。当你准备就绪后,无需更改代码即可部署到 Airflow 或 Kubernetes(使用 Argo)。
主要特点
集成了Jupyter Notebook。 当您打开Notebook时,Ploomber 将自动注入一个新单元格,其中包含从上游变量推断出的输入文件的位置。 如果您打开 Python 或 R 脚本,它会立即转换为Notebook。
支持增量构建。 通过跳过源代码未更改的任务来加快执行速度。
支持并行化。 支持并行运行任务以加快计算速度。
支持工作流测试。 在任务执行时,运行测试以验证输出的数据是否具有正确的属性(例如,在预期范围内的值)。
支持工作流调试。 使用plumber interact
启动一个的交互式会话来调试您的工作流。 调用 dag['task_name'].debug()
来启动调试会话。
支持部署到 Kubernetes、AWS Batch 或 Airflow。 您可以在本地开发和执行。 一旦你准备好部署之后,导出到 Kubernetes、AWS Batch 或 Airflow即可。(注:Ploomber提供了Soopervisor库用于运行Ploomber工作流以进行批处理(大规模训练或批量服务)或在线推理。)
示例
定义任务
首先定义任务,这里的任务是Python脚本。
这个工作流包含3个任务,第一个任务raw.py获取一些数据,第二个任务clean.py清除数据,第三个任务plot.py生成可视化。
ls *.py clean.py plot.py raw.py 复制代码
以上这三个脚本构成了我们的工作流(或DAG),它是具有预定义执行顺序的任务集合。
raw.py
""" Get data """ import seaborn as sns # + tags=["parameters"] upstream = None product = None # - # + df = sns.load_dataset('iris') df.columns = [ 'sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class' ] df.head() # - # + df.to_csv(str(product['data']), index=False) # - 复制代码
clean.py
""" Clean """ import pandas as pd # + tags=["parameters"] upstream = ['raw'] product = None # - df = pd.read_csv(upstream['raw']['data']) # data cleaning code... df.to_csv(str(product['data']), index=False) 复制代码
plot.py
""" Visualize """ # this is an annotated script import pandas as pd import seaborn as sns # + tags=["parameters"] upstream = ["clean"] product = None # - df = pd.read_csv(upstream['clean']['data']) sns.distplot(df.sepal_length) sns.distplot(df.sepal_width) df.head() 复制代码
Ploomber与Jupyter Notebook集成,如果您打开Jupyter Notebook应用程序内的脚本,它们将呈现为notebooks形式。
定义工作流
然后定义工作流,工作流在pipeline.yaml中定义。
pipeline.yaml文件声明我们使用哪些文件作为任务(pipeline.yaml是可选,但它给了您更多的灵活性)。
pipeline.yaml
tasks: - source: raw.py product: nb: output/raw.ipynb data: output/data.csv - source: clean.py product: nb: output/clean.ipynb data: output/clean.csv - source: plot.py product: output/plot.ipynb 复制代码
绘制工作流图
接下来在命令行绘制工作流。
ploomber plot 复制代码
浏览工作流
ploomber status 复制代码
输出结果:
name Last run Outdated? Product Doc (short) Location ------ ------------ ----------- ------------ ------------- ------------ raw Has not been Source code MetaProduct( /home/docs/c run {'data': Fil heckouts/rea e('output/da dthedocs.org ta.csv'), /user_builds 'nb': File(' /ploomber/ch output/raw.i eckouts/proj pynb')}) ects- master/spec- api-python/r aw.py clean Has not been Source code MetaProduct( /home/docs/c run & Upstream {'data': Fil heckouts/rea e('output/cl dthedocs.org ean.csv'), /user_builds 'nb': File(' /ploomber/ch output/clean eckouts/proj .ipynb')}) ects- master/spec- api-python/c lean.py plot Has not been Source code File('output /home/docs/c run & Upstream /plot.ipynb' heckouts/rea ) dthedocs.org /user_builds /ploomber/ch eckouts/proj ects- master/spec- api-python/p lot.py 复制代码
构建一个工作流
然后,运行该工作流。
# output文件件保存输出结果,包括数据和脚本的Notebook mkdir output ploomber build 复制代码
运行结果:
Building task 'raw': 0%| | 0/3 [00:00<?, ?it/s] Executing: 0%| | 0/5 [00:00<?, ?cell/s] Executing: 20%|██ | 1/5 [00:03<00:14, 3.71s/cell] Executing: 100%|██████████| 5/5 [00:04<00:00, 1.15cell/s] Building task 'clean': 33%|███▎ | 1/3 [00:04<00:08, 4.38s/it] Executing: 0%| | 0/5 [00:00<?, ?cell/s] Executing: 100%|██████████| 5/5 [00:01<00:00, 3.91cell/s] Building task 'plot': 67%|██████▋ | 2/3 [00:05<00:02, 2.56s/it] Executing: 0%| | 0/7 [00:00<?, ?cell/s] Executing: 14%|█▍ | 1/7 [00:01<00:08, 1.35s/cell] Executing: 71%|███████▏ | 5/7 [00:01<00:00, 4.18cell/s] Executing: 100%|██████████| 7/7 [00:01<00:00, 3.65cell/s] Building task 'plot': 100%|██████████| 3/3 [00:07<00:00, 2.53s/it] name Ran? Elapsed (s) Percentage ------ ------ ------------- ------------ raw True 4.37319 57.5864 clean True 1.28883 16.9714 plot True 1.93212 25.4422 复制代码
更新工作流
如果我们修改了工作流中的任务。我们需要更新工作流,重新执行已更改的任务。
ploomber build 复制代码
运行结果:
0it [00:00, ?it/s] name Ran? Elapsed (s) Percentage ------ ------ ------------- ------------ raw False 0 0 clean False 0 0 plot False 0 0 复制代码
从上面可以看到,我们这三个任务都没有再执行,因为,我并没有修改这三个任务。
总结
Ploomber使用约定优于配置的方式。从一开始,您只需在脚本/notebooks中包含两个特殊变量,Ploomber就可以将工作流进行编排执行。为了获得更大的灵活性,您可以使用 YAML 指定您的pipeline。
同时,Ploomber可以使用Jupyter以交互方式开发脚本和notebooks,然后以编程方式执行。