Overview
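This article shows how to export a local Parquet file (the kuairand-27k recommender-system dataset) and upload it to an Alibaba Cloud MaxCompute table. It includes complete code for data exploration, type mapping, table creation, and upload.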
1. Environment Setup
Installing dependencies

```shell
pip install pandas pyarrow pyodps
```

- pandas + pyarrow: read Parquet files
- pyodps: the Alibaba Cloud MaxCompute Python SDK
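To confirm that all three packages are actually installed in the interpreter you will run the script with, a quick check can save a confusing import error later. This is a minimal sketch using only the standard library `importlib.metadata` (Python 3.8+); the helper name `installed_versions` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {distribution name: version string, or None if it is not installed}."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for pkg, ver in installed_versions(["pandas", "pyarrow", "pyodps"]).items():
        print(f"{pkg}: {ver or 'NOT INSTALLED'}")
```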
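Credential configuration
Pass the AccessKey pair in through environment variables instead of hardcoding it in the script:

```shell
export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"
```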
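The upload script reads the keys with `os.environ["ACCESS_ID"]`, which fails with a bare `KeyError` when a variable is unset. A slightly friendlier check is sketched below; the helper `load_credentials` is hypothetical, and it also treats an empty value as missing.

```python
import os


def load_credentials(id_var="ACCESS_ID", key_var="ACCESS_KEY"):
    """Read the AccessKey pair from environment variables, failing with a clear message."""
    # An unset or empty variable both count as missing
    missing = [name for name in (id_var, key_var) if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variable(s): {', '.join(missing)}")
    return os.environ[id_var], os.environ[key_var]
```

Usage: `access_id, access_key = load_credentials()` in place of the two `os.environ[...]` lookups.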
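2. Data Exploration: Reading the Parquet File
Before uploading, inspect the structure of the Parquet file:

```python
import numpy as np
import pandas as pd

df = pd.read_parquet("kuairand-27k-train-0.parquet")
print(f"Total rows: {len(df)}")
print(f"Number of columns: {len(df.columns)}")
print("\nColumn names:")
print(df.columns.tolist())
print("\nData types:")
print(df.dtypes)

# Show the first 5 values of each column.
# pyarrow returns list columns as numpy arrays, so check for np.ndarray as well as list.
for col in df.columns:
    print(f"\n--- {col} (dtype: {df[col].dtype}) ---")
    for i in range(min(5, len(df))):
        val = df[col].iloc[i]
        if isinstance(val, (list, np.ndarray)):
            print(f"  [{i}] len={len(val)}, first 10 values: {val[:10]}")
        else:
            print(f"  [{i}] {val}")
```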
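The loop above only looks at the first five rows. To spot empty or unusually long sequences across the whole file before uploading, the sequence lengths of each list column can be summarized. A minimal sketch; `sequence_length_summary` is a hypothetical helper that accepts any iterable of lists or numpy arrays (such as a pandas Series) and counts `None` entries as length 0.

```python
from statistics import mean


def sequence_length_summary(sequences):
    """Return (min, max, mean) of the lengths of a column of sequences.

    Works for Python lists and numpy arrays alike, since both support len().
    None entries are counted as length 0.
    """
    lengths = [0 if seq is None else len(seq) for seq in sequences]
    if not lengths:
        return (0, 0, 0.0)
    return (min(lengths), max(lengths), mean(lengths))
```

Usage: `for col in ["video_id", "item_video_id"]: print(col, sequence_length_summary(df[col]))`.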
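Exploration results
The dataset has 1,257 rows and 14 columns, structured as follows:

| Column | Python type | Description |
|---|---|---|
| user_id | int64 | User ID |
| user_active_degree | int64 | User activity level |
| follow_user_num_range | int64 | Bucketed number of users followed |
| fans_user_num_range | int64 | Bucketed number of followers |
| friend_user_num_range | int64 | Bucketed number of friends |
| register_days_range | int64 | Bucketed number of days since registration |
| video_id | list(int) | Sequence of videos the user has interacted with |
| action_timestamp | list(int) | Sequence of action timestamps |
| action_weight | list(int) | Sequence of action weights (bitmask-encoded) |
| watch_time | list(int) | Sequence of watch durations |
| item_video_id | list(int) | Sequence of candidate video IDs |
| item_action_weight | list(int) | Action labels of the candidate videos |
| item_target_watchtime | list(int) | Target watch time of the candidate videos |
| item_query_time | list(int) | Timestamps of the candidate requests |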
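`action_weight` is described as bitmask-encoded: each integer packs several yes/no action flags into individual bits. Which bit stands for which action is not stated here, so the sketch below only recovers the set bit positions; mapping positions to action names should follow the dataset's own documentation. `set_bits` is a hypothetical helper.

```python
def set_bits(value):
    """Return the positions (0 = least significant) of the bits set in a non-negative integer."""
    value = int(value)  # also accepts numpy integer scalars
    if value < 0:
        raise ValueError("bitmask values are expected to be non-negative")
    return [pos for pos in range(value.bit_length()) if (value >> pos) & 1]


# 5 is 0b101, so bits 0 and 2 are set
print(set_bits(5))  # [0, 2]
```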
3. Type Mapping: Parquet → MaxCompute

| Parquet/Python type | MaxCompute type |
|---|---|
| int64 (scalar) | bigint |
| list(int) (array) | array<bigint> |
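The mapping above covers only the two types in this dataset. The sketch below derives a MaxCompute type string from one sample value, under the assumption that other Python types map to the standard MaxCompute names `double`, `string`, and `boolean`. `infer_mc_type` is hypothetical, and it inspects only the first element of a sequence, so pick a non-empty sample.

```python
def infer_mc_type(sample):
    """Infer a MaxCompute column type string from one sample value (sketch)."""
    # numpy scalars and arrays expose tolist(); convert them to native Python values first
    if hasattr(sample, "tolist"):
        sample = sample.tolist()
    if isinstance(sample, bool):  # check bool before int: bool is a subclass of int
        return "boolean"
    if isinstance(sample, int):
        return "bigint"
    if isinstance(sample, float):
        return "double"
    if isinstance(sample, str):
        return "string"
    if isinstance(sample, (list, tuple)):
        if not sample:
            raise ValueError("cannot infer the element type of an empty sequence")
        return f"array<{infer_mc_type(sample[0])}>"
    raise TypeError(f"unsupported sample type: {type(sample).__name__}")
```

Usage: `{col: infer_mc_type(df[col].iloc[0]) for col in df.columns}` yields `bigint` for the scalar columns and `array<bigint>` for the sequence columns, provided the first row's sequences are not empty.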
4. Complete Upload Script
```python
#!/usr/bin/env python3
"""
Upload kuairand-27k-train-0.parquet to the MaxCompute table pairec_kuairand_train.
"""
import os

import numpy as np
import pandas as pd
from odps import ODPS
from odps.models import TableSchema as Schema, Column

# ========== 1. Connection parameters ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "http://service.cn.maxcompute.aliyun.com/api"

# ========== 2. Connect to MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("Connected to MaxCompute")

# ========== 3. Create the table ==========
TABLE_NAME = "pairec_kuairand_train"

# Drop the existing table first (when overwriting)
if odps.exist_table(TABLE_NAME):
    print(f"Table {TABLE_NAME} already exists, dropping it...")
    odps.delete_table(TABLE_NAME)
    print(f"Table {TABLE_NAME} dropped")

# Define the table schema
columns = [
    # User-side scalar fields (bigint)
    Column(name="user_id", type="bigint"),
    Column(name="user_active_degree", type="bigint"),
    Column(name="follow_user_num_range", type="bigint"),
    Column(name="fans_user_num_range", type="bigint"),
    Column(name="friend_user_num_range", type="bigint"),
    Column(name="register_days_range", type="bigint"),
    # History sequence fields (array<bigint>)
    Column(name="video_id", type="array<bigint>"),
    Column(name="action_timestamp", type="array<bigint>"),
    Column(name="action_weight", type="array<bigint>"),
    Column(name="watch_time", type="array<bigint>"),
    # Candidate item fields (array<bigint>)
    Column(name="item_video_id", type="array<bigint>"),
    Column(name="item_action_weight", type="array<bigint>"),
    Column(name="item_target_watchtime", type="array<bigint>"),
    Column(name="item_query_time", type="array<bigint>"),
]
schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"Table {TABLE_NAME} created")

# ========== 4. Read the Parquet file and upload ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet file loaded, {len(df)} rows")

# Convert numpy arrays to Python lists (PyODPS expects native Python types).
# ndarray.tolist() also turns every element into a Python int; list(x) would keep numpy.int64 elements.
array_cols = [
    "video_id", "action_timestamp", "action_weight", "watch_time",
    "item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"
]
for col in array_cols:
    df[col] = df[col].apply(
        lambda x: x.tolist() if isinstance(x, np.ndarray) else (x if isinstance(x, list) else [])
    )

# Make sure scalar columns are 64-bit integers; each value is cast to a Python int when the record is built
scalar_cols = [
    "user_id", "user_active_degree", "follow_user_num_range",
    "fans_user_num_range", "friend_user_num_range", "register_days_range"
]
for col in scalar_cols:
    df[col] = df[col].astype("int64")

# Upload through Tunnel
table = odps.get_table(TABLE_NAME)
print(f"Uploading data ({len(df)} rows)...")
with table.open_writer() as writer:
    records = []
    for idx, row in df.iterrows():
        record = [
            int(row["user_id"]),
            int(row["user_active_degree"]),
            int(row["follow_user_num_range"]),
            int(row["fans_user_num_range"]),
            int(row["friend_user_num_range"]),
            int(row["register_days_range"]),
            list(row["video_id"]),
            list(row["action_timestamp"]),
            list(row["action_weight"]),
            list(row["watch_time"]),
            list(row["item_video_id"]),
            list(row["item_action_weight"]),
            list(row["item_target_watchtime"]),
            list(row["item_query_time"]),
        ]
        records.append(table.new_record(record))
        if (idx + 1) % 100 == 0:
            print(f"  Processed {idx + 1}/{len(df)} rows")
    writer.write(records)
print(f"Upload complete: {len(records)} rows written to {TABLE_NAME}")
```
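The script creates the table through `odps.create_table`. If you would rather create it with SQL, the DDL can be generated from the same (name, type) pairs; `build_create_table_sql` below is a hypothetical helper. Complex types such as `array<bigint>` belong to the MaxCompute 2.0 type system, so depending on project settings the statement may need that type system enabled for the session, for example through `odps.execute_sql(sql, hints={"odps.sql.type.system.odps2": "true"})`; treat this as an assumption to verify against your project configuration.

```python
def build_create_table_sql(table_name, columns, if_not_exists=True):
    """Build a CREATE TABLE statement from (column_name, type) pairs."""
    if not columns:
        raise ValueError("at least one column is required")
    guard = "IF NOT EXISTS " if if_not_exists else ""
    col_defs = ",\n".join(f"    {name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE {guard}{table_name} (\n{col_defs}\n)"


# Assumed usage with PyODPS (not run here):
# sql = build_create_table_sql("pairec_kuairand_train", [("user_id", "bigint"), ("video_id", "array<bigint>")])
# odps.execute_sql(sql, hints={"odps.sql.type.system.odps2": "true"})
```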
5. Run

```shell
source env.sh  # sets the ACCESS_ID and ACCESS_KEY environment variables
python upload_to_odps.py
```

Expected output (first run, when the table does not exist yet):

```text
Connected to MaxCompute
Table pairec_kuairand_train created
Parquet file loaded, 1257 rows
Uploading data (1257 rows)...
  Processed 100/1257 rows
...
  Processed 1200/1257 rows
Upload complete: 1257 rows written to pairec_kuairand_train
```
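To check the result independently of the script's own message, the stored row count can be read back through the table's Tunnel reader, which in PyODPS exposes the total as `reader.count`. A minimal sketch; `verify_row_count` is a hypothetical helper.

```python
def verify_row_count(table, expected_rows):
    """Compare the number of rows stored in a MaxCompute table with the expected count.

    `table` is the object returned by odps.get_table(); its open_reader()
    context exposes the total record count as `reader.count`.
    """
    with table.open_reader() as reader:
        actual = reader.count
    if actual != expected_rows:
        raise AssertionError(f"expected {expected_rows} rows, table has {actual}")
    return actual
```

Usage: `verify_row_count(odps.get_table("pairec_kuairand_train"), len(df))`.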
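6. Common Issues and Notes
Endpoint and Project mismatch
A MaxCompute Project is bound to the Endpoint of a specific Region. If you get a `Project not found` error, check which Region the Project belongs to and switch to that Region's Endpoint:

| Endpoint | Use case |
|---|---|
| service.cn.maxcompute.aliyun.com | Public network (East China 2) |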
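Region-specific public endpoints generally follow the pattern `http://service.<region-id>.maxcompute.aliyun.com/api`, for example `cn-shanghai` for East China 2. Treat the pattern as an assumption and confirm the URL against Alibaba Cloud's official endpoint list; `public_endpoint` is a hypothetical helper that only builds the string and sanity-checks the region ID format.

```python
import re


def public_endpoint(region_id):
    """Build a public MaxCompute endpoint URL for a region ID such as 'cn-shanghai'.

    Assumes the pattern http://service.<region-id>.maxcompute.aliyun.com/api.
    """
    if not re.fullmatch(r"[a-z]{2}-[a-z0-9-]+", region_id):
        raise ValueError(f"unexpected region ID: {region_id!r}")
    return f"http://service.{region_id}.maxcompute.aliyun.com/api"
```

Usage: `ODPS(access_id, access_key, project_name, endpoint=public_endpoint("cn-shanghai"))`.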
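AccessKey and Endpoint network mismatch
A public-network AccessKey can only be used with a public Endpoint, and an intranet AccessKey only with an intranet Endpoint. Mixing them results in an `AccessKeyIdNotFound` error.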
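PyODPS requires native Python types
When writing data with `open_writer()`, numpy `int64` and `ndarray` values are not supported. Convert them explicitly to native Python `int` and `list` values, otherwise a type error is raised. For arrays, `ndarray.tolist()` converts the elements as well, whereas `list(arr)` still yields numpy integers.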
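The difference between `list(arr)` and `arr.tolist()` is easy to miss, so the sketch below shows a recursive converter that turns numpy scalars and arrays (including ones nested inside lists) into native Python values before they reach `new_record`. `to_native` is a hypothetical helper.

```python
import numpy as np


def to_native(value):
    """Recursively convert numpy scalars/arrays to native Python values."""
    if isinstance(value, (np.ndarray, np.generic)):
        # tolist() converts both the container and every element to Python types
        return value.tolist()
    if isinstance(value, (list, tuple)):
        return [to_native(v) for v in value]
    return value


arr = np.array([1, 2, 3], dtype=np.int64)
print(type(list(arr)[0]).__name__)       # int64: list() keeps numpy scalars
print(type(to_native(arr)[0]).__name__)  # int
```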
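Batch upload performance
For large datasets (millions of rows or more), write in batches (for example, 5,000 records per batch) instead of building every record in memory before a single `writer.write()` call:

```python
BATCH_SIZE = 5000
with table.open_writer() as writer:
    batch = []
    for idx, row in df.iterrows():
        # [...] stands for the same 14 field values built in the full script above
        batch.append(table.new_record([...]))
        if len(batch) >= BATCH_SIZE:
            writer.write(batch)
            batch = []
    # Flush the final, partially filled batch
    if batch:
        writer.write(batch)
```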
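The same batching can be factored into a reusable generator so that the loop body stays free of bookkeeping. In this sketch, `iter_batches` and `upload_in_batches` are hypothetical helpers; only `open_writer`, `new_record`, and `writer.write` are the PyODPS calls already used above. Because `rows` may be a generator, at most one batch of records is held in memory at a time.

```python
from itertools import islice


def iter_batches(iterable, batch_size):
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def upload_in_batches(table, rows, batch_size=5000):
    """Write rows (each a list of native Python values in table column order) batch by batch.

    `table` is the object returned by odps.get_table(); returns the number of rows written.
    """
    total = 0
    with table.open_writer() as writer:
        for batch in iter_batches(rows, batch_size):
            writer.write([table.new_record(values) for values in batch])
            total += len(batch)
    return total
```

Usage: build `rows` as a generator over `df.itertuples(index=False)` that emits the 14 values per row (with `.tolist()` on array fields), then call `upload_in_batches(table, rows)`. `itertuples` is typically much faster than `iterrows` because it does not build a Series per row.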