随着大数据技术的不断发展,分布式计算框架在数据处理和分析领域扮演着越来越重要的角色。阿里云自研的MaxCompute MaxFrame(简称“MaxFrame”)作为一种专为大数据处理设计的分布式计算框架,旨在提供高效、便捷的Python开发体验。本评测基于MaxFrame最佳实践来进行体验。
一、产品功能特性
- Python编程接口:MaxFrame支持Python编程接口,使得开发者可以使用熟悉的Python语言进行大数据处理和分析,降低了学习成本,提高了开发效率。
- 直接利用MaxCompute资源:MaxFrame能够直接使用MaxCompute的计算资源和数据接口,无需额外配置或迁移数据,简化了数据处理流程。
- 与MaxCompute Notebook集成:与MaxCompute Notebook的无缝集成,为开发者提供了一个交互式开发环境,支持代码编写、调试和结果可视化,进一步提升了开发体验。
- 镜像管理功能:MaxFrame提供镜像管理功能,允许开发者自定义开发环境,包括安装所需的Python库和依赖项,增强了开发环境的灵活性和可定制性。
二、在DataWorks中使用MaxFrame
1、创建MaxCompute数据源
登录MaxCompute控制台,在左上角选择地域。在左侧导航栏选择工作区 > 项目管理,并单击新建项目。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心。
进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。
单击新增数据源,选择MaxCompute,根据界面指引创建数据源。
2、进入DataWorks的数据开发页面创建PyODPS 3节点。
3、创建MaxFrame会话。
在目标MaxCompute项目中运行如下SQL,查询test_prefix_source_table表的数据
SELECT * FROM test_prefix_source_table;
返回结果:
三、基于MaxFrame实现分布式Pandas处理
1、在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。
连接到上海的节点有超时。
2、把endpoint换成VPC节点,很快就成功了。
https://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api
、
3、查询sales_maxframe_demo表和product_maxframe_demo表的数据
场景1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id对应的product_name以及该产品的所有year和price
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算MaxFrame的haish耗时为48.539秒。
场景2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格
第一次运行失败
试了好几次不知道为什么一直报错
换个endpoint节点。还是超时
场景3:为每个用户获取其消费最多的产品ID
也是运行失败
四、基于MaxFrame实现大语言模型数据处理
对原始数据进行分析,若其中含有“Copyright”等版权信息,需要对该类敏感信息进行去除。
例如:repo_name值为“menuka94/cdnjs”时,对应的content字段中含有“Copyright”等版权信息。
1、创建MaxCompute入口类。
2、引用MaxCompute内置镜像common,其中包含Python环境及本次数据处理所需的regex等第三方包。
config.options.sql.settings = {
"odps.session.image": "common"
}
报错如下:
修改为如下之后,正常运行成功。
from maxframe import config
# 在new_session之前添加
config.options.sql.settings = {
"odps.session.image": "common",
}
3、通过UDF构建数据处理逻辑。
4、创建MaxFrame Session,提交作业至MaxCompute。
def maxframe_job():
s_time = time.time()
table_name = 'bigdata_public_dataset.data_science.llm_redpajama_github_demo_data'
session = new_session(o)
print('session id: ', session.session_id)
df = md.read_odps_table(table_name, index_col='id')
df = df.apply(
clean_copyright,
axis=1, # row
output_type="dataframe",
)
out_table = 'tmp_mf_clean_copyright'
md.to_odps_table(df, out_table).execute()
session.destroy()
maxframe_job()
需要连着上面3个步骤一起执行。
5、查询tmp_mf_clean_copyright表,对之前含有“Copyright”等版权信息的数据进行查看,已去除敏感信息。
SELECT * FROM tmp_mf_clean_copyright;
返回有报错:
修改语句如下:
SELECT * FROM tmp_mf_clean_copyright limit 100;
返回结果如下:
MaxCompute已与阿里云人工智能平台PAI成功对接,您可基于PAI Desinger进行更多LLM算子的开发和使用。
五、测评反馈
本次测评是针对官方提供的MaxFrame产品最佳实践文档来进行实践体验。下面是体验后的一些反馈。
1、首先对MaxFrame产品使用,是需要掌握前置知识的,比如MaxCompute的使用,Python的知识等,若是新手直接来使用该产品可能比较困难,希望可以在MaxFrame产品的文档中多加些Datawork和MaxCompute的操作步骤。
2、MaxCompute官方文档提供的提供内容过于简单,有些超链接是无效的。这个链接一直跳回到这个界面,但这里没有具体的操作步骤,和相关截图,提供不了太多的帮助。
3、在基于MaxFrame实现分布式Pandas处理的实践中。
文档这里建议的endpoint类型一直都是超时,选择VPC类型才能正常执行。
使用MaxFrame进行数据分析的场景2和场景3执行总是报错。里面只需要修改Access Key ID、Access Key Secret、project、endpoint这四个参数,始终无法运行成功,不知道是哪里的问题。
环境中没有本地Pandas(版本为1.3.5),所以也无法实现结果对比。
4、在基于MaxFrame实现大语言模型数据处理的实践中。
文档描述的这4个步骤是做代码的分部说明,但需要整体来执行。我按照每一步来执行,导致执行的时候会有依赖报错。后来才明白要一起执行,这个还是建议在文档中说明一下,否则也有会有人和我犯一样的错误。
数据查询这个SQL语句,是需要在maxcompute中执行,这个也要说明下。同时SELECT * FROM tmp_mf_clean_copyright; 需要加限制,否则行太多报错。建议把这个语句优化下。
MaxFrame在工作和学习中的一些优势:
1、MaxFrame结合MaxCompute的强大算力,在工作中可以快速完成数据清洗、特征构建(如用户画像特征、时间序列特征)、数据分片与批量处理,支持下游模型训练。
2、MaxFrame通过纯Python编程接口,无需掌握复杂的分布式计算模型,降低了学习门槛。
3、MaxFrame能够为AI模型训练提供强大的数据支持。应用于如下业务场景,快速处理超大规模的训练数据集,为深度学习模型提供优质数据输入;动态资源扩展,减少模型训练前数据准备的时间,使学习者能够更多地聚焦于模型算法本身。