加速数据处理与AI开发的利器:阿里云MaxFrame实验评测

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 随着数据量的爆炸式增长,传统数据分析方法逐渐显现出局限性。Python作为数据科学领域的主流语言,因其简洁易用和丰富的库支持备受青睐。阿里云推出的MaxFrame是一个专为Python开发者设计的分布式计算框架,旨在充分利用MaxCompute的强大能力,提供高效、灵活且易于使用的工具,应对大规模数据处理需求。MaxFrame不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。

前言

随着数据量的爆炸式增长,传统的数据分析和处理方法逐渐显现出其局限性,尤其是在面对海量数据集时,计算资源的瓶颈和处理速度成为了亟待解决的问题。与此同时,Python作为一门广泛应用于数据科学、机器学习和人工智能领域的编程语言,因其简洁易用的语法和丰富的库支持而备受青睐。

正是在这样的背景下,阿里云推出了MaxFrame——一个专为Python开发者设计的分布式计算框架。MaxFrame旨在充分利用MaxCompute这一云原生大数据计算平台的强大能力,提供给用户一个高效、灵活且易于使用的工具,以应对大规模数据处理的需求。它不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。

虽然我确实对MaxCompute和DataWorks等大数据产品比较熟悉,但日常工作更多是集中在管理和架构层面,几乎没有涉及过底层开发。因此,这次撰写关于MaxFrame的文章,对我来说既是一次挑战也是一次宝贵的学习机会,希望通过深入探索这款工具的功能和应用场景,能为读者带来有价值见解的同时,也能进一步提升自己在分布式计算和大数据处理方面的能力。

产品概述

一句话总结就是:MaxFrame对接MaxCompute,提供Python编程接口,完全兼容Pandas API,并能自动执行分布式计算。

image.png

MaxFrame商品销售分析实验

安装 MaxFrame

通过执行 pip install maxframe 命令直接安装 MaxFrame SDK,安装 alibabacloud_credentials 依赖,以便实现免密登录。

!pip install maxframe alibabacloud_credentials -U

image.png

准备项目

准备 MaxCompute 项目,并将其绑定到当前 DSW 空间,用于后续计算

登录 MaxCompute 控制台,在左侧导航栏选择【工作区 > 项目管理】,查看 MaxCompute 项目名称

image.png

目标 MaxCompute 项目所在地域的 Endpoint,可根据网络连接方式自行选择,例如 http://service.cn-chengdu.maxcompute.aliyun.com/api ,我这里选择的是杭州

image.png

https://service.cn-hangzhou.maxcompute.aliyun.com/api

最终准备如下:

PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"

准备数据

创建实验所需的小规模测试数据

import pandas as pd
import numpy as np
from odps import ODPS
from odps.df import DataFrame as ODPSDataFrame

from alibabacloud_credentials import providers
from odps.accounts import CredentialProviderAccount

# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    PROJECT_NAME = "[你的项目名]"
    ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)

# 测试数据
data_sets = [
    {
   
        "table_name": "product",
        "table_schema": "index bigint, product_id bigint, product_name string, current_price bigint",
        "source_type": "records",
        "records": [
            [1, 100, "Nokia", 1000],
            [2, 200, "Apple", 5000],
            [3, 300, "Samsung", 9000],
            [4, 500, "HP", 7000],
        ],
        "lifecycle": 5,
    },
    {
   
        "table_name": "sales",
        "table_schema": "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
        "source_type": "records",
        "records": [
            [1, 1, 100, 101, 2008, 10, 5000],
            [2, 2, 300, 101, 2009, 7, 4000],
            [3, 4, 100, 102, 2008, 9, 4000],
            [4, 5, 200, 102, 2010, 6, 6000],
            [5, 8, 300, 102, 2008, 10, 9000],
            [6, 9, 100, 102, 2009, 6, 2000],
            [7, 13, 500, 104, 2007, 3, 8000],
        ],
        "lifecycle": 5,
    },
]

# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(
                f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
            )

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {o.project}.{table_name}...")
        if drop_if_exists:
            print(f"Deleting {o.project}.{table_name} if exists...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(
            name=table_name,
            table_schema=table_schema,
            lifecycle=lifecycle,
            if_not_exists=True,
        )

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
                )
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)

        elif source_type == "records":
            records = data.get("records")
            if not records:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
                )
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {o.project}.{table_name} Done")


prepare_data(o, data_sets, "_maxframe_demo", True)

注意,此处使用刚才的项目名称和ENDPOINT地址进行替换

# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    PROJECT_NAME = "[你的项目名]"
    ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)

image.png

使用 MaxFrame 进行数据分析

MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信,MaxFrame 会在远端保存中间状态

在创建完成后,可以从 session 对象中获得 session_id logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况

from maxframe.session import new_session
import maxframe.dataframe as md

# 初始化 MaxFrame Session
session = new_session(o)

# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")

# 打印 LogView 地址
print(f"MaxFrame LogView 地址: {session.get_logview_address()}")

此时会得到session_id 和 logview_address如下:

image.png

演示的场景是使用 MaxFrame 对商品和销售数据进行分析。这些数据统一存储于两张 MaxCompute 数据表中:商品表包含商品、价格等信息;销售表包含客户、产品、销售数量、销售年份、销售价格等信息

image.png

与 Pandas 一样,在获得 dataframe 对象之后,可以使用 dtypes 来查看数据的 schema。在下例中,md.read_odps_table 会获得 maxframe.DataFrame 对象

print('--------------Product Table Schema-----------------------')
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
print(product.dtypes)

print('\n--------------Sales Table Schema-----------------------')
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
print(sales.dtypes)

image.png

在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行

MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行。

sales.head(5).execute()

image.png

利用 MaxFrame 的功能,能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。

场景 1:Pandas merge接口的分布式执行

连接两张数据表,以获取 product_maxframe_demo_large 表中所有 sale_id 对应的 product_name 以及该产品的所有 year 和 price

# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)

sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")

# 这里的df并不会立即执行,除非您使用df.execute()来触发。
# 这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]

execute_result = df.execute()

image.png
image.png

df.execute() 会触发在 MaxCompute 集群上进行分布式计算,可以在 Logview 中查看当前计算的执行详情。

在内部测试中,Product 和 Sales 两张数据表的 6.8TB 数据在 MaxFrame 的 merge 算子中仅需 3 分钟即可完成计算。相比之下,在本地使用 Pandas 处理 6.8TB 的数据会超出内存限制,无法执行

image.png

场景 2:Pandas groupby、agg、sort_values 接口的分布式执行

连接 Product 和 Sales 两张数据表,聚合每个产品在 Sales 表中的首次售出年份

# 聚合每个产品的首次售出年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))

# 计算不同首次售出年份的产品 ID 数量
sum_product_min_year_df = min_year_df.groupby('first_year', as_index=False).agg(total_product_num=('product_id', 'count'))
exe_info = sum_product_min_year_df.execute()

image.png

image.png

在执行完上述分布式计算后,如果本地内存允许,可以对 execute() 的结果使用 fetch() 将结果拉全部取到本地使用,抓取后的数据会形成 Pandas DataFrame,因此对于 MaxFrame 尚无法支持的场景您也可以轻松的和本地的工作流结合。

当然,需要考虑取数的耗时、带宽和本地内存的限制,避免频繁的抓取和上传

local_df = exe_info.fetch().head(10)
local_df

image.png

在提取并展示了记录后,还可以使用 Matplotlib 生成图表来可视化数据。以下是一个示例代码,用于绘制每个首次售出年份的产品数量分布图

import matplotlib.pyplot as plt

plt.plot(local_df['first_year'],local_df['total_product_num'])
plt.xlabel('First_sale_year')
plt.ylabel('Total_product_num')
plt.title('Saled product distribution by first saled year')
plt.xticks(range(int(local_df['first_year'].min()), int(local_df['first_year'].max()) + 1))
plt.show()

image.png

在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源

session.destroy()

基于 UDF 加载 OSS 中的 FastText 模型进行分布式语言识别实验

MaxFrame User Define Function 提供了灵活的开发能力,支持您提供原生的 Python 函数在上万机器上分布式执行,这对于海量数据需要离线推理的场景极为重要

本文以 fasttext 模型为例,将带您了解 MaxFrame 的开发流程,以及如何在 MaxFrame UDF 中访问 OSS 并加载模型进行文本语言识别

执行 pip install maxframe 命令直接安装 MaxFrame SDK,还需要额外安装 oss2 用于上传测试模型文件。

!pip install maxframe oss2 alibabacloud_credentials -U

image.png

同样,准备如下:

PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"

不同的是这里会用到OSS的资源,按照如下格式填写 OSS 账户信息、bucket 信息和模型的路径

# 此处仅做演示,如果是生产环境,请考虑使用 STS Token 作为临时授权
OSS_ACCESS_ID="[your-oss-access-id]" # OSS 的 Access ID
OSS_SECRET_ACCESS_KEY="[your-oss-secret-id]" # OSS 的 Secret ID

# 填写 Bucket 信息
OSS_ENDPOINT="[your-oss-internet-endpoint]" # OSS 的公网访问 endpoint,您也可以使用 sts token
OSS_BUCKET_NAME="[your-oss-bucket]" # OSS Bucket 的名字
OSS_BUCKET_ENDPOINT=f"{OSS_BUCKET_NAME}.{OSS_ENDPOINT}:80"

# 填写路径信息
OSS_MODEL_PATH="maxframe_demo/fasttext_model/lid.176.ftz" # 模型在 OSS 中的路径
MODEL_URI="https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz" #模型下载的路径

最后运行如下脚本为您下载和上传 ,当然也可以手动在 OSS 控制台上传

import oss2
import requests

def upload_http_resource_to_oss(resource_url, auth, endpoint, bucket_name, object_path):
    # 创建Bucket实例
    bucket = oss2.Bucket(auth, endpoint, bucket_name)

    # 获取HTTP资源
    response = requests.get(resource_url)
    response.raise_for_status()
    print("下载完成")

    try:
        # 上传文件到OSS
        result = bucket.put_object(object_path, response.content)
        if result.status != 200:
            raise Exception("Upload failed with status: " + str(result.status))
        print("上传完成")
    except Exception as e:
        raise e

# 调用函数
auth = oss2.Auth(OSS_ACCESS_ID, OSS_SECRET_ACCESS_KEY)
upload_http_resource_to_oss(MODEL_URI, auth, OSS_ENDPOINT, OSS_BUCKET_NAME, OSS_MODEL_PATH)

image.png

image.png

MaxCompute 中默认不能访问外部网络,因此需要要将 bucket endpoint 配置到 maxcompute 控制台项目首页下方的 可用的外部网络地址 中,配置格式为 bucket_name.endpoint,如下图所示

image.png

准备演示数据

创建本示例所需的小规模测试数据,包含不同语言的文本

from alibabacloud_credentials import providers
from odps import ODPS
from odps.accounts import CredentialProviderAccount

# 创建 odps 对象
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    project=PROJECT_NAME,
    endpoint=ENDPOINT
)
import pandas as pd
import numpy as np
from odps.df import DataFrame as ODPSDataFrame

# 测试数据
data_sets = [
    {
   
        "table_name": "test_lang_text",
        "table_schema": "index bigint, text string",
        "source_type": "records",
        "records": [
            [1, "Welcome to MaxFrame Distributed Computing Framework."],
            [2, "欢迎使用 MaxFrame 分布式计算框架。"],
            [3, "Bienvenido al marco de computación distribuida MaxFrame."],
            [4, "Bienvenue dans le cadre de calcul distribué MaxFrame."],
            [5, "Willkommen beim MaxFrame-Distrubutionsrechnerrahmen."],
            [6, "MaxFrame分散コンピューティングフレームワークへようこそ。"],
            [7, "MaxFrame 분산 컴퓨팅 프레임워크에 오신 것을 환영합니다."],
            [8, "Добро пожаловать в распределённую вычислительную структуру MaxFrame."],
            [9, "Benvenuto nel framework di calcolo distribuito MaxFrame."],
            [10, "Bem-vindo ao framework de computação distribuída MaxFrame."]
        ],
        "lifecycle": 1,
    }
]

# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(
                f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
            )

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {o.project}.{table_name}...")
        if drop_if_exists:
            print(f"Deleting {o.project}.{table_name} if exists...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(
            name=table_name,
            table_schema=table_schema,
            lifecycle=lifecycle,
            if_not_exists=True,
        )

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
                )
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)

        elif source_type == "records":
            records = data.get("records")
            if not records:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
                )
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {o.project}.{table_name} Done")

prepare_data(o, data_sets, "_maxframe_demo", True)

使用 MaxFrame 进行语言识别

MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信。MaxFrame 会在远端保存中间状态,因此你可以进行交互式操作,一边开发一边验证

在创建完成后,可以从 session 对象中获得 session_id 和 logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况

在该例中,由于需要访问 OSS,需要在 options.sql.settings 中添加 odps.internet.access.list,值为上文中配置的外部网络白名单

import maxframe.dataframe as md
from maxframe.session import new_session

# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)

from maxframe import options
options.sql.settings = {
   
    "odps.session.image": "common", # 使用默认镜像
    "odps.internet.access.list": OSS_BUCKET_ENDPOINT, #请替换成您的 bucket endpoint
    "odps.stage.mapper.split.size": 1 # 按 1MB 输入切割分片用于调整并发
}

# 初始化 MaxFrame Session
session = new_session(o)

# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")

image.png

数据探查

在上文数据准备阶段已经创建了临时表包含了如下内容。在 MaxFrame 中您可以使用 read_odps_table 来从表创建 DataFrame 对象

image.png

与 Pandas 一样,在获得 DataFrame 对象之后,可以使用 dtypes 来查看数据的 schema

print('--------------test_lang_text_maxframe_demo Table Schema-----------------------')
df = md.read_odps_table("test_lang_text_maxframe_demo", index_col="index")
print(df.dtypes)

在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算使你可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行。

MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行

df.head(5).execute()

image.png
image.png

image.png

利用 MaxFrame 的功能,我们能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。在模型调用的场景也一样,所有的输入数据会被自动分片在多台甚至上万台机器上并发运行。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。

对于 Fasttext 模型,我们可以使用一个 MaxFrame UDF 来加载 OSS 模型并进行推理。

在该 UDF 中,主要完成了如下工作:

  1. 使用自动化服务打包安装 oss2 和 fasttext

  2. 从 oss 下载加载模型

  3. 使用模型进行推理

  4. 把结果添加到行中作为新列返回

from maxframe.udf import with_python_requirements

# 1. 添加依赖自动打包
@with_python_requirements("oss2,fasttext,numpy==1.26.4")
def process(row, ak, sk, endpoint, bucket_name, model_path, _ctx={
   }):
    # 定义默认返回值
    row["lang"] = None
    row["score"] = None
    row["error"] = None

    try:
        def _load_model_once():
            """
            加载且只加载一次模型

            Returns:
                加载后的模型对象
            """
            if "model" in _ctx:
                return _ctx["model"]

            # 2. 下载并加载模型
            import oss2
            auth = oss2.Auth(ak, sk)
            bucket = oss2.Bucket(auth, endpoint, bucket_name)
            bucket.get_object_to_file(model_path, './model.bin')

            import fasttext
            model = fasttext.load_model('./model.bin')
            _ctx['model'] = model
            return model

        model = _load_model_once()
        # 3. 推理
        score = model.predict(row["text"])
        pred_label = score[0][0].replace('__label__', '')
        pred_score = score[1][0]

        # 4. 更新结果
        row["lang"] = pred_label
        row["score"] = float(pred_score)
    except Exception as e:
        row["error"] = str(e)
        raise e
        # raise for debugging

    return row

上述为原生 Python 函数,使用 apply 算子 可以将该函数发送到集群中完成计算。

需要注意的是,这里入参 row 为原生的 series 代表一行数据,返回的数据需要和 apply 算子中指定的 dtypes 在顺序和类型上都匹配,使用中如果遇到任何问题可以查看 apply 算子的参数说明。

# 声明返回值的类型,在该例中,将会在每行按顺序添加三个新列 lang, score, error
return_schema = df.dtypes.copy()
return_schema["lang"] = np.str_
return_schema["score"] = np.float_
return_schema["error"] = np.str_

# 使用 apply 算子在 dataframe df 上进行计算得到 result_dfs
result_df = df.apply(
    process, # 指定函数
    axis=1, # 指定按行计算
    result_type="expand", # 返回多列
    output_type="dataframe", # 返回为 dataframe
    dtypes=return_schema, # 设置返回行类型
    ak=OSS_ACCESS_ID, # 以下向 UDF 传参
    sk=OSS_SECRET_ACCESS_KEY,
    endpoint=OSS_ENDPOINT,
    bucket_name=OSS_BUCKET_NAME,
    model_path=OSS_MODEL_PATH)

# 运行并打印预览结果
execute_info = result_df.execute()

image.png
image.png

如果您想在本地查看所有计算结果,可以对 execute_info 使用 fetch() 方法将所有结果提取到本地进行查看

execute_info.fetch()

image.png

如果您接下来对数据的处理依旧发生在 MaxCompute 中,您可以对 result_df 使用 to_odps_table 将所有的结果写入指定表中

execute_info = md.to_odps_table(result_df, "test_lang_paragraph_maxframe_demo_result", overwrite=True, lifecycle=1).execute()

image.png
image.png

image.png

在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源

session.destroy()

总结

这次我另辟蹊径,没有完全按照官方给出的最佳实践文档依葫芦画瓢,而是按照PAI中的实验步骤来完成的,通过这次体验确实有较为深入的理解了MaxFrame的用法,它不仅简化了从数据处理到模型训练的整个流程,还通过其强大的功能集显著提升了工作效率与创新能力

比如在企业内部,MaxFrame可以极大地促进跨部门间协作。例如,在数据分析团队与开发团队之间,常常存在信息孤岛的问题,导致沟通成本增加,项目进展缓慢。而MaxFrame凭借其高度兼容的Python接口以及对MaxCompute计算资源和服务的无缝集成,使得不同背景的技术人员能够在一个统一平台上协同工作。数据科学家可以直接使用熟悉的Pandas语法进行大规模数据操作,无需担心底层架构复杂性;与此同时,工程师们也可以轻松调用各种高级算子来构建高效的机器学习流水线。

在实际操作中确实也有碰到部分问题,但是最后通过查询资料和社区的问答都得以解决了。

问题一:读表时读到有列是JSON格式的就报错。当时我想将一个包含多种数据类型的表格上传到MaxFrame中,但是其中有一列是用于存储用户的偏好设置,所以设置的格式为JSON,上传后发现报错了。

经过一番排查和研究文档后,发现MaxFrame默认并不支持直接读取JSON格式的数据列。MaxFrame为了保证计算效率和稳定性,默认情况下只处理简单且固定结构的数据类型。对于JSON这样的非结构化数据,需要额外配置才能正确解析。

解决办法是考虑使用MaxCompute提供的内置函数如get_json_object()来提取JSON中的特定值。比如说我的JSON字段里有一个名为“preferences”的键,那么可以通过get_json_object(json_column, '$.preferences')的方式获取对应的值。这种方法适用于只需要从JSON中抽取少量关键信息的情况。但如果涉及到更复杂的JSON结构或者需要频繁访问多个层级的数据,则可能需要先将JSON列转换成表格形式再导入MaxFrame中。

问题二是使用MaxCompute时碰到的

image.png

错误原因是因为在创建会话时,_odps_entry 对象没有被正确初始化或设置,导致其为 None,此时尝试访问 endpoint 属性时,抛出了 AttributeError,进一步检查访问密钥、安全密钥以及项目名称等是否正确填写。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
1天前
|
存储 人工智能 数据管理
|
6天前
|
人工智能 自然语言处理 监控
解决方案评测:主动式智能导购AI助手构建
作为一名数据工程师,我体验了主动式智能导购AI助手构建解决方案,并进行了详细评测。该方案通过百炼大模型和函数计算实现智能推荐与高并发处理,部署文档详尽但部分细节如模型调优需改进。架构设计清晰,前端支持自然语言处理与语音识别,中间件确保实时数据同步。生产环境部署顺畅,但在系统监控方面可进一步优化。总体而言,该方案在零售行业具有显著应用潜力,值得尝试。
37 17
|
3天前
|
人工智能 安全 前端开发
《主动式智能导购 AI 助手构建》解决方案评测
在部署《主动式智能导购 AI 助手构建》解决方案时,需关注以下四方面: 1. **引导与文档支持**:官方应提供细致、易懂的引导步骤,涵盖环境搭建、模块配置及常见问题解答。遇到错误及时截图反馈。 2. **原理与架构理解**:深入探究智能导购的工作原理和系统架构,从前端到后端各层运作机制,明确模块职责与扩展性。 3. **关键技术洞察**:理解百炼大模型和函数计算的应用,确保其适配场景并高效运行,通过截图反馈技术难题。 4. **生产环境评估**:评估方案在实际业务中的适用性,如安全防护和数据接入指导,确保高并发下的稳定性和全面性。 认真评测这些要点,助力方案持续优化。
33 11
|
6天前
|
人工智能 大数据 测试技术
自主和开放并举 探索下一代阿里云AI基础设施固件创新
12月13日,固件产业技术创新联盟产业峰会在杭州举行,阿里云主导的开源固件测试平台发布和PCIe Switch固件技术亮相,成为会议焦点。
|
22小时前
|
人工智能 Serverless API
《智能导购 AI 助手构建》解决方案评测:极具吸引力的产品,亟待完善的教程文档
《智能导购 AI 助手构建》解决方案评测:极具吸引力的产品,亟待完善的教程文档
33 7
|
7天前
|
人工智能 算法 搜索推荐
《主动式智能导购AI助手构建》解决方案用户评测
《主动式智能导购AI助手构建》提供了详尽的文档支持,涵盖环境准备、配置项设置等,配有图表和实例代码,适合新手上手。部署中遇到环境变量设置和网络连接问题,通过官方文档与技术支持解决。建议增加FAQ内容及错误日志说明。该方案采用Multi-Agent架构,结合百炼大模型和函数计算,实现精准推荐和高效响应。生产环境部署指导基本满足需求,但需加强异常处理指导。整体而言,此解决方案创新实用,推动电商领域发展。
|
22小时前
|
弹性计算 人工智能 自然语言处理
云工开物:阿里云弹性计算走进高校第2期,与北京大学研一学生共探AI时代下的应用创新
阿里云高校合作、弹性计算团队​于北京大学,开展了第2届​【弹性计算进校园】​交流活动。
|
1天前
|
机器学习/深度学习 新零售 人工智能
基于阿里云AI购物助手解决方案的深度评测
阿里云推出的AI购物助手解决方案,采用模块化架构,涵盖智能对话引擎、商品知识图谱和个性化推荐引擎。评测显示其在智能咨询问答、个性化推荐和多模态交互方面表现出色,准确率高且响应迅速。改进建议包括提升复杂问题理解、简化推荐过程及优化话术。总体评价认为该方案技术先进,应用效果好,能显著提升电商购物体验并降低运营成本。
24 0
|
1天前
|
人工智能
阿里云领跑生成式AI工程领域,两大维度排名Gartner®生成式AI工程Market Quadrant全球第二
阿里云凭借强劲实力入选Gartner 《Innovation Guide for Generative AI Technologies》所有领域的新兴领导者象限。
|
10天前
|
机器学习/深度学习 人工智能 自然语言处理
AI技术深度解析:从基础到应用的全面介绍
人工智能(AI)技术的迅猛发展,正在深刻改变着我们的生活和工作方式。从自然语言处理(NLP)到机器学习,从神经网络到大型语言模型(LLM),AI技术的每一次进步都带来了前所未有的机遇和挑战。本文将从背景、历史、业务场景、Python代码示例、流程图以及如何上手等多个方面,对AI技术中的关键组件进行深度解析,为读者呈现一个全面而深入的AI技术世界。
71 10