产品概述
分布式计算框架MaxCompute MaxFrame是阿里云推出的一款专为大数据处理和AI开发设计的解决方案。它基于Python编程接口,并完全兼容Pandas数据操作库,使得用户能够以熟悉的方式高效地进行数据处理。MaxFrame能够自动将Pandas算子转化为分布式计算任务,利用MaxCompute的海量弹性计算资源,实现数据的并行处理和加速计算。这一框架直连MaxCompute数据,无需本地数据传输,降低了开发成本,提高了处理效率,适用于大规模数据处理、科学计算及机器学习/AI开发等多种应用场景。架构图如下:
实践体验
服务开通
因为接下来体验的两个实践均会使用到大数据开发治理平台DataWorks和大数据计算服务MaxCompute,所以首先我们要对这个产品或服务进行开通。
- DataWorks开通
对于大数据开发治理平台DataWorks服务的开通有两种方式,一是针对老用户可以参照如下方式进行购买:
进入DataWorks服务开通页,地域选择华东1(杭州)、基础版、按量付费,其他默认。
勾选服务协议,点击确认订单并支付。
校验通过后,点击下一步。
价格清单确认无误后,点击下一步创建订单。
在支付页面,点击支付即可。
当出现下图这个页面时,就表示DataWorks服务已成功创建。
如果你是新用户,则可以直接进入免费试用首页找到对应产品开通即可:
在产品开通页面填入资源组名称,因为试用默认只有一个可用区,所以这里保持默认。
如果是首次试用DataWorks产品,这里需要先关联角色,点击创建关联角色即可。
确认如上信息无误后,点击立即试用。
点击页面的管理试用,可以在费用与成本中看到试用详情。
- MaxCompute开通
接下来继续开通大数据计算服务MaxCompute,进入产品控制台,点击立即开通。
这里唯一需要注意的一点就是,产品的可用区要保持和DataWorks一致,比如这里的华东2(上海)。
提交后,进入下一步
继续下一步
在支付页面点击立即支付即可。
出现下图就表明开通成功。
使用MaxFrame
按照官方文档,MaxFrame的使用有三种方式,接下来逐个阐述:
在本地环境中使用
本地环境使用MaxFrame有两个大前提,一是本地系统中已安装3.7或3.11版本的Python环境且有pip工具。并通过如下命令安装:
#安装maxframe pip install --upgrade maxframe #验证maxframe是否安装成功 python -c "import maxframe.dataframe as md"
二是需要新建一个MaxCompute项目。登录MaxCompute控制台,在左上角选择地域。在左侧导航栏选择工作区 > 项目管理,并单击新建项目。如下:
这里先创建一个测试的python文件,代码如下:
import os
import maxframe.dataframe as md
from odps import ODPS
from maxframe import new_session
# 创建MaxCompute入口
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
table = o.create_table("test_source_table", "a string, b bigint", if_not_exists=True)
with table.open_writer() as writer:
writer.write([
["value1", 0],
["value2", 1],
])
# 创建MaxFrame session
session = new_session(o)
df = md.read_odps_table("test_source_table",index_col="b")
df["a"] = "prefix_" + df["a"]
# 打印dataframe数据
print(df.execute().fetch())
# MaxFrame DataFrame数据写入MaxCompute表
md.to_odps_table(df, "test_prefix_source_table").execute()
# 销毁 maxframe session
session.destroy()
直接运行python maxframetest.py文件,结果如下:
接着进入MaxCompute控制台,点击左侧导航栏的的SQL分析,查询上述表格数据是否创建成功。如下:
通过以上操作,我们已经成功在本地环境中安装了MaxFrame,并成功连接到MaxCompute。
在DataWorks中使用
为了配合接下来的实验,这里首先需要在MaxCompute中新建两个项目,一个用于生产环境,一个用于开发环境。
进入DataWorks控制台,绑定MaxCompute数据源。如下:
在新建计算资源页面分别关联MaxCompute的两个项目,如下:
点击下一步,就可以看到绑定已经成功。
接下来在DataWorks的数据开发页面创建PyODPS 3节点,如下:
在MaxCompute会话框中写入如下代码:
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
options.sql.enable_mcqa = False
table = o.create_table("test_source_table", "a string, b bigint", if_not_exists=True)
with table.open_writer() as writer:
writer.write([
["value1", 0],
["value2", 1],
])
# 创建MaxFrame session
session = new_session(o)
df = md.read_odps_table("test_source_table",index_col="b")
df["a"] = "prefix_" + df["a"]
# 打印dataframe数据
print(df.execute().fetch())
# MaxFrame DataFrame数据写入MaxCompute表
md.to_odps_table(df, "test_prefix_source_table").execute()
# 销毁 maxframe session
session.destroy()
直接运行代码,结果如下:
同样地,我们前往MaxCompute控制台,执行一个SQL分析,结果如下:
到这,我们也完成了在大数据开发治理平台DataWorks中完成了MaxFrame的安装,并实现了与大数据计算服务MaxCompute的联动。
在DataWorks镜像中使用
最后,让我们看看最后一种方式体验如何。首先登录DataWorks控制台,切换至DataWorks工作空间所在地域后,单击左侧导航栏的镜像管理,选择自定义镜像,填入对应信息。
接着,点击发布,在正式发布前,需要对镜像进行测试,如下:
完成测试后,点击发布,成功发布后修改自定义镜像的归属工作空间。
单击右侧调度配置,配置资源属性,选择与自定义镜像发布绑定的资源组保持一致。
接下来的操作同第二种方式一致,这里就不再赘述。到这,MaxFrame的三种使用方式均已体验,下面开始正式的实践体验。
基于MaxFrame实现分布式Pandas处理
基于上述搭建的本地环境,我们使用如下代码直接进行测试:
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
import os
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
data_sets = [{
"table_name": "product",
"table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
"source_type": "records",
"records" : [
[1, 100, 'Nokia', 1000],
[2, 200, 'Apple', 5000],
[3, 300, 'Samsung', 9000]
],
},
{
"table_name" : "sales",
"table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
"source_type": "records",
"records" : [
[1, 1, 100, 101, 2008, 10, 5000],
[2, 2, 300, 101, 2009, 7, 4000],
[3, 4, 100, 102, 2011, 9, 4000],
[4, 5, 200, 102, 2013, 6, 6000],
[5, 8, 300, 102, 2015, 10, 9000],
[6, 9, 100, 102, 2015, 6, 2000]
],
"lifecycle": 5
}]
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
for index, data in enumerate(data_sets):
table_name = data.get("table_name")
table_schema = data.get("table_schema")
source_type = data.get("source_type")
if not table_name or not table_schema or not source_type:
raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")
lifecycle = data.get("lifecycle", 5)
table_name += suffix
print(f"Processing {table_name}...")
if drop_if_exists:
print(f"Deleting {table_name}...")
o.delete_table(table_name, if_exists=True)
o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)
if source_type == "local_file":
file_path = data.get("file")
if not file_path:
raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
sep = data.get("sep", ",")
pd_df = pd.read_csv(file_path, sep=sep)
ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
elif source_type == 'records':
records = data.get("records")
if not records:
raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
with o.get_table(table_name).open_writer() as writer:
writer.write(records)
else:
raise ValueError(f"Unknown data set source_type: {source_type}")
print(f"Processed {table_name} Done")
prepare_data(o, data_sets, "_maxframe_demo", True)
进入MaxCompute控制台,执行SQL分析,分别查询通过上述代码导入的测试数据。
接下来通过如下代码使用MaxFrame进行数据分析,来获取所有产品对应的年份和价格。
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import os
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
session = new_session(o)
#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
#这里的df并不会立即执行,除非您使用df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]
print(df.execute().fetch())
#保存结果到MaxCompute表中,并销毁Session
md.to_odps_table(df, "result_df", overwrite=True).execute()
session.destroy()
运行结果如下:
通过如下代码实现查询已售产品对应的年份,价格和数量。
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import os
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
session = new_session(o)
#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
# 聚合获取每个产品的第一个年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
# join 找到对应的销售记录
sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
result_df = md.merge(sales, min_year_df,
left_index=True,
right_on=['product_id','first_year'],
how='inner')
#这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]
print(result_df.execute().fetch())
#销毁 Session
session.destroy()
又通过如下代码实现查询用户消费最多的产品。
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import os
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
session = new_session(o)
#session id 是一串用于关联 MaxFrame task 的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
sales['total'] = sales['price'] * sales['quantity']
product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum'))
product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right')
user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max'))
merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True)
#这里的 result_df 并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在 MaxCompute 中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1])
print(result_df.execute().fetch())
#销毁 Session
session.destroy()
通过实验不难发现,使用MaxFrame进行数据分析比起传统的查询效率明显是提升的。
基于MaxFrame实现大语言模型数据处理
通过如下代码实现对敏感信息的脱敏处理。
import os
import time
import numpy as np
import maxframe.dataframe as md
from odps import ODPS
from maxframe import new_session
# from maxframe.udf import with_resource_libraries
from maxframe.config import options
from maxframe import config
o = ODPS(
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET提前写入电脑的系统变量
# project填写MaxCompute新建的项目名称
# endpoint填写MaxCompute的公网访问地址URL
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='maxframetest',
endpoint='https://service.cn-hangzhou.maxcompute.aliyun.com/api',
)
config.options.sql.settings = {
"odps.session.image": "common"
}
def clean_copyright(row):
import re
pat = re.compile('/\\*[^*]*\\*+(?:[^/*][^*]*\\*+)*/')
cpat = re.compile('copyright', re.IGNORECASE)
text = row['content']
if not text:
return row
r = pat.search(text)
if r:
span = r.span()
sub = text[span[0]:span[1]]
if cpat.search(sub):
# cut it
text = text[:span[0]] + text[span[1]:]
row['content'] = text
return row
lines = text.split('\n')
skip = 0
for k in range(len(lines)):
if (lines[k].startswith('//') or lines[k].startswith('#')
or lines[k].startswith('--') or not lines[k]):
skip = skip + 1
else:
break
if skip:
text = '\n'.join(lines[skip:])
row['content'] = text
return row
def maxframe_job():
s_time = time.time()
table_name = 'bigdata_public_dataset.data_science.llm_redpajama_github_demo_data'
session = new_session(o)
print('session id: ', session.session_id)
df = md.read_odps_table(table_name, index_col='id')
df = df.apply(
clean_copyright,
axis=1, # row
output_type="dataframe",
)
out_table = 'tmp_mf_clean_copyright'
md.to_odps_table(df, out_table).execute()
session.destroy()
maxframe_job()
由于结果返回很多,所以这里只显示100条。
虽然数据量很大,但查询效率一点也没影响。
体验总结
整体感受
1、在使用MaxFrame时,Python编程接口非常易于上手,特别是对于熟悉Python开发的用户来说,MaxFrame内置Python开发环境,开箱即用,对于初学者来说无需额外配置开发环境,可以直接上手进行Python开发。几乎可以无缝过渡到MaxFrame的使用中。然而,在使用过程中,我也遇到了一些不便之处。对于初学者来说,虽然MaxFrame提供了一些文档和入门演示,但在实际操作中,如果遇到具体问题,仍然希望能够有更便捷的在线支持或社区交流渠道。
2、在体验MaxFrame的过程中,产品功能基本满足了我的预期,我能够轻松地进行各种数据处理和分析任务。此外,MaxFrame还与MaxCompute Notebook、镜像管理等功能共同构成了完整的Python开发生态,提升了我在MaxCompute上的Python开发体验。然而,对于数据的可视化和交互分析方面,我希望能够有更多的支持。建议在MaxFrame中增加更多的数据可视化和交互分析功能,以便用户能够更加直观地了解数据和分析结果。
3、针对AI数据处理和Pandas处理场景,首先期待在后续版本中进一步优化算子性能,以满足更加复杂和高效的计算需求。此外,建议在MaxFrame中增加更多的内置函数和算子,以便用户能够更加便捷地进行数据处理和分析任务。
问题反馈
1、首先,测评提供的试用产品链接是不全的,活动首页仅提供了大数据计算服务MaxCompute的试用,并没有提供大数据开发治理DataWorks的试用。
2、最佳实践文档存在多处内容描述不全或模糊不清问题,比如实例代码中就缺少对于ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET需设置为本机系统变量的具体说明,仅简单一笔带过,对新手小白非常不友好。
再比如,还有下图这段,用户即可以使用RAM权限,也可以直接使用云账号权限,对于临时测试,云账号权限会直接了当。为了更好让用户理解,应该配上具体配置的截图,而不是给个链接让用户去找,对于新手用户非常不友好,也许不愿折腾的用户到这就放弃了。
此外,还有下图这个,对于第一次使用MaxCompute产品的用户,这个描述多少有点模糊,更改为控制台上具体的功能描述可能会更好,比如进行SQL分析。或者配上具体截图也能更直观表述。
文档中对于性能的对比描述也非常简略,缺少相关运行截图的佐证。
在如何利用DataWorks自定义镜像安装MaxFrame内容描述中,就存在容易理解模糊问题,下图这段用户可能就没法继续了,因为实例默认开通的就是Serverless资源组。
在大语言模型数据处理的内容中也存在缺乏问题,下图的SQL应该添加limit输出限制,按照文档SQL是没法输出结果的。
3、最佳实践内容明显缺失,比如对于如何使用MaxFrame文档是有提到三种方式,可到了最佳实践这里,实例代码仅提供了本地环境实现这一种方式,显然是不完整的。