当 Mars 遇上 RAPIDS:用 GPU 以并行的方式加速数据科学

简介: 在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。 Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,支持通过 RAPIDS 平台用 GPU 加速数据科学。

背景

在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。

Numpy

Numpy 是数值计算的基础包,内部提供了多维数组(ndarray)这样一个数据结构,用户可以很方便地在任意维度上进行数值计算。

image

我们举一个蒙特卡洛方法求解 Pi 的例子。这背后的原理非常简单,现在我们有个半径为1的圆和边长为2的正方形,他们的中心都在原点。现在我们生成大量的均匀分布的点,让这些点落在正方形内,通过简单的推导,我们就可以知道,Pi 的值 = 落在圆内的点的个数 / 点的总数 * 4。

这里要注意,就是随机生成的点的个数越多,结果越精确。

用 Numpy 实现如下:

import numpy as np

N = 10 ** 7  # 1千万个点

data = np.random.uniform(-1, 1, size=(N, 2))  # 生成1千万个x轴和y轴都介于-1和1间的点
inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum()  # 计算到原点的距离小于1的点的个数
pi = 4 * inside / N
print('pi: %.5f' % pi)

可以看到,用 Numpy 来进行数值计算非常简单,只要寥寥数行代码,而如果读者习惯了 Numpy 这种面相数组的思维方式之后,无论是代码的可读性还是执行效率都会有巨大提升。

pandas

pandas 是一个强大的数据分析和处理的工具,它其中包含了海量的 API 来帮助用户在二维数据(DataFrame)上进行分析和处理。

pandas 中的一个核心数据结构就是 DataFrame,它可以简单理解成表数据,但不同的是,它在行和列上都包含索引(Index),要注意这里不同于数据库的索引的概念,它的索引可以这么理解:当从行看 DataFrame 时,我们可以把 DataFrame 看成行索引到行数据的这么一个字典,通过行索引,可以很方便地选中一行数据;列也同理。

我们拿 movielens 的数据集 作为简单的例子,来看 pandas 是如何使用的。这里我们用的是 Movielens 20M Dataset.

import pandas as pd

ratings = pd.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

通过一行简单的 pandas.read_csv 就可以读取 CSV 数据,接着按 userId 做分组聚合,求 rating 这列在每组的总和、平均、最大、最小值。

“食用“ pandas 的最佳方式,还是在 Jupyter notebook 里,以交互式的方式来分析数据,这种体验会让你不由感叹:人生苦短,我用 xx(😉)

scikit-learn

scikit-learn 是一个 Python 机器学习包,提供了大量机器学习算法,用户不需要知道算法的细节,只要通过几个简单的 high-level 接口就可以完成机器学习任务。当然现在很多算法都使用深度学习,但 scikit-learn 依然能作为基础机器学习库来串联整个流程。

我们以 K-最邻近算法为例,来看看用 scikit-learn 如何完成这个任务。

import pandas as pd
from sklearn.neighbors import NearestNeighbors

df = pd.read_csv('data.csv')  # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

fit接口就是 scikit-learn 里最常用的用来学习的接口。可以看到整个过程非常简单易懂。

Mars——Numpy、pandas 和 scikit-learn 的并行和分布式加速器

Python 数据科学栈非常强大,但它们有如下几个问题:

  1. 现在是多核时代,这几个库里鲜有操作能利用得上多核的能力。
  2. 随着深度学习的流行,用来加速数据科学的新的硬件层出不穷,这其中最常见的就是 GPU,在深度学习前序流程中进行数据处理,我们是不是也能用上 GPU 来加速呢?
  3. 这几个库的操作都是命令式的(imperative),和命令式相对应的就是声明式(declarative)。命令式的更关心 how to do,每一个操作都会立即得到结果,方便对结果进行探索,优点是很灵活;缺点则是中间过程可能占用大量内存,不能及时释放,而且每个操作之间就被割裂了,没有办法做算子融合来提升性能;那相对应的声明式就刚好相反,它更关心 what to do,它只关心结果是什么,中间怎么做并没有这么关心,典型的声明式像 SQL、TensorFlow 1.x,声明式可以等用户真正需要结果的时候才去执行,也就是 lazy evaluation,这中间过程就可以做大量的优化,因此性能上也会有更好的表现,缺点自然也就是命令式的优点,它不够灵活,调试起来比较困难。

为了解决这几个问题,Mars 被我们开发出来,Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,充分利用多核和新的硬件。

Mars 的开发过程中,我们核心关注的几点包括:

  1. 我们希望 Mars 足够简单,只要会用 Numpy、pandas 或 scikit-learn 就会用 Mars。
  2. 避免重复造轮子,我们希望能利用到这些库已有的成果,只需要能让他们被调度到多核/多机上即可。
  3. 声明式和命令式兼得,用户可以在这两者之间自由选择,灵活度和性能兼而有之。
  4. 足够健壮,生产可用,能应付各种 failover 的情况。

当然这些是我们的目标,也是我们一直努力的方向。

Mars tensor:Numpy 的并行和分布式加速器

上面说过,我们的目标之一是,只要会用 Numpy 等数据科学包,就会用 Mars。我们直接来看代码,还是以蒙特卡洛为例。变成 Mars 的代码是什么样子呢?

import mars.tensor as mt

N = 10 ** 10

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = (4 * inside / N).execute()
print('pi: %.5f' % pi)

可以看到,区别就只有两处:import numpy as np 变成 import mars.tensor as mt ,后续的 np. 都变成 mt. ;pi 在打印之前调用了一下 .execute() 方法。

也就是默认情况下,Mars 会按照声明式的方式,代码本身移植的代价极低,而在真正需要一个数据的时候,通过 .execute() 去触发执行。这样能最大限度得优化性能,以及减少中间过程内存消耗。

这里,我们还将数据的规模扩大了 1000 倍,来到了 100 亿个点。之前 1/1000 的数据量的时候,在我的笔记本上需要 757ms;而现在数据扩大一千倍,光 data 就需要 150G 的内存,这用 Numpy 本身根本无法完成。而使用 Mars,计算时间只需要 3min 44s,而峰值内存只需要 1G 左右。假设我们认为内存无限大,Numpy 需要的时间也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,并且通过声明式的方式,极大减少了中间内存占用。

前面说到,我们试图让声明式和命令式兼得,而使用命令式的风格,只需要在代码的开始配置一个选项即可。

import mars.tensor as mt
from mars.config import options

options.eager_mode = True  # 打开 eager mode 后,每一次调用都会立即执行,行为和 Numpy 就完全一致

N = 10 ** 7

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.linalg.norm(data, axis=1) < 1).sum()
pi = 4 * inside / N  # 不需要调用 .execute() 了
print('pi: %.5f' % pi.fetch())  # 目前需要 fetch() 来转成 float 类型,后续我们会加入自动转换

Mars DataFrame:pandas 的并行和分布式加速器

看过怎么样轻松把 Numpy 代码迁移到 Mars tensor ,想必读者也知道怎么迁移 pandas 代码了,同样也只有两个区别。我们还是以 movielens 的代码为例。

import mars.dataframe as md

ratings = md.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute()

Mars Learn:scikit-learn 的并行和分布式加速器

Mars Learn 也同理,这里就不做过多阐述了。但目前 Mars learn 支持的 scikit-learn 算法还不多,我们也在努力移植的过程中,这需要大量的人力和时间,欢迎感兴趣的同学一起参与。

import mars.dataframe as md
from mars.learn.neighbors import NearestNeighbors

df = md.read_csv('data.csv')  # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)  # 这里 fit 的时候也会整体触发执行,因此机器学习的高层接口都是立即执行的
neighbors = nn.kneighbors(df).fetch()  # kneighbors 也已经触发执行,只需要 fetch 数据

这里要注意的是,对于机器学习的 fitpredict 等高层接口,Mars Learn 也会立即触发执行,以保证语义的正确性。

RAPIDS:GPU 上的数据科学

相信细心的观众已经发现,GPU 好像没有被提到。不要着急,这就要说到 RAPIDS

在之前,虽然 CUDA 已经将 GPU 编程的门槛降到相当低的一个程度了,但对于数据科学家们来说,在 GPU 上处理 Numpy、pandas 等能处理的数据无异于天方夜谭。幸运的是,NVIDIA 开源了 RAPIDS 数据科学平台,它和 Mars 的部分思想高度一致,即使用简单的 import 替换,就可以将 Numpy、pandas 和 scikit-learn 的代码移植到 GPU 上。

image

其中,RAPIDS cuDF 用来加速 pandas,而 RAPIDS cuML 用来加速 scikit-learn。

对于 Numpy 来说,CuPy 已经很好地支持用 GPU 来加速了,这样 RAPIDS 也得以把重心放在数据科学的其他部分。

CuPy:用 GPU 加速 Numpy

还是蒙特卡洛求解 Pi。

import cupy as cp
 
N = 10 ** 7

data = cp.random.uniform(-1, 1, size=(N, 2))
inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = 4 * inside / N
print('pi: %.5f' % pi)

在我的测试中,它将 CPU 的 757ms,降到只有 36ms,提升超过 20 倍,可以说效果非常显著。这正是得益于 GPU 非常适合计算密集型的任务。

RAPIDS cuDF:用 GPU 加速 pandas

import pandas as pd 替换成 import cudf,GPU 内部如何并行,CUDA 编程这些概念,用户都不再需要关心。

import cudf

ratings = cudf.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

运行时间从 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超过 10 倍。

RAPIDS cuML:用 GPU 加速 scikit-learn

同样是 k-最邻近问题。

import cudf
from cuml.neighbors import NearestNeighbors

df = cudf.read_csv('data.csv')
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

运行时间从 CPU 上 1min52s,提升到 GPU 上 17.8s。

Mars 和 RAPIDS 结合能带来什么?

RAPIDS 将 Python 数据科学带到了 GPU,极大地提升了数据科学的运行效率。它们和 Numpy 等一样,是命令式的。通过和 Mars 结合,中间过程将会使用更少的内存,这使得数据处理量更大;Mars 也可以将计算分散到多机多卡,以提升数据规模和计算效率。

在 Mars 里使用 GPU 也很简单,只需要在对应函数上指定 gpu=True。例如创建 tensor、读取 CSV 文件等都适用。

import mars.tensor as mt
import mars.dataframe as md

a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True)
df = md.read_csv('ml-20m/ratings.csv', gpu=True)

下图是用 Mars 分别在 Scale up 和 Scale out 两个维度上加速蒙特卡洛计算 Pi 这个任务。一般来说,我们要加速一个数据科学任务,可以有这两种方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的内存、使用 GPU 替代 CPU等;Scale out 就是指用更多的机器,用分布式的方式提升效率。

image

可以看到在一台 24 核的机器上,Mars 计算需要 25.8s,而通过分布式的方式,使用 4 台 24 核的机器的机器几乎以线性的时间提升。而通过使用一个 NVIDIA TESLA V100 显卡,我们就能将单机的运行时间提升到 3.98s,这已经超越了4台 CPU 机器的性能。通过再将单卡拓展到多卡,时间进一步降低,但这里也可以看到,时间上很难再线性扩展了,这是因为 GPU 的运行速度提升巨大,这个时候网络、数据拷贝等的开销就变得明显。

性能测试

我们使用了 https://github.com/h2oai/db-benchmark 的数据集,测试了三个数据规模的 groupby 和 一个数据规模的 join。而我们主要对比了 pandas 和 DASK。DASK 和 Mars 的初衷很类似,也是试图并行和分布式化 Python 数据科学,但它们的设计、实现、分布式都存在较多差异,这个后续我们再撰文进行详细对比。

测试机器配置是 500G 内存、96 核、NVIDIA V100 显卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 执行计算。

Groupby

数据有三个规模,分别是 500M、5G 和 20G。

查询也有三组。

查询一

df = read_csv('data.csv')
df.groupby('id1').agg({'v1': 'sum'})

查询二

df = read_csv('data.csv')
df.groupby(['id1', 'id2']).agg({'v1': 'sum'})

查询三

df = read_csv('data.csv')
df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})

数据大小 500M,性能结果

image

数据大小 5G,性能结果

image

数据大小 20G,性能结果

image

数据大小到 20G 时,pandas 在查询2会内存溢出,得不出结果。

可以看到,随着数据增加,Mars 的性能优势会愈发明显。

得益于 GPU 的计算能力,GPU 运算性能相比于 CPU 都有数倍的提升。如果单纯使用 RAPIDS cuDF,由于显存大小的限制,数据来到 5G 都难以完成,而由于 Mars 的声明式的特点,中间过程对显存的使用大幅得到优化,所以整组测试来到 20G 都能轻松完成。这正是 Mars + RAPIDS 所能发挥的威力。

Join

测试查询:

x = read_csv('x.csv')
y = read_csv('y.csv')
x.merge(y, on='id1')

测试数据 x 为500M,y 包含10行数据。

image

总结

RAPIDS 将 Python 数据科学带到了 GPU,极大提升了数据分析和处理的效率。Mars 的注意力更多放在并行和分布式。相信这两者的结合,在未来会有更多的想象空间。

Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。

如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。

IMG_8215

相关实践学习
在云上部署ChatGLM2-6B大模型(GPU版)
ChatGLM2-6B是由智谱AI及清华KEG实验室于2023年6月发布的中英双语对话开源大模型。通过本实验,可以学习如何配置AIGC开发环境,如何部署ChatGLM2-6B大模型。
目录
相关文章
|
存储 小程序 前端开发
深入理解微信授权登录流程、用户信息获取和Emoji的存储
深入理解微信授权登录流程、用户信息获取和Emoji的存储
822 0
|
设计模式 存储 算法
整理牛客网 ----- 阿里校招 Java 后端 1-5 面 + HR 面面经
整理牛客网 ----- 阿里校招 Java 后端 1-5 面 + HR 面面经
766 0
整理牛客网 ----- 阿里校招 Java 后端 1-5 面 + HR 面面经
|
存储 SQL 缓存
快手:从 Clickhouse 到 Apache Doris,实现湖仓分离向湖仓一体架构升级
快手 OLAP 系统为内外多个场景提供数据服务,每天承载近 10 亿的查询请求。原有湖仓分离架构,由离线数据湖和实时数仓组成,面临存储冗余、资源抢占、治理复杂、查询调优难等问题。通过引入 Apache Doris 湖仓一体能力,替换了 Clickhouse ,升级为湖仓一体架构,并结合 Doris 的物化视图改写能力和自动物化服务,实现高性能的数据查询以及灵活的数据治理。
961 3
快手:从 Clickhouse 到 Apache Doris,实现湖仓分离向湖仓一体架构升级
|
存储 JSON 分布式计算
StarRocks + Paimon 在阿里集团 Lakehouse 的探索与实践
阿里集团在推进湖仓一体化建设过程中,依托 StarRocks 强大的 OLAP 查询能力与 Paimon 的高效数据入湖特性,实现了流批一体、存储成本大幅下降、查询性能数倍提升的显著成效: A+ 业务借助 Paimon 的准实时入湖,显著降低了存储成本,并引入 StarRocks 提升查询性能。升级后,数据时效提前60分钟,开发效率提升50%;JSON列化存储减少50%,查询性能提升最高达10倍;OLAP分析中,非JOIN查询快1倍,JOIN查询快5倍。 饿了么升级为准实时Lakehouse架构后,在时效性仅损失1-5分钟的前提下,实现Flink资源缩减、StarRocks查询性能提升(仅5%
1128 60
StarRocks + Paimon 在阿里集团 Lakehouse 的探索与实践
|
10月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
711 1
|
机器学习/深度学习 分布式计算 供应链
阿里云先知安全沙龙(上海站) ——大模型基础设施安全攻防
大模型基础设施的安全攻防体系涵盖恶意输入防御和基础设施安全,包括框架、三方库、插件、平台、模型和系统安全。关键漏洞如CVE-2023-6019(Ray框架命令注入)、CVE-2024-5480(PyTorch分布式RPC)及llama.cpp中的多个漏洞,强调了代码安全性的重要性。模型文件安全方面,需防范pickle反序列化等风险,建议使用Safetensors格式。相关实践包括构建供应链漏洞库、智能化漏洞分析和深度检测,确保全方位防护。
|
存储 安全 Java
打造智能合同管理系统:SpringBoot与电子签章的完美融合
【10月更文挑战第7天】 在数字化转型的浪潮中,电子合同管理系统因其高效、环保和安全的特点,正逐渐成为企业合同管理的新宠。本文将分享如何利用SpringBoot框架实现一个集电子文件签字与合同管理于一体的智能系统,探索技术如何助力合同管理的现代化。
742 4
Vue3二维码(QRCode)
这是一个可高度定制的二维码生成组件,支持在线预览。提供了丰富的属性设置,包括扫描文本、二维码大小、颜色、背景色、边框、边框颜色、像素比例及纠错等级等。安装简单,通过 `pnpm` 引入插件,创建 `QRCode.vue` 组件即可使用。适用于多种应用场景,如生成不同样式的二维码、动态调整大小和内容等。
1318 6
Vue3二维码(QRCode)
|
缓存 分布式计算 Hadoop
HBase在高并发场景下的性能分析
HBase在高并发场景下的性能受到多方面因素的影响,包括数据模型设计、集群配置、读写策略及性能调优等。合理的设计和配置可以显著提高HBase在高并发环境下的性能。不过,需要注意的是,由于项目和业务需求的不同,性能优化并没有一劳永逸的解决方案,需要根据实际情况进行针对性的调整和优化。
500 8