kuairand-27k的Parquet 数据导出与上传到 MaxCompute 完整流程(hstu格式)

简介: 本文详解如何将本地kuairand-27k(1257行×14列)Parquet推荐数据集,经探查、类型映射(int64→bigint,list→array<bigint>),通过pyodps上传至阿里云MaxCompute表,含完整环境配置、建表与批量上传代码。

概述

本文介绍如何将本地 Parquet 文件(kuairand-27k 推荐系统数据集)导出并上传到阿里云 MaxCompute 表,包含:数据探查、类型映射、建表、上传的完整代码。


1. 环境准备

依赖安装

pip install pandas pyarrow pyodps
  • pandas + pyarrow:读取 Parquet 文件
  • pyodps:阿里云 MaxCompute Python SDK

凭证配置

通过环境变量传入 AccessKey,避免硬编码:

export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"

2. 数据探查:读取 Parquet 文件

在上传之前,先了解 Parquet 文件的数据结构:

import pandas as pd

df = pd.read_parquet("kuairand-27k-train-0.parquet")

print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"\n列名列表:")
print(df.columns.tolist())
print(f"\n数据类型:")
print(df.dtypes)

# 逐列查看前5条数据
for col in df.columns:
    print(f"\n--- {col} (dtype: {df[col].dtype}) ---")
    for i in range(min(5, len(df))):
        val = df[col].iloc[i]
        if isinstance(val, (list,)):
            print(f"  [{i}] len={len(val)}, 前10个值: {val[:10]}")
        else:
            print(f"  [{i}] {val}")

探查结果

本数据集共 1257 行、14 列,结构如下:

列名 Python 类型 说明
user_id int64 用户 ID
user_active_degree int64 用户活跃度
follow_user_num_range int64 关注人数区间
fans_user_num_range int64 粉丝人数区间
friend_user_num_range int64 好友人数区间
register_days_range int64 注册天数区间
video_id list(int) 用户历史交互视频序列
action_timestamp list(int) 行为时间戳序列
action_weight list(int) 行为权重序列(bitmask 编码)
watch_time list(int) 观看时长序列
item_video_id list(int) 候选视频 ID 序列
item_action_weight list(int) 候选视频行为标签
item_target_watchtime list(int) 候选视频目标观看时长
item_query_time list(int) 候选请求时间戳

3. 类型映射:Parquet → MaxCompute

Parquet/Python 类型 MaxCompute 类型
int64(标量) bigint
list(int)(数组) array<bigint>

4. 完整上传脚本

#!/usr/bin/env python3
"""
将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train
"""

import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column

# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "http://service.cn.maxcompute.aliyun.com/api"

# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")

# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"

# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
    print(f"表 {TABLE_NAME} 已存在,正在删除...")
    odps.delete_table(TABLE_NAME)
    print(f"表 {TABLE_NAME} 已删除")

# 定义表 schema
columns = [
    # 用户侧标量字段 (bigint)
    Column(name="user_id", type="bigint"),
    Column(name="user_active_degree", type="bigint"),
    Column(name="follow_user_num_range", type="bigint"),
    Column(name="fans_user_num_range", type="bigint"),
    Column(name="friend_user_num_range", type="bigint"),
    Column(name="register_days_range", type="bigint"),
    # 历史序列字段 (array<bigint>)
    Column(name="video_id", type="array<bigint>"),
    Column(name="action_timestamp", type="array<bigint>"),
    Column(name="action_weight", type="array<bigint>"),
    Column(name="watch_time", type="array<bigint>"),
    # 候选物料字段 (array<bigint>)
    Column(name="item_video_id", type="array<bigint>"),
    Column(name="item_action_weight", type="array<bigint>"),
    Column(name="item_target_watchtime", type="array<bigint>"),
    Column(name="item_query_time", type="array<bigint>"),
]

schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")

# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")

# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = [
    "video_id", "action_timestamp", "action_weight", "watch_time",
    "item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"
]
for col in array_cols:
    df[col] = df[col].apply(
        lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else [])
    )

# 标量列确保为 Python int
scalar_cols = [
    "user_id", "user_active_degree", "follow_user_num_range",
    "fans_user_num_range", "friend_user_num_range", "register_days_range"
]
for col in scalar_cols:
    df[col] = df[col].astype(int)

# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")

with table.open_writer() as writer:
    records = []
    for idx, row in df.iterrows():
        record = [
            int(row["user_id"]),
            int(row["user_active_degree"]),
            int(row["follow_user_num_range"]),
            int(row["fans_user_num_range"]),
            int(row["friend_user_num_range"]),
            int(row["register_days_range"]),
            list(row["video_id"]),
            list(row["action_timestamp"]),
            list(row["action_weight"]),
            list(row["watch_time"]),
            list(row["item_video_id"]),
            list(row["item_action_weight"]),
            list(row["item_target_watchtime"]),
            list(row["item_query_time"]),
        ]
        records.append(table.new_record(record))

        if (idx + 1) % 100 == 0:
            print(f"  已处理 {idx + 1}/{len(df)} 行")

    writer.write(records)

print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")

5. 执行

source env.sh  # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py

预期输出:

project_name: pairec
endpoint: http://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
  已处理 100/1257 行
  ...
  已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train

6. 常见问题与注意事项

Endpoint 与 Project 不匹配

MaxCompute 的 Project 绑定到特定 Region 的 Endpoint。如果报 Project not found,需确认 Project 所在的 Region 并切换对应 Endpoint:

Endpoint 适用场景
service.cn.maxcompute.aliyun.com 公网(华东2)

AccessKey 与 Endpoint 网络域不匹配

公网 AccessKey 只能用于公网 Endpoint,内网 AccessKey 只能用于内网 Endpoint,混用会报 AccessKeyIdNotFound

PyODPS 要求原生 Python 类型

open_writer() 写入数据时,不支持 numpy 的 int64ndarray 类型,需显式转换为 Python 原生的 intlist,否则会抛出类型错误。

批量上传性能

对于大数据量(百万行以上),建议分批写入(如每批 5000 条),避免单次 writer.write() 内存过大:

BATCH_SIZE = 5000
with table.open_writer() as writer:
    batch = []
    for idx, row in df.iterrows():
        batch.append(table.new_record([...]))
        if len(batch) >= BATCH_SIZE:
            writer.write(batch)
            batch = []
    if batch:
        writer.write(batch)
相关文章
|
19天前
|
人工智能 自然语言处理 文字识别
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
Qwen3.7-Max是阿里云百炼面向智能体时代推出的新一代旗舰模型,对标GPT-5.5、Claude Opus 4.7等闭源旗舰。该模型支持百万级token上下文窗口,具备顶级推理能力、多模态搜索与视觉理解增强、流式输出低延迟响应等核心优势,覆盖编程、办公、长周期自主执行等复杂场景。同时支持OpenAI接口兼容,便于系统快速迁移。用户可通过Token Plan团队或节省计划等订阅方式灵活调用,适合企业级高要求场景使用。
7161 30
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
|
4天前
|
数据采集 人工智能 前端开发
让 Coding Agent 从黑盒到透明:阿里云 Agent 观测审计数据采集实践
AI Agent 规模化落地带来执行黑盒、行为难追溯、成本难度量三大难题。阿里云基于 OTel 标准,面向 Coding Agent、个人通用助理和框架型 Agent,推出 LoongSuite Pilot、插件及探针等无侵入采集方案,让 Agent 实现可看见、可分析、可审计、可治理。
623 140
|
4天前
|
人工智能 弹性计算 运维
阿里云发布堡垒机智能运维Agent,运维交互进入自然语言新时代
支持自然语言运维,提升效率与安全双保障。
1158 1
|
11天前
|
人工智能 安全 定位技术
CodeGraph深度解析 让Claude Code工具调用直降七成的核心原理与实操教程
如今以Claude Code为代表的AI编程智能体已经成为开发者日常编码、项目重构、漏洞修复的必备工具。但在长期使用过程中,几乎所有开发者都会遇到同一个明显痛点:AI虽然具备强大的代码生成与分析能力,却常常陷入盲目探索的循环中。
1219 2
|
14天前
|
存储 定位技术 数据库
CodeGraph 如何让 Claude Code减少 7 成工具调用?
CodeGraph 为 Coding Agent 提供本地代码知识图谱,把函数、类、调用链和框架路由提前整理成“项目地图”,减少盲目搜索和文件读取。它不是新 Agent,而是上下文基础设施,让 Agent 更快找到正确代码路径,平均减少 7 成工具调用。
1296 3
|
11天前
|
人工智能 弹性计算 安全
阿里云618活动时间、活动入口、优惠活动详细解读
2026年阿里云618创新加速季已全面开启,作为年度力度最大的云产品促销活动,本次大促覆盖轻量应用服务器、ECS云服务器、GPU云服务器、数据库、AI算力、安全服务、CDN等全品类产品,推出5亿元算力补贴、新用户限时秒杀、普惠满减、企业专享、免费试用、云大使返佣等多重福利,个人开发者、中小企业、AI团队均可享受专属低价。本文将系统梳理2026年阿里云618活动的完整时间节点、官方参与入口、各类优惠细则、使用规则、热门产品推荐及实操代码,帮助用户精准参与、高效省钱,以最低成本完成上云部署。
1041 5
|
10天前
|
人工智能 自然语言处理 安全
Vibe Coding 实战:别盲目跟风,先分清 vibe coding 适合什么场景
本文系统总结vibe coding实战经验:明确其适用场景(原型、小工具、标准化模块),剖析5步落地流程(场景判定→结构化提示词→目录初始化→分模块生成→自动化校验),指出四大常见误区,并推荐适配工具Trae。强调“场景匹配+规则前置”是提效关键,避免盲目套用。
845 1
|
3天前
|
人工智能 运维 API
2026年阿里云百炼通义千问Qwen3.7-plus深度介绍 功能特性、使用优势及618大促订阅方案指南
大模型技术的普及,让AI能力逐步融入个人办公、内容创作、代码编写、企业运营、教育培训等各类场景。不同定位的模型对应不同使用需求,旗舰级模型性能强劲但使用成本偏高,轻量化模型价格低廉却难以胜任复杂任务,而介于两者之间的中端主力模型,凭借均衡的能力、亲民的定价、广泛的场景适配性,成为绝大多数个人用户、小型团队、中小企业的首选。
401 1

热门文章

最新文章