阿里云 Paimon + MaxCompute 极速体验

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: Paimon 和 MaxCompute 的对接经历了长期优化,解决了以往性能不足的问题。通过半年紧密合作,双方团队专门提升了 Paimon 在 MaxCompute 上的读写性能。主要改进包括:采用 Arrow 接口减少数据转换开销,内置 Paimon SDK 提升启动速度,实现原生读写能力,减少中间拷贝与转换,显著降低 CPU 开销与延迟。经过双十一实战验证,Paimon 表的读写速度已接近 MaxCompute 内表,远超传统外表。欢迎体验!

概述

Paimon 和 MaxCompute 的对接,很早就开始。虽然一直是可用状态,但是经常因为性能对比于内表或者其他外表不足,而导致客户在使用上十分艰难。为此,Paimon 团队和 MaxCompute 团队进行了半年的紧密合作,专门为提高 Paimon 在 MaxCompute 上的性能进行了专项优化研发。本文将从过去 MaxCompute 读写慢的原因,双方团队合作进行了哪些优化以及最后优化效果三个方面构文,叙述最新进展。


MaxCompute 外表架构升级(快)

MaxCompute 是纯 c++ 代码结构,为了跟 Hive 生态保持一定的兼容性,读写外表一般通过 Hive 的读写接口。此接口,通过 next 方法获取下一条消息数据。在大数据领域,绝大多数场景都是使用列存或行列混合方式来进行数据存储。因此,从 Paimon sdk 读取到的数据,再经由 MaxCompute 消费就会存在以下过程:

MaxCompute 调用 Paimon sdk 读取数据,数据存在 Paimon 的缓冲中,以列存的形式存储 --> Maxcompute 通过 "per row" 接口,一行一行地从 Paimon 缓存中读取数据 --> MaxCompute 再将数据由行式转化为列式进行向量化计算。数据流转上,从列式读取,转换成行,再转换成列,消耗了巨量的 CPU 资源。


为此,团队设计了新的基于 Arrow 的接口,使得列存数据能够方便地在 java 和 c++ 之间进行传递。从 Paimon 读取到的数据,只需要进过一次列式拷贝,到 Arrow 的数据结构中,然后直接通过 Arrow 的跨语言特性传递给 MaxCompute 进行使用即可。当然,这一步的前提是,不需要经过主键 merge。如果是 Paimon 主键表,依旧需要使用行的形式进行数据去重,再通过 Arrow 接口传递给 MaxCompute 使用。


小结:

  • 非主键表:可以直接 Arrow 化处理,一次列式拷贝即可,大幅节约 CPU 资源。
  • 主键表:仍需行级别去重,无法达到与非主键表完全一致的性能,但也在新的框架下有所提升。


新旧接口对比

image.png

除此之外,新接口实现了在 MaxCompute 上直接写 Paimon 外表。Paimon 通过实现 native writer,已经直接适配 MaxCompute 内部的 aliorc 格式。


Paimon sdk 内置 (更快)

过去,MaxCompute 内置了 hudi 等其他的 sdk,但是 MaxCompute 通过 plugin 机制置放 Paimon sdk (基于某些沟通没到位的问题)。导致,每次启动 MaxCompute 的 worker pod,都需要从远方拉一遍 Paimon sdk 的包,且通过 plugin load 的方式进行加载。同时,因为Paimon jar是用户自定义的jar包,从安全角度考虑必须以子进程+子容器方式被拉起和执行,然后由父进程主动与子进程通信并基于TCP/Domain Socket等方式完成数据交换,拉起进程的耗时受整个集群的CPU水位影响,数据交换方式也由内部调用变成了协议传输,大大增加了时间消耗。因此,每个有关 Paimon 的 task,都必须要有 30s 左右的跟任务完全无关的时间消耗。且随着数据量和集群压力的增加,这个消耗会更多(理论上应该保持不变,但是测下来会更久)。


因此与MC技术团队沟通后,MaxCompute 团队内置了 Paimon sdk。Paimon sdk 的发版会受到 MaxCompute 团队发版的限制,提升了性能,增强了用户体验。


native direct read && write (无敌快)

为了让 Paimon 的读性能媲美内表,我们进行了更近一步的读写优化。当 MaxCompute 方收到的 Paimon split 数据,不需要经过 Paimon sdk merge 的时候,直接利用 MaxCompute 自身读引擎进行数据读取。对于非主键表,直接媲美 MaxCompute 内表的读取速度和资源消耗。


同时,Paimon 侧设计了新的 Arrow 批量写接口。从 MaxCompute 获取到的 Arrow batch 数据,将直接通过 Paimon sdk 写入到 Paimon 表,无需经过任何行列转换或者数据拷贝。Paimon 通过设计新的 Paimon-jni 模块,直接在底层通过 c++ Arrow 写入 Arrow batch 数据,效果无异于 MaxCompute 直写。(此优化依旧只针对非主键表,主键表需要内存中 merge)


为了进一步加速 MaxCompute 的写,Paimon 侧还进行了异步 setup 优化(MaxCompute 异步调用 Paimon 初始化,同时进行数据准备),列式分拣动态分区等多种优化,进一步提升 MaxCompute 对于 Paimon 的读写性能。


这套双方团队的新架构,已经成功经过了淘宝天猫双十一的巨量流量验证,并且取得了巨大的成功。


性能测试

首先,我们测试在 MaxCompute 上对 Paimon 外表,hudi 外表,以及 MaxCompute 内表的读取结果。


环境构建

首先,在共有云上开通 dlf、vvp、MaxCompute 以及 oss 服务。为了构建测试环境,我们通过 vvp 创建 dlf catalog,利用 catalog 建表,并通过 vvp 流任务写入数据。

构建 catalog:

-- my-catalog 也可以改为自定义的英文名称CREATE CATALOG `my-catalog` WITH (  'type' = 'paimon',  'metastore' = 'dlf',  'warehouse' = '<warehouse>',  'dlf.catalog.id' = '<dlf.catalog.id>',  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',  'dlf.catalog.region' = '<dlf.catalog.region>',  'fs.oss.endpoint' = '<fs.oss.endpoint>',  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>');

建表(举例):

CREATE TABLE `dlf-lakehouse`.`my_db`.ods_sell_meta_stream (    pt INT,    `user_id` BIGINT,     sell_id BIGINT,     sell_time TIMESTAMP,     sell_name STRING,     sell_nick_name STRING,    sell_phone STRING,    sell_number STRING,    sell_position STRING) PARTITIONED BY (pt) WITH (    'file.format' = 'parquet');

利用 vvp etl 写入测试数据:

INSERT INTO `dlf-lakehouse`.`my_db`.ods_sell_meta_stream SELECT * FROM KafkaTable;

接着通过 odps 的 dlf 创建引导,开通和 dlf 打通的 odps 项目(需要绑定 odps 数据源到 DataWorks 项目开发中心), 具体操作可见 MaxCompute对应文档[1]。


读测试

对于 hudi 表,我们使用了 COPY_ON_WRITE 的写入方式,以达到最好的读取性能。hudi 表的 schema 如下(hudi 必须要定义 uudi 才能导入 dlf,导致配置了只进行 insert 操作):

CREATE TEMPORARY TABLE flink_hudi_tbl (    `uuid` STRING PRIMARY KEY NOT ENFORCED,    `ts` STRING,    pt INT,    `user_id` BIGINT,     sell_id BIGINT,     sell_time STRING,     sell_name STRING,     sell_nick_name STRING,    sell_phone STRING,    sell_number STRING,    sell_position STRING) WITH (  'connector' = 'hudi',   'oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',   'accessKeyId' = 'xx',   'accessKeySecret' = 'xx',   'path' = 'oss://<path>',  'table.type' = 'COPY_ON_WRITE',  'hive_sync.enable' = 'true',  'hive_sync.mode' = 'hms',  'hive_sync.db' = 'flink_hudi',  'hive_sync.table' = 'flink_hudi_tbl',  'dlf.catalog.region' = 'cn-hangzhou',  'dlf.catalog.endpoint' = 'dlf-share.cn-hangzhou.aliyuncs.com',  'compaction.async.enabled' = 'false',  'write.operation' = 'insert',  'index.global.enabled' = 'false');

对于 Paimon 表,我们使用如下 schema 结构 (不需要任何参数):

CREATE TABLE `dlf-lakehouse`.`my_db`.ods_sell_meta_stream_write_none_native (    `uuid` STRING,    `ts` STRING,    pt INT,    `user_id` BIGINT,     sell_id BIGINT,     sell_time TIMESTAMP,     sell_name STRING,     sell_nick_name STRING,    sell_phone STRING,    sell_number STRING,    sell_position STRING);

表基本信息:

条数:66140055

每条数据大小 2k 以上


测试结果如下:

sql1

select max(sell_nick_name) from ods_sell_meta_stream_write_none_native;

image.png

新老 Paimon 接口在此 sql 下性能对比:

image.png

sql2(对比双字段带过滤条件查询)

select user_id, max(sell_nick_name)  from ods_sell_meta_stream_write_none_native where user_id > 100 and user_id < 200 group by user_id;


image.png


写测试

因为 MaxCompute 不支持写 hudi,因此仅对比 MaxCompute 写 Paimon 和写内表。以下为两次静态分区写的平均消耗对比。

表特征:

1.2亿数据量,每条数据 3k 大小左右

sql:

INSERT INTO ods_sell_meta_stream_write_none_native3 partition (pt=1) select user_id,sell_id,sell_time,sell_name,sell_nick_name,sell_phone,sell_number,sell_position from ods_sell_meta_stream_write_none_native where pt = 1;

image.png

性能实测下来,接近 MaxCompute 内表。


相比于 MaxCompute 上 parquet、orc 外表

sql:

select user_id, max(sell_nick_name)  from ods_sell_meta_stream_write_none_native where user_id >

image.png

sql:

select max(sell_nick_name) from ods_sell_meta_stream_write_none_native;

image.png

sql:

INSERT INTO <target> partition (pt=1) select user_id,sell_id,sell_time,sell_name,sell_nic

image.png

在当前版本,MaxCompute 对 Paimon 表的读写速度,快于对 parquet、orc 外表的直接读写速度,且读对比下, Paimon 比其他外表快了 10 倍以上。


总结

通过 Paimon 和 MaxCompute 团队共同努力,我们有以下功能或实战亮点:

  • Paimon 在 MaxCompute 上实现了原生读写能力。
  • 性能接近 MaxCompute 内表,多倍于传统外表。
  • 减少中间拷贝与转换,显著降低 CPU 开销与延迟。
  • 已通过双十一实战验证,稳定可靠

欢迎大家使用和体验!



作者简介 PROFILE

叶俊豪 (仟弋)

Apache Paimon Committer, 阿里云表存储团队核心研发

王烨 (萌豆)

阿里云 MaxCompute 团队核心研发


[1]. MaxCompute对应文档

https://help.aliyun.com/zh/maxcompute/user-guide/build-and-manage?spm=a2c4g.11186623.help-menu-27797.d_2_5_2_1_0.71636270AQTZEj#section-5mm-3fw-vsm






相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
13天前
|
存储 人工智能 数据管理
|
6天前
|
存储 人工智能 数据管理
媒体声音|专访阿里云数据库周文超博士:AI就绪的智能数据平台设计思路
在生成式AI的浪潮中,数据的重要性日益凸显。大模型在实际业务场景的落地过程中,必须有海量数据的支撑:经过训练、推理和分析等一系列复杂的数据处理过程,才能最终产生业务价值。事实上,大模型本身就是数据处理后的产物,以数据驱动的决策与创新需要通过更智能的平台解决数据多模处理、实时分析等问题,这正是以阿里云为代表的企业推动 “Data+AI”融合战略的核心动因。
|
12天前
|
机器学习/深度学习 分布式计算 数据挖掘
MaxFrame 性能评测:阿里云MaxCompute上的分布式Pandas引擎
MaxFrame是一款兼容Pandas API的分布式数据分析工具,基于MaxCompute平台,极大提升了大规模数据处理效率。其核心优势在于结合了Pandas的易用性和MaxCompute的分布式计算能力,无需学习新编程模型即可处理海量数据。性能测试显示,在涉及`groupby`和`merge`等复杂操作时,MaxFrame相比本地Pandas有显著性能提升,最高可达9倍。适用于大规模数据分析、数据清洗、预处理及机器学习特征工程等场景。尽管存在网络延迟和资源消耗等问题,MaxFrame仍是处理TB级甚至PB级数据的理想选择。
38 4
|
20天前
|
SQL DataWorks 数据可视化
阿里云DataWorks评测:大数据开发治理平台的卓越表现
阿里云DataWorks是一款集数据集成、开发、分析与管理于一体的大数据平台,支持多种数据源无缝整合,提供可视化ETL工具和灵活的任务调度机制。其内置的安全体系和丰富的插件生态,确保了数据处理的高效性和安全性。通过实际测试,DataWorks展现了强大的计算能力和稳定性,适用于中小企业快速搭建稳定高效的BI系统。未来,DataWorks将继续优化功能,降低使用门槛,并推出更多灵活的定价方案,助力企业实现数据价值最大化。
|
20天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
57 2
|
2月前
|
存储 分布式计算 大数据
【赵渝强老师】阿里云大数据生态圈体系
阿里云大数据计算服务MaxCompute(原ODPS)提供大规模数据存储与计算,支持离线批处理。针对实时计算需求,阿里云推出Flink版。此外,阿里云还提供数据存储服务如OSS、Table Store、RDS和DRDS,以及数据分析平台DataWorks、Quick BI和机器学习平台PAI,构建全面的大数据生态系统。
85 18
|
2月前
|
人工智能 Cloud Native 数据管理
媒体声音|重磅升级,阿里云发布首个“Data+AI”驱动的一站式多模数据平台
在2024云栖大会上,阿里云瑶池数据库发布了首个一站式多模数据管理平台DMS:OneMeta+OneOps。该平台由Data+AI驱动,兼容40余种数据源,实现跨云数据库、数据仓库、数据湖的统一数据治理,帮助用户高效提取和分析元数据,提升业务决策效率10倍。DMS已服务超10万企业客户,降低数据管理成本高达90%。
172 19
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
4月前
|
人工智能 分布式计算 DataWorks
连续四年!阿里云领跑中国公有云大数据平台
近日,国际数据公司(IDC)发布《中国大数据平台市场份额,2023:数智融合时代的真正到来》报告——2023年中国大数据平台公有云服务市场规模达72.2亿元人民币,其中阿里巴巴市场份额保持领先,占比达40.2%,连续四年排名第一。
278 12
|
4月前
|
人工智能 Cloud Native 数据管理
重磅升级,阿里云发布首个“Data+AI”驱动的一站式多模数据平台
阿里云发布首个AI多模数据管理平台DMS,助力业务决策提效10倍
572 17

相关产品

  • 云原生大数据计算服务 MaxCompute