Apache Flink 说道系列 - PyFlink集成Pandas(1+1 != 2)

简介: # 开篇说道 说道聊什么,聊阿里人熟知的“因为相信,所以简单”!这是每个人内心所神往的日常工作生活!这看似 简单的言语,透彻的道理,虽被大众所认可,但人们对其执行的能力却大相径庭。“因为相信,所以简单” 不是对我们的约束,而是为我们寻求快乐指明了方向...永远做别人成功路上的基石...当TA人踏上人生巅峰,基石自具人生高度! ![image.png](https://ata2-img

开篇说道

说道聊什么,聊阿里人熟知的“因为相信,所以简单”!这是每个人内心所神往的日常工作生活!这看似 简单的言语,透彻的道理,虽被大众所认可,但人们对其执行的能力却大相径庭。“因为相信,所以简单” 不是对我们的约束,而是为我们寻求快乐指明了方向...永远做别人成功路上的基石...当TA人踏上人生巅峰,基石自具人生高度!


image.png

Python已经发展成为许多数据处理领域中最重要的编程语言之一。Python的流行程度如此之大,以至于它几乎成了数据科学家默认的数据处理语言。除此之外,还有大量基于Python的数据处理工具,如NumPy、Pandas和scikitlearn,这些工具由于其灵活性或强大的功能而获得了广泛的普及。

Python has evolved into one of the most important programming languages for many fields of data processing. So big has been Python’s popularity, that it has pretty much become the default data processing language for data scientists. On top of that, there is a plethora of Python-based data processing tools such as NumPy, Pandas, and Scikit-learn that have gained additional popularity due to their flexibility or powerful functionalities.


image.png

Pic source: VanderPlas 2017, slide 52[1]

为了满足用户的需求,Flink社区希望更好地利用这些工具。为此,Flink社区花了很大的努力将Pandas与最新的Flink1.11中的PyFlink进行集成。增加的新特性包括对Pandas UDF的支持以及Pandas DataFrame和Table之间的转换。Pandas UDF不仅大大提高了Python UDF的执行性能,而且使用户更方便地利用Python UDF中的Pandas和NumPy等库。此外,为Pandas DataFrame和Table之间的转换提供支持,用户可以无缝地切换处理引擎,而不需要中间连接器。在本文的剩余部分中,我们将通过一个逐步的示例介绍这些功能是如何工作的。

In an effort to meet the user needs and demands, the Flink community hopes to leverage and make better use of these tools. Along this direction, the Flink community put some great effort in integrating Pandas into PyFlink with the latest Flink version 1.11. Some of the added features include support for Pandas UDF and the conversion between Pandas DataFrame and Table. Pandas UDF not only greatly improve the execution performance of Python UDF, but also make it more convenient for users to leverage libraries such as Pandas and NumPy in Python UDF. Additionally, providing support for the conversion between Pandas DataFrame and Table enables users to switch processing engines seamlessly without the need for an intermediate connector. In the remainder of this article, we will introduce how these functionalities work and how to use them with a step-by-step example.

Note: Currently, only Scalar Pandas UDFs are supported in PyFlink.

Pandas UDF in Flink 1.11

在flink1.10中已经可以使用标量Python UDF了,正如在博客《一小时吃透PyFlink》描述的那样 标量Python udf基于三个主要步骤工作:

Using scalar Python UDF was already possible in Flink 1.10 as described in a previous article on the Flink blog. Scalar Python UDFs work based on three primary steps:

  • Java算子序列化数据成字节流发送到Python算子中;
  1. Java operator serializes one input row to bytes and sends them to the Python worker;

    • Python算子反序列化字节数据利用用户的UDF进行计算;
  2. Python worker deserializes the input row and evaluates the Python UDF with it;

    • 最后Python算子将计算结果序列化发送回到Java 算子
  3. resulting row is serialized and sent back to the Java operator

虽然在PyFlink中提供对Python udf的支持极大地改善了用户体验,但它也有一些缺点,即导致:

While providing support for Python UDFs in PyFlink greatly improved the user experience, it had some drawbacks, namely resulting in:

  • 序列化反序列化成本很高
    High serialization/deserialization overhead
  • 在利用数据科学家使用的流行Python库(如Pandas或NumPy)时遇到困难,这些库提供了高性能的数据结构和函数,但是普通的UDF无法支持。
    Difficulty when leveraging popular Python libraries used by data scientists — such as Pandas or NumPy — that provide high-performance data structure and functions.

引入Pandas UDF就是为了解决这些缺点。对于Pandas UDF,在JVM和PVM之间以 Columnar 格式(Arrow memory format)传输一批行。这批行将被转换为Pandas系列的集合,并将其传输到Pandas UDF,然后利用流行的Python库(如Pandas、NumPy等)来实现Python UDF。

The introduction of Pandas UDF is used to address these drawbacks. For Pandas UDF, a batch of rows is transferred between the JVM and Python VM in a columnar format (Arrow memory format). The batch of rows will be converted into a collection of Pandas Series and will be transferred to the Pandas UDF to then leverage popular Python libraries (such as Pandas, Numpy, etc.) for the Python UDF implementation.


image.png


与普通Python UDF相比,Vectorized UDF的性能通常要高得多,因为通过 Apache Arrow序列化/反序列化开销被最小化了。 利用Pandas.Series作为输入/输出数据结构,我们可以充分利用Pandas and NumPy功能。与NumPy生态库的集成将成为并行化机器学习和其他大规模、分布式数据科学计算的流行解决方案(如特征工程、分布式模型应用)。

The performance of vectorized UDFs is usually much higher when compared to the normal Python UDF, as the serialization/deserialization overhead is minimized by falling back to Apache Arrow, while handling Pandas.Series as input/output allows us to take full advantage of the Pandas and NumPy libraries, making it a popular solution to parallelize Machine Learning and other large-scale, distributed data science workloads (e.g. feature engineering, distributed model application).

Conversion between PyFlink Table and Pandas DataFrame

Pandas DataFrame是Python社区中处理表格数据的事实标准,而PyFlink Table是Flink用Python语言表示的表格数据。提供PyFlink Table和Pandas DataFrame之间的转换,将允许在Python中处理数据时在PyFlink和Pandas之间无缝切换。用户可以使用一个执行引擎处理数据,并轻松切换到另一个执行引擎。例如,如果用户手头已经有一个Pandas DataFrame,并且想要执行一些昂贵的转换,他们可以很容易地将其转换为PyFlink Table,并利用Flink引擎进行分布式计算。另一方面,用户还可以将PyFlink Table 转换为Pandas DataFrame,然后利用Pandas生态系统提供的丰富功能进行转换计算。

Pandas DataFrame is the de-facto standard for working with tabular data in the Python community while PyFlink Table is Flink’s representation of the tabular data in Python language. Enabling the conversion between PyFlink Table and Pandas DataFrame allows switching between PyFlink and Pandas seamlessly when processing data in Python. Users can process data using one execution engine and switch to a different one effortlessly. For example, in case users already have a Pandas DataFrame at hand and want to perform some expensive transformation, they can easily convert it to a PyFlink Table and leverage the power of the Flink engine. On the other hand, users can also convert a PyFlink Table to a Pandas DataFrame and perform the same transformation with the rich functionalities provided by the Pandas ecosystem.

Examples

在Apache Flink中使用Python需要安装PyFlink。PyFlink可以通过PyPI获得,并且可以使用pip轻松安装:

Using Python in Apache Flink requires installing PyFlink. PyFlink is available through PyPI and can be easily installed using pip:

Check Python Version/检查Python版本

$ python --version
Python 3.7.6

Note: Python 3.5 or higher is required to install and run PyFlink.

如果你目前不是3.5+,我们也可以利用virtualenv:

$ pip install virtualenv
$ virtualenv --python /usr/local/bin/python3 py37
$ source py37/bin/activate

然后进行PyFlink安装:

$ python -m pip install apache-flink

Using Pandas UDF

Pandas UDFs 使用 pandas.Series 作为输入并返回与输出入长度相同的pandas.Series。Pandas UDFs的应用和普通UDF一样。只要将自定义项标记为Pandas,也就是在UDF decorator中添加额外的参数UDF_type=“Pandas”:

Pandas UDFs take pandas.Series as the input and return a pandas.Series of the same length as the output. Pandas UDFs can be used at the exact same place where non-Pandas functions are currently being utilized. To mark a UDF as a Pandas UDF, you only need to add an extra parameter udf_type=”pandas” in the udf decorator:


@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def interpolate(id, temperature):
    # takes id: pandas.Series and temperature: pandas.Series as input
    df = pd.DataFrame({'id': id, 'temperature': temperature})

    # use interpolate() to interpolate the missing temperature
    interpolated_df = df.groupby('id').apply(
        lambda group: group.interpolate(limit_direction='both'))

    # output temperature: pandas.Series
    return interpolated_df['temperature']

上面的Pandas UDF使用Pandas的dataframe.interpolate()为每个设备id插入缺失的温度数据的功能。这是一种常见的IOT场景需求,每个设备需要报告其设备温度,但由于各种原因,温度数据可能为空。下面是如何在PyFlink中使用Pandas UDF的完整示例。

The Pandas UDF above uses the Pandas dataframe.interpolate() function to interpolate the missing temperature data for each equipment id. This is a common IoT scenario whereby each equipment/device reports it’s id and temperature to be analyzed, but the temperature field may be null due to various reasons. Below is a complete example of how to use the Pandas UDF in PyFlink.


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
import pandas as pd

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def interpolate(id, temperature):
    # takes id: pandas.Series and temperature: pandas.Series as input
    df = pd.DataFrame({'id': id, 'temperature': temperature})

    # use interpolate() to interpolate the missing temperature
    interpolated_df = df.groupby('id').apply(
        lambda group: group.interpolate(limit_direction='both'))

    # output temperature: pandas.Series
    return interpolated_df['temperature']

t_env.register_function("interpolate", interpolate)

my_source_ddl = """
    create table mySource (
        id INT,
        temperature FLOAT 
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        id INT,
        temperature FLOAT 
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/output'
    )
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

t_env.from_path('mySource')\
    .select("id, interpolate(id, temperature) as temperature") \
    .insert_into('mySink')

t_env.execute("pandas_udf_demo")

To submit the job, you:

  • Firstly, you need to prepare the input data in the “/tmp/input” file. For example,

$ echo -e  "1,98.0\n1,\n1,100.0\n2,99.0" > /tmp/input
  • Next, you can run this example on the command line,

$ python pandas_udf_demo.py

该命令在本地集群中构建并运行Python Table API程序。您还可以使用不同的命令行将Python Table API程序提交到远程集群,请参阅[此处](https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples)。

The command builds and runs the Python Table API program in a local mini-cluster. You can also submit the Python Table API program to a remote cluster using different command lines, see more details here.

  • Finally, you can see the execution result on the command line. Here you will find that all the temperature data with an empty value has been interpolated:

$  cat /tmp/output
1,98.0
1,99.0
1,100.0
2,99.0

Conversion between PyFlink Table and Pandas DataFrame

我们可以使用 from_pandas() 方法从 Pandas DataFrame创建PyFlink Ta ble
,也可以使用toPandas()方法将PyFlink Table 转换为Pandas DataFrame。

You can use the from_pandas() method to create a PyFlink Table from a Pandas DataFrame or use the to_pandas() method to convert a PyFlink Table to a Pandas DataFrame.


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
import pandas as pd
import numpy as np

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
print(pdf)

Conclusion & Upcoming work

本文介绍了flink1.11对Pandas的集成,包括Pandas UDF和table与Pandas之间的转换。实际上,在最新的Apache Flink版本中,PyFlink中添加了许多实用的特性,比如支持User-defined Table functions和Python UDF 的用户定义Metrics。此外,从flink1.11开始,您可以使用Cython支持构建PyFlink,并对Python UDF进行“Cythonize”,从而显著提高代码执行速度(与flink1.10中的Python UDF相比,提高了30倍)。
In this article, we introduce the integration of Pandas in Flink 1.11, including Pandas UDF and the conversion between table and Pandas. In fact, in the latest Apache Flink release, there are many excellent features added to PyFlink, such as support of User-defined Table functions and User-defined Metrics for Python UDFs. What’s more, from Flink 1.11, you can build PyFlink with Cython support and “Cythonize” your Python UDFs to substantially improve code execution speed (up to 30x faster, compared to Python UDFs in Flink 1.10).

image.png

社区未来的工作将集中在添加更多的特性和在后续版本中带来更多的优化。这样的优化和添加包括Python DataStream API和与Python生态系统的更多集成,例如在Flink中支持分布式Pandas。请继续关注即将发布的更多更新!

Future work by the community will focus on adding more features and bringing additional optimizations with follow up releases. Such optimizations and additions include a Python DataStream API and more integration with the Python ecosystem, such as support for distributed Pandas in Flink. Stay tuned for more information and updates with the upcoming releases!


shiming.gif

小结

很喜欢,也要坚持执行 逍遥子 的那句经典:
从“求同存异” 到 “求同尊异”,是寻找同路人必经的过程 Finding Our Fellow Travelers.

=======PyFlink团队欢迎你。。。==========

团队介绍:
阿里巴巴实时计算团队聚焦在 Apache Flink 和周边生态上,工作覆盖 Flink 所有相关领域,围绕 Flink 打造通用的实时计算解决方案。团队服务于阿里经济体内部所有 BU,阿里云外部客户,以及 Flink 社区的用户。
Flink 生态团队是阿里实时计算部的核心团队之一,主要负责从应用场景的角度出发,自上而下构建 Flink 完整生态(例如 Flink AI Flow, TF on Flink,多语言支持PyFlink)并改进和完善 Flink 核心引擎功能。

工作方向:
Flink AI,Flink Python/R/Go等语言支持以及Python生态库与Flink的集成,如分布式Pandas的开发!详见这里

工作内容:

  1. 透彻理解 Flink 的应用场景,构建合理的技术方案。包括分析、抽象和改进 Flink 引擎核心功能和 API,设计实现围绕 Flink 的周边服务。
  2. 通过 Flink 引擎和生态技术支持服务集团内外的客户。

经验能力要求:

  1. 大数据 / 工程项目(分布式系统、Python生态库开发等)经验,开源项目 Committer / PMC 优先。
  2. 独立设计开发大中型系统框架、模块和服务的经验,对性能、接口、可扩展性、兼容性,高可用性等方面有较深刻的理解。
  3. 有以下项目开发经验者优先: Flink, Spark,Kafka,Pulsar,NumPy,Pandas,PyTorch,etc.
  4. 沟通能力强,具有国际社区开发经验优先。
  5. 技术上有追求极致的精神。

欢迎私聊,钉钉:金竹, 微信:18158190225

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
442 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
343 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
3月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1314 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
4月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
500 6
|
4月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
435 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
4月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
597 9
Apache Flink:从实时数据分析到实时AI
|
4月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
538 0
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
795 33
The Past, Present and Future of Apache Flink
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1603 13
Apache Flink 2.0-preview released

热门文章

最新文章

推荐镜像

更多