前言
随着数据量的爆炸式增长,传统的数据分析和处理方法逐渐显现出其局限性,尤其是在面对海量数据集时,计算资源的瓶颈和处理速度成为了亟待解决的问题。与此同时,Python作为一门广泛应用于数据科学、机器学习和人工智能领域的编程语言,因其简洁易用的语法和丰富的库支持而备受青睐。
正是在这样的背景下,阿里云推出了MaxFrame——一个专为Python开发者设计的分布式计算框架。MaxFrame旨在充分利用MaxCompute这一云原生大数据计算平台的强大能力,提供给用户一个高效、灵活且易于使用的工具,以应对大规模数据处理的需求。它不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。
虽然我确实对MaxCompute和DataWorks等大数据产品比较熟悉,但日常工作更多是集中在管理和架构层面,几乎没有涉及过底层开发。因此,这次撰写关于MaxFrame的文章,对我来说既是一次挑战也是一次宝贵的学习机会,希望通过深入探索这款工具的功能和应用场景,能为读者带来有价值见解的同时,也能进一步提升自己在分布式计算和大数据处理方面的能力。
产品概述
一句话总结就是:MaxFrame对接MaxCompute,提供Python编程接口,完全兼容Pandas API,并能自动执行分布式计算。
MaxFrame商品销售分析实验
安装 MaxFrame
通过执行 pip install maxframe 命令直接安装 MaxFrame SDK,安装 alibabacloud_credentials 依赖,以便实现免密登录。
!pip install maxframe alibabacloud_credentials -U
准备项目
准备 MaxCompute 项目,并将其绑定到当前 DSW 空间,用于后续计算
登录 MaxCompute 控制台,在左侧导航栏选择【工作区 > 项目管理】,查看 MaxCompute 项目名称
目标 MaxCompute 项目所在地域的 Endpoint,可根据网络连接方式自行选择,例如 http://service.cn-chengdu.maxcompute.aliyun.com/api ,我这里选择的是杭州
https://service.cn-hangzhou.maxcompute.aliyun.com/api
最终准备如下:
PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
准备数据
创建实验所需的小规模测试数据
import pandas as pd
import numpy as np
from odps import ODPS
from odps.df import DataFrame as ODPSDataFrame
from alibabacloud_credentials import providers
from odps.accounts import CredentialProviderAccount
# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
account=account,
PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)
# 测试数据
data_sets = [
{
"table_name": "product",
"table_schema": "index bigint, product_id bigint, product_name string, current_price bigint",
"source_type": "records",
"records": [
[1, 100, "Nokia", 1000],
[2, 200, "Apple", 5000],
[3, 300, "Samsung", 9000],
[4, 500, "HP", 7000],
],
"lifecycle": 5,
},
{
"table_name": "sales",
"table_schema": "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
"source_type": "records",
"records": [
[1, 1, 100, 101, 2008, 10, 5000],
[2, 2, 300, 101, 2009, 7, 4000],
[3, 4, 100, 102, 2008, 9, 4000],
[4, 5, 200, 102, 2010, 6, 6000],
[5, 8, 300, 102, 2008, 10, 9000],
[6, 9, 100, 102, 2009, 6, 2000],
[7, 13, 500, 104, 2007, 3, 8000],
],
"lifecycle": 5,
},
]
# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
for index, data in enumerate(data_sets):
table_name = data.get("table_name")
table_schema = data.get("table_schema")
source_type = data.get("source_type")
if not table_name or not table_schema or not source_type:
raise ValueError(
f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
)
lifecycle = data.get("lifecycle", 5)
table_name += suffix
print(f"Processing {o.project}.{table_name}...")
if drop_if_exists:
print(f"Deleting {o.project}.{table_name} if exists...")
o.delete_table(table_name, if_exists=True)
o.create_table(
name=table_name,
table_schema=table_schema,
lifecycle=lifecycle,
if_not_exists=True,
)
if source_type == "local_file":
file_path = data.get("file")
if not file_path:
raise ValueError(
f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
)
sep = data.get("sep", ",")
pd_df = pd.read_csv(file_path, sep=sep)
ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
elif source_type == "records":
records = data.get("records")
if not records:
raise ValueError(
f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
)
with o.get_table(table_name).open_writer() as writer:
writer.write(records)
else:
raise ValueError(f"Unknown data set source_type: {source_type}")
print(f"Processed {o.project}.{table_name} Done")
prepare_data(o, data_sets, "_maxframe_demo", True)
注意,此处使用刚才的项目名称和ENDPOINT地址进行替换
# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
account=account,
PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)
使用 MaxFrame 进行数据分析
MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信,MaxFrame 会在远端保存中间状态
在创建完成后,可以从 session 对象中获得 session_id 和 logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况
from maxframe.session import new_session
import maxframe.dataframe as md
# 初始化 MaxFrame Session
session = new_session(o)
# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")
# 打印 LogView 地址
print(f"MaxFrame LogView 地址: {session.get_logview_address()}")
此时会得到session_id 和 logview_address如下:
演示的场景是使用 MaxFrame 对商品和销售数据进行分析。这些数据统一存储于两张 MaxCompute 数据表中:商品表包含商品、价格等信息;销售表包含客户、产品、销售数量、销售年份、销售价格等信息
与 Pandas 一样,在获得 dataframe 对象之后,可以使用 dtypes 来查看数据的 schema。在下例中,md.read_odps_table 会获得 maxframe.DataFrame 对象
print('--------------Product Table Schema-----------------------')
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
print(product.dtypes)
print('\n--------------Sales Table Schema-----------------------')
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
print(sales.dtypes)
在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行
MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行。
sales.head(5).execute()
利用 MaxFrame 的功能,能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。
场景 1:Pandas merge接口的分布式执行
连接两张数据表,以获取 product_maxframe_demo_large 表中所有 sale_id 对应的 product_name 以及该产品的所有 year 和 price
# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
# 这里的df并不会立即执行,除非您使用df.execute()来触发。
# 这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]
execute_result = df.execute()
df.execute() 会触发在 MaxCompute 集群上进行分布式计算,可以在 Logview 中查看当前计算的执行详情。
在内部测试中,Product 和 Sales 两张数据表的 6.8TB 数据在 MaxFrame 的 merge 算子中仅需 3 分钟即可完成计算。相比之下,在本地使用 Pandas 处理 6.8TB 的数据会超出内存限制,无法执行
场景 2:Pandas groupby、agg、sort_values 接口的分布式执行
连接 Product 和 Sales 两张数据表,聚合每个产品在 Sales 表中的首次售出年份
# 聚合每个产品的首次售出年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
# 计算不同首次售出年份的产品 ID 数量
sum_product_min_year_df = min_year_df.groupby('first_year', as_index=False).agg(total_product_num=('product_id', 'count'))
exe_info = sum_product_min_year_df.execute()
在执行完上述分布式计算后,如果本地内存允许,可以对 execute() 的结果使用 fetch() 将结果拉全部取到本地使用,抓取后的数据会形成 Pandas DataFrame,因此对于 MaxFrame 尚无法支持的场景您也可以轻松的和本地的工作流结合。
当然,需要考虑取数的耗时、带宽和本地内存的限制,避免频繁的抓取和上传
local_df = exe_info.fetch().head(10)
local_df
在提取并展示了记录后,还可以使用 Matplotlib 生成图表来可视化数据。以下是一个示例代码,用于绘制每个首次售出年份的产品数量分布图
import matplotlib.pyplot as plt
plt.plot(local_df['first_year'],local_df['total_product_num'])
plt.xlabel('First_sale_year')
plt.ylabel('Total_product_num')
plt.title('Saled product distribution by first saled year')
plt.xticks(range(int(local_df['first_year'].min()), int(local_df['first_year'].max()) + 1))
plt.show()
在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源
session.destroy()
基于 UDF 加载 OSS 中的 FastText 模型进行分布式语言识别实验
MaxFrame User Define Function 提供了灵活的开发能力,支持您提供原生的 Python 函数在上万机器上分布式执行,这对于海量数据需要离线推理的场景极为重要
本文以 fasttext 模型为例,将带您了解 MaxFrame 的开发流程,以及如何在 MaxFrame UDF 中访问 OSS 并加载模型进行文本语言识别
执行 pip install maxframe 命令直接安装 MaxFrame SDK,还需要额外安装 oss2 用于上传测试模型文件。
!pip install maxframe oss2 alibabacloud_credentials -U
同样,准备如下:
PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
不同的是这里会用到OSS的资源,按照如下格式填写 OSS 账户信息、bucket 信息和模型的路径
# 此处仅做演示,如果是生产环境,请考虑使用 STS Token 作为临时授权
OSS_ACCESS_ID="[your-oss-access-id]" # OSS 的 Access ID
OSS_SECRET_ACCESS_KEY="[your-oss-secret-id]" # OSS 的 Secret ID
# 填写 Bucket 信息
OSS_ENDPOINT="[your-oss-internet-endpoint]" # OSS 的公网访问 endpoint,您也可以使用 sts token
OSS_BUCKET_NAME="[your-oss-bucket]" # OSS Bucket 的名字
OSS_BUCKET_ENDPOINT=f"{OSS_BUCKET_NAME}.{OSS_ENDPOINT}:80"
# 填写路径信息
OSS_MODEL_PATH="maxframe_demo/fasttext_model/lid.176.ftz" # 模型在 OSS 中的路径
MODEL_URI="https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz" #模型下载的路径
最后运行如下脚本为您下载和上传 ,当然也可以手动在 OSS 控制台上传
import oss2
import requests
def upload_http_resource_to_oss(resource_url, auth, endpoint, bucket_name, object_path):
# 创建Bucket实例
bucket = oss2.Bucket(auth, endpoint, bucket_name)
# 获取HTTP资源
response = requests.get(resource_url)
response.raise_for_status()
print("下载完成")
try:
# 上传文件到OSS
result = bucket.put_object(object_path, response.content)
if result.status != 200:
raise Exception("Upload failed with status: " + str(result.status))
print("上传完成")
except Exception as e:
raise e
# 调用函数
auth = oss2.Auth(OSS_ACCESS_ID, OSS_SECRET_ACCESS_KEY)
upload_http_resource_to_oss(MODEL_URI, auth, OSS_ENDPOINT, OSS_BUCKET_NAME, OSS_MODEL_PATH)
MaxCompute 中默认不能访问外部网络,因此需要要将 bucket endpoint 配置到 maxcompute 控制台项目首页下方的 可用的外部网络地址 中,配置格式为 bucket_name.endpoint,如下图所示
准备演示数据
创建本示例所需的小规模测试数据,包含不同语言的文本
from alibabacloud_credentials import providers
from odps import ODPS
from odps.accounts import CredentialProviderAccount
# 创建 odps 对象
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
account=account,
project=PROJECT_NAME,
endpoint=ENDPOINT
)
import pandas as pd
import numpy as np
from odps.df import DataFrame as ODPSDataFrame
# 测试数据
data_sets = [
{
"table_name": "test_lang_text",
"table_schema": "index bigint, text string",
"source_type": "records",
"records": [
[1, "Welcome to MaxFrame Distributed Computing Framework."],
[2, "欢迎使用 MaxFrame 分布式计算框架。"],
[3, "Bienvenido al marco de computación distribuida MaxFrame."],
[4, "Bienvenue dans le cadre de calcul distribué MaxFrame."],
[5, "Willkommen beim MaxFrame-Distrubutionsrechnerrahmen."],
[6, "MaxFrame分散コンピューティングフレームワークへようこそ。"],
[7, "MaxFrame 분산 컴퓨팅 프레임워크에 오신 것을 환영합니다."],
[8, "Добро пожаловать в распределённую вычислительную структуру MaxFrame."],
[9, "Benvenuto nel framework di calcolo distribuito MaxFrame."],
[10, "Bem-vindo ao framework de computação distribuída MaxFrame."]
],
"lifecycle": 1,
}
]
# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
for index, data in enumerate(data_sets):
table_name = data.get("table_name")
table_schema = data.get("table_schema")
source_type = data.get("source_type")
if not table_name or not table_schema or not source_type:
raise ValueError(
f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
)
lifecycle = data.get("lifecycle", 5)
table_name += suffix
print(f"Processing {o.project}.{table_name}...")
if drop_if_exists:
print(f"Deleting {o.project}.{table_name} if exists...")
o.delete_table(table_name, if_exists=True)
o.create_table(
name=table_name,
table_schema=table_schema,
lifecycle=lifecycle,
if_not_exists=True,
)
if source_type == "local_file":
file_path = data.get("file")
if not file_path:
raise ValueError(
f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
)
sep = data.get("sep", ",")
pd_df = pd.read_csv(file_path, sep=sep)
ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
elif source_type == "records":
records = data.get("records")
if not records:
raise ValueError(
f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
)
with o.get_table(table_name).open_writer() as writer:
writer.write(records)
else:
raise ValueError(f"Unknown data set source_type: {source_type}")
print(f"Processed {o.project}.{table_name} Done")
prepare_data(o, data_sets, "_maxframe_demo", True)
使用 MaxFrame 进行语言识别
MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信。MaxFrame 会在远端保存中间状态,因此你可以进行交互式操作,一边开发一边验证
在创建完成后,可以从 session 对象中获得 session_id 和 logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况
在该例中,由于需要访问 OSS,需要在 options.sql.settings 中添加 odps.internet.access.list,值为上文中配置的外部网络白名单
import maxframe.dataframe as md
from maxframe.session import new_session
# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)
from maxframe import options
options.sql.settings = {
"odps.session.image": "common", # 使用默认镜像
"odps.internet.access.list": OSS_BUCKET_ENDPOINT, #请替换成您的 bucket endpoint
"odps.stage.mapper.split.size": 1 # 按 1MB 输入切割分片用于调整并发
}
# 初始化 MaxFrame Session
session = new_session(o)
# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")
数据探查
在上文数据准备阶段已经创建了临时表包含了如下内容。在 MaxFrame 中您可以使用 read_odps_table 来从表创建 DataFrame 对象
与 Pandas 一样,在获得 DataFrame 对象之后,可以使用 dtypes 来查看数据的 schema
print('--------------test_lang_text_maxframe_demo Table Schema-----------------------')
df = md.read_odps_table("test_lang_text_maxframe_demo", index_col="index")
print(df.dtypes)
在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算使你可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行。
MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行
df.head(5).execute()
利用 MaxFrame 的功能,我们能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。在模型调用的场景也一样,所有的输入数据会被自动分片在多台甚至上万台机器上并发运行。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。
对于 Fasttext 模型,我们可以使用一个 MaxFrame UDF 来加载 OSS 模型并进行推理。
在该 UDF 中,主要完成了如下工作:
使用自动化服务打包安装 oss2 和 fasttext
从 oss 下载加载模型
使用模型进行推理
把结果添加到行中作为新列返回
from maxframe.udf import with_python_requirements
# 1. 添加依赖自动打包
@with_python_requirements("oss2,fasttext,numpy==1.26.4")
def process(row, ak, sk, endpoint, bucket_name, model_path, _ctx={
}):
# 定义默认返回值
row["lang"] = None
row["score"] = None
row["error"] = None
try:
def _load_model_once():
"""
加载且只加载一次模型
Returns:
加载后的模型对象
"""
if "model" in _ctx:
return _ctx["model"]
# 2. 下载并加载模型
import oss2
auth = oss2.Auth(ak, sk)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
bucket.get_object_to_file(model_path, './model.bin')
import fasttext
model = fasttext.load_model('./model.bin')
_ctx['model'] = model
return model
model = _load_model_once()
# 3. 推理
score = model.predict(row["text"])
pred_label = score[0][0].replace('__label__', '')
pred_score = score[1][0]
# 4. 更新结果
row["lang"] = pred_label
row["score"] = float(pred_score)
except Exception as e:
row["error"] = str(e)
raise e
# raise for debugging
return row
上述为原生 Python 函数,使用 apply 算子 可以将该函数发送到集群中完成计算。
需要注意的是,这里入参 row 为原生的 series 代表一行数据,返回的数据需要和 apply 算子中指定的 dtypes 在顺序和类型上都匹配,使用中如果遇到任何问题可以查看 apply 算子的参数说明。
# 声明返回值的类型,在该例中,将会在每行按顺序添加三个新列 lang, score, error
return_schema = df.dtypes.copy()
return_schema["lang"] = np.str_
return_schema["score"] = np.float_
return_schema["error"] = np.str_
# 使用 apply 算子在 dataframe df 上进行计算得到 result_dfs
result_df = df.apply(
process, # 指定函数
axis=1, # 指定按行计算
result_type="expand", # 返回多列
output_type="dataframe", # 返回为 dataframe
dtypes=return_schema, # 设置返回行类型
ak=OSS_ACCESS_ID, # 以下向 UDF 传参
sk=OSS_SECRET_ACCESS_KEY,
endpoint=OSS_ENDPOINT,
bucket_name=OSS_BUCKET_NAME,
model_path=OSS_MODEL_PATH)
# 运行并打印预览结果
execute_info = result_df.execute()
如果您想在本地查看所有计算结果,可以对 execute_info 使用 fetch() 方法将所有结果提取到本地进行查看
execute_info.fetch()
如果您接下来对数据的处理依旧发生在 MaxCompute 中,您可以对 result_df 使用 to_odps_table 将所有的结果写入指定表中
execute_info = md.to_odps_table(result_df, "test_lang_paragraph_maxframe_demo_result", overwrite=True, lifecycle=1).execute()
在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源
session.destroy()
总结
这次我另辟蹊径,没有完全按照官方给出的最佳实践文档依葫芦画瓢,而是按照PAI中的实验步骤来完成的,通过这次体验确实有较为深入的理解了MaxFrame的用法,它不仅简化了从数据处理到模型训练的整个流程,还通过其强大的功能集显著提升了工作效率与创新能力
比如在企业内部,MaxFrame可以极大地促进跨部门间协作。例如,在数据分析团队与开发团队之间,常常存在信息孤岛的问题,导致沟通成本增加,项目进展缓慢。而MaxFrame凭借其高度兼容的Python接口以及对MaxCompute计算资源和服务的无缝集成,使得不同背景的技术人员能够在一个统一平台上协同工作。数据科学家可以直接使用熟悉的Pandas语法进行大规模数据操作,无需担心底层架构复杂性;与此同时,工程师们也可以轻松调用各种高级算子来构建高效的机器学习流水线。
在实际操作中确实也有碰到部分问题,但是最后通过查询资料和社区的问答都得以解决了。
问题一:读表时读到有列是JSON格式的就报错。当时我想将一个包含多种数据类型的表格上传到MaxFrame中,但是其中有一列是用于存储用户的偏好设置,所以设置的格式为JSON,上传后发现报错了。
经过一番排查和研究文档后,发现MaxFrame默认并不支持直接读取JSON格式的数据列。MaxFrame为了保证计算效率和稳定性,默认情况下只处理简单且固定结构的数据类型。对于JSON这样的非结构化数据,需要额外配置才能正确解析。
解决办法是考虑使用MaxCompute提供的内置函数如get_json_object()
来提取JSON中的特定值。比如说我的JSON字段里有一个名为“preferences”的键,那么可以通过get_json_object(json_column, '$.preferences')
的方式获取对应的值。这种方法适用于只需要从JSON中抽取少量关键信息的情况。但如果涉及到更复杂的JSON结构或者需要频繁访问多个层级的数据,则可能需要先将JSON列转换成表格形式再导入MaxFrame中。
问题二是使用MaxCompute时碰到的
错误原因是因为在创建会话时,_odps_entry 对象没有被正确初始化或设置,导致其为 None,此时尝试访问 endpoint 属性时,抛出了 AttributeError,进一步检查访问密钥、安全密钥以及项目名称等是否正确填写。