Apache Flink 2.2.0: 推动实时数据与人工智能融合,赋能AI时代的流处理

简介: Apache Flink 2.2.0 发布,增强AI能力,支持ML_PREDICT与VECTOR_SEARCH,提升物化表、批处理及PyFlink性能,优化连接器与调度,助力实时智能数据处理。

Apache Flink PMC 很高兴地宣布 Apache Flink 2.2.0 版本发布了。Flink 2.2.0 版本进一步增强了 AI 函数 和 向量检索功能,改进了物化表和连接器框架,并优化了批处理和 PyFlink 支持。Flink 2.2.0 版本总共由来自全球的 73 位贡献者参与,累计推进了 9 个 FLIP(Flink 重要改进提案),完成了 220 多项缺陷修复和改进。


Flink 2.2.0 版本无缝集成实时数据处理与人工智能,开启了人工智能时代。该版本增强了用于大规模语言模型推理的 ML_PREDICT 和用于实时向量搜索的 VECTOR_SEARCH,从而增强了流式人工智能应用的能力。重点功能包括:物化表增强、Delta Join 优化、均衡任务调度和更多连接器优化(包括限流框架和均匀分片),显著提升了处理性能、可扩展性和可靠性,为构建智能、低延迟的数据管道奠定了坚实的基础。我们衷心感谢所有贡献者的宝贵支持!


接下来让我们深入了解 Flink 2.2.0 版本的重点内容。


Flink SQL 增强和改进

实时AI函数

从 Flink 2.1 版本起,Apache Flink 通过 Flink SQL 中的 ML_PREDICT 函数支持使用 LLM 功能,用户能够以简单高效的方式执行语义分析。在 Flink 2.2.0 版本中,Table API 支持了模型推理操作,允许将机器学习模型直接集成到数据处理中,并使用特定提供商(例如 OpenAI)的模型对数据进行预测处理。

使用示例:

  • 创建并使用模型
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to make predictions
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable, 
    ColumnList.of("text"), 
    Map.of("async", "true")
);

更多信息请参考:FLINK-38104[1]、FLIP-526[2]

向量搜索

Apache Flink 通过 ML_PREDICT 函数和大模型进行了无缝衔接,已在情感分析、实时问答系统等场景中得到技术验证。然而目前的架构仅允许 Flink 使用嵌入模型将非结构化文本数据转换为高维向量特征,然后将这些特征持久化到下游存储系统,缺乏对向量空间进行实时在线查询和相似性分析的能力。


Flink 2.2.0 提供了 VECTOR_SEARCH 函数,使用户能够直接在 Flink 中执行流式向量相似性搜索和实时上下文检索。


以下 SQL 语句为例:

-- Basic usage
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10
);

-- With configuration options
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10,
  MAP['async', 'true', 'timeout', '100s']
);

-- Using named parameters
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 10,
  CONFIG => MAP['async', 'true', 'timeout', '100s']
);

-- Searching with contant value
SELECT * 
FROM VECTOR_SEARCH(
  TABLE vector_table,
  ARRAY[10, 20],
  DESCRIPTOR(index_column),
  10,
);

更多信息请参考:FLINK-38422[3]、FLIP-540[4]、Vector Search[5]


物化表

物化表[6]是 Flink SQL 中引入的一种新的表类型,用于简化批处理和流式数据管道,提供一致的开发体验。创建物化表时,只需指定数据新鲜度和查询条件,引擎即可自动生成物化表的模式,并创建相应的数据管道以保证指定的数据新鲜度。


从 Flink 2.2.0 开始,FRESHNESS 不再是 CREATE MATERIALIZED TABLE 和 CREATE OR ALTER MATERIALIZED TABLE DDL 语句的必要组成部分。Flink 2.2.0 引入了一个新的 MaterializedTableEnricher 接口,该接口为自定义的默认逻辑提供了一个正式的扩展方式,允许高级用户实现“智能”的默认行为(例如,从上游表推断数据新鲜度)。


此外,用户可以使用 DISTRIBUTED BYDISTRIBUTED INTO 来支持物化表的分桶。用户还可以使用 SHOW MATERIALIZED TABLES 来展示所有物化表。

使用方式如下:

CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    DISTRIBUTED INTO 5 BUCKETS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
        ds
    FROM
     ...

更多信息请参考:FLINK-38532[7]、FLINK-38311[8]、FLIP-542[9]、FLIP-551[10]


SinkUpsertMaterializer V2

SinkUpsertMaterializer 是 Flink 中的一个算子,在乱序的变更事件发送到 upsert 接收器之前对其进行协调。在某些情况下,这个算子的性能会呈指数级下降。Flink 2.2.0 引入了一种针对此类情况优化的新版本实现。

更多信息请参考:FLINK-38459[11]、FLIP-544[12]


Delta Join

Apache Flink 2.1 版本引入了新的 Delta Join 算子,以缓解 regular join 中由于庞大状态带来的问题。通过双向查找 join 取代了 regular join 维护的大量状态,直接重用源表中的数据。


Flink 2.2.0 增加了对更多 SQL 模式转换为 Delta Join 的支持。Delta Join 现在支持在不使用 DELETE 操作的情况下应用 CDC 数据源,并允许在数据源之后进行投影和过滤操作。此外,Delta Join 还支持缓存,这有助于减少对外部存储的请求。


目前,Apache Fluss (Incubating) [13]源表可以用作 Delta Join 的源表,可以在 Fluss[14]相关文档查看对应表的定义方式和使用案例。


更多信息请参考:Delta Joins[15]、Delta Join in Fluss[16]

SQL类型

在 Flink 2.2 版本前,SQL 中定义的ROW类型(例如 SELECT CAST(f AS ROW<i NOT NULL>))会忽略 NOT NULL 约束。这虽然更符合 SQL 标准,但在处理嵌套数据时会导致许多类型不一致和晦涩难懂的错误信息。例如,这阻止了在计算列或join key中使用ROW类型。Flink 2.2.0 版本修改了该行为,考虑ROW的可空性。配置项 table.legacy-nested-row-nullability 允许在需要开启来恢复旧行为,建议更新之前忽略约束的已有查询。


Flink 2.2.0 转换字符串为 TIME 类型时会考虑正确的精度(0-3),将不正确的字符串转换为时间类型(例如,小时部分大于 24)现在会导致运行时异常。BINARY 和 VARBINARY 之间的类型转换现在会正确考虑长度。

更多信息请参考:FLINK-20539[17]、FLINK-38181[18]


使用 UniqueKeys 进行状态管理

Flink 2.2 版本对 StreamingMultiJoinOperator 进行了优化和变更,使用 UniqueKeys 而不是 UpsertKeys 来进行状态管理。该算子在 Flink 2.1 中以实验状态发布,后续会持续进行优化,这些优化可能会导致不兼容的变化。

更多信息请参考:FLINK-38209[19]


Runtime 新特性和改进

均衡任务调度

Flink 2.2.0 引入了一种均衡的任务调度策略,以实现任务管理器的任务负载均衡并减少作业瓶颈。

更多信息请参考:FLINK-31757[20]、FLIP-370[21]


增强HistoryServer工作历史保留策略

在 Flink 2.2.0 版本前,HistoryServer 仅支持基于数量的作业归档保留策略,这不足以满足需要基于时间保留或组合规则的场景。用户在 Flink 2.2.0 可以使用新的配置项 historyserver.archive.retained-ttl 并结合 historyserver.archive.retained-jobs 来满足更多场景需求。

更多信息请参考:FLINK-38229[22]、FLIP-490[23]


Metrics

自 Flink 2.2.0 版本起,用户可以为作业中使用的每个算子/转换分配自定义指标变量。这些变量随后会被指标报告器转换为标签,允许用户为特定运算符的指标添加标签。例如,您可以使用此功能来命名和区分数据源。

用户现在可以通过 traces.checkpoint.span-detail-level[24] 控制检查点 span 的详细级别。最高级别会报告每个任务和子任务的 span 树。报告的自定义 span 现在可以包含子 span。更多详情请参阅 Traces[25]

更多信息请参考:FLINK-38158[26]、FLINK-38353[27]

Connector 框架优化

Scan数据源限流功能

Flink 作业频繁地与外部系统交换数据,这会消耗网络带宽和 CPU 资源。当这些资源稀缺时,过于频繁地拉取数据可能会干扰其他工作负载,例如 Kafka/MySQL CDC 连接器。在 Flink 2.2 中,我们引入了 RateLimiter 接口,为Scan数据源提供请求速率限制,连接器开发人员可以将其与限流框架集成,以实现自己的限流策略。此功能仅在 DataStream API 中可用。

更多信息请参考:FLINK-38497[28]、FLIP-535[29]


支持均匀分片

SplitEnumerator 负责分配分片工作,但它无法了解这些分片的实际运行时状态或分布情况。这使得 SplitEnumerator 无法保证分片均匀分布,并且极易出现数据倾斜。从 Flink 2.2 开始,SplitEnumerator 获得了分片分布信息,并提供了在运行时均匀分配分片的能力。例如,此功能可用于解决 Kafka 连接器中的数据倾斜问题。

更多信息请参考:FLINK-38564[30]、FLIP-537[31]

其他改进

PyFlink

在 Flink 2.2 中,我们为 Python DataStream API 添加了异步函数支持。这使得 Python 用户能够在 Flink 作业中高效地查询外部服务,例如通常部署在独立 GPU 集群中的 LLM 服务等。


此外,我们还提供了全面的支持,以确保外部服务访问的稳定性。一方面,我们支持限制发送到外部服务的并发请求数量,以避免服务过载。另一方面,我们也添加了重试机制,以应对可能由网络抖动或其他瞬态问题导致的临时服务不可用情况。

以下是一个简单的使用示例:

from typing import List
from ollama import AsyncClient

from pyflink.common import Types, Time, Row
from pyflink.datastream import (
    StreamExecutionEnvironment,
    AsyncDataStream,
    AsyncFunction,
    RuntimeContext,
    CheckpointingMode,
)


class AsyncLLMRequest(AsyncFunction[Row, str]):

    def __init__(self, host, port):
        self._host = host
        self._port = port

    def open(self, runtime_context: RuntimeContext):
        self._client = AsyncClient(host='{}:{}'.format(self._host, self._port))

    async def async_invoke(self, value: Row) -> List[str]:
        message = {"role": "user", "content": value.question}
        question_id = value.id
        ollam_response = await self._client.chat(model="qwen3:4b", messages=[message])
        return [
            f"Question ID {question_id}: response: {ollam_response['message']['content']}"
        ]

    def timeout(self, value: Row) -> List[str]:
        # return a default value in case timeout
        return [f"Timeout for this question: {value.a}"]


def main(output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)
    ds = env.from_collection(
        [
            ("Who are you?", 0),
            ("Tell me a joke", 1),
            ("Tell me the result of comparing 0.8 and 0.11", 2),
        ],
        type_info=Types.ROW_NAMED(["question", "id"], [Types.STRING(), Types.INT()]),
    )

    result_stream = AsyncDataStream.unordered_wait(
        data_stream=ds,
        async_function=AsyncLLMRequest(),
        timeout=Time.seconds(100),
        capacity=1000,
        output_type=Types.STRING(),
    )

    # define the sink
    result_stream.print()

    # submit for execution
    env.execute()


if __name__ == "__main__":
    main(known_args.output)

更多信息请参考:FLINK-38190[32]

升级 commons-lang3 依赖到 3.18.0

将 commons-lang3 从 3.12.0 升级到 3.18.0 以解决 CVE-2025-48924。

更多信息请参考:FLINK-38193[33]

protobuf-java 从 3.x 升级到 4.32.1

Flink 2.2 从 protobuf-java 3.21.7(Protocol Buffers 版本 21)升级到 protobuf-java 4.32.1(对应 Protocol Buffers 版本 32)。此次升级实现了以下功能:

  • Protobuf 版本支持:完全支持 Protocol Buffers v27 及更高版本中引入的 edition = "2023"edition = "2024" 语法。版本提供了一种统一的方法,将 proto2 和 proto3 的功能与细粒度的特性控制相结合。
  • 改进 Proto3 字段存在性检查:更好地处理 proto3 中的可选字段,不再受限于旧版 protobuf 的限制,无需将 protobuf.read-default-values 设置为 true 来进行字段存在性检查。
  • 性能提升:利用了 11 个 Protocol Buffers 版本(版本 22-32)中的性能改进和错误修复。
  • 现代 Protobuf 特性:可访问更新的 protobuf 功能,包括 Edition 2024 特性和改进的运行时行为。

使用 proto2 和 proto3 .proto 文件的用户可以兼容使用,无需更改。

更多信息请参考:FLINK-38547[34]

升级注意事项

Flink 社区致力于确保版本升级过程尽可能顺畅。但某些变更可能需要用户在升级到 2.2 版本时,对程序的某些部分进行调整。请参阅发版说明[35]以获取升级过程中需要进行的调整和需要检查的全部注意事项。

相关链接:

[1] https://issues.apache.org/jira/browse/FLINK-38104

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-526%3A+Model+ML_PREDICT%2C+ML_EVALUATE+Table+API

[3] https://issues.apache.org/jira/browse/FLINK-38422

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-540%3A+Support+VECTOR_SEARCH+in+Flink+SQL

[5] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sql/queries/vector-search/

[6] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/materialized-table/overview/

[7] https://issues.apache.org/jira/browse/FLINK-38532

[8] https://issues.apache.org/jira/browse/FLINK-38311

[9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-542%3A+Make+materialized+table+DDL+consistent+with+regular+tables

[10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-551%3A+Make+FRESHNESS+Optional+for+Materialized+Tables

[11] https://issues.apache.org/jira/browse/FLINK-38459

[12] https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2

[13] https://fluss.apache.org/blog/fluss-open-source/

[14] https://fluss.apache.org/docs/engine-flink/delta-joins/

[15] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/tuning/#delta-joins

[16] https://fluss.apache.org/docs/engine-flink/delta-joins/

[17] https://issues.apache.org/jira/browse/FLINK-20539

[18] https://issues.apache.org/jira/browse/FLINK-38181

[19] https://issues.apache.org/jira/browse/FLINK-38209

[20] https://issues.apache.org/jira/browse/FLINK-38229

[21] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=332499857

[22] https://issues.apache.org/jira/browse/FLINK-38229

[23] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=332499857

[24] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/deployment/config/#traces-checkpoint-span-detail-level

[25] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/ops/traces/

[26] https://issues.apache.org/jira/browse/FLINK-38158

[27] https://issues.apache.org/jira/browse/FLINK-38353

[28] https://issues.apache.org/jira/browse/FLINK-38497

[29] https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

[30] https://issues.apache.org/jira/browse/FLINK-38564

[31] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480

[32] https://issues.apache.org/jira/browse/FLINK-38190

[33] https://issues.apache.org/jira/browse/FLINK-38193

[34] https://issues.apache.org/jira/browse/FLINK-38547

[35] https://nightlies.apache.org/flink/flink-docs-release-2.2/release-notes/flink-2.2/

相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
304 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8天前
|
运维 监控 前端开发
基于AI大模型的故障诊断与根因分析落地实现
本项目基于Dify平台构建多智能体协作的AIOps故障诊断系统,融合指标、日志、链路等多源数据,通过ReAct模式实现自动化根因分析(RCA),结合MCP工具调用与分层工作流,在钉钉/企业微信中以交互式报告辅助运维,显著降低MTTD/MTTR。
|
JavaScript 前端开发 UED
详解Vue——的双向数据绑定是如何实现的?
详解Vue——的双向数据绑定是如何实现的?
277 0
|
存储 缓存 算法
【软件设计师备考 专题 】主存-Cache存储系统的工作原理
【软件设计师备考 专题 】主存-Cache存储系统的工作原理
536 0
|
SQL Serverless 数据库
|
29天前
|
关系型数据库 分布式数据库 数据库
直播预告|备赛指南:2025 PolarDB数据库创新设计赛培训课程开讲啦!
【第2届PolarDB数据库创新设计赛】火热进行中!🔥 为助你高效备赛,特推系列培训课程:官方规则解读、往届获奖经验分享、全程进阶指导,新手老将皆可收获满满干货!🎯 点击了解详情👇
|
24天前
|
存储 消息中间件 关系型数据库
从 Snowflake 到 Apache Doris:Planet 实时分析成本直降 80%、查询加速 90 倍
Planet深耕支付与税务数字化近40年,服务全球零售、酒店与旅游行业。为应对数据增长挑战,其数据团队将数仓从Snowflake迁移至Apache Doris,实现月成本降低80%、查询性能提升最高90倍,并达成实时分析能力,构建了高效、低成本、可扩展的数据新架构,为业务发展奠定坚实基础。
126 0
从 Snowflake 到 Apache Doris:Planet 实时分析成本直降 80%、查询加速 90 倍
|
16天前
|
存储 SQL 缓存
Delta Join:为超大规模流处理实现计算与历史数据解耦
Delta Join(FLIP-486)是Flink流式Join的范式革新,通过将计算与历史数据解耦,避免传统Join因存储全量状态导致的资源爆炸。它采用无状态查询机制,按需从Fluss或Paimon等外部存储获取数据,显著降低状态大小、Checkpoint时间及恢复成本。实测中消除50TB状态,资源消耗降10倍,CPU内存节省超80%,作业恢复提速87%。兼容标准SQL,自动优化转换,适用于高基数流式关联、实时审计等场景,标志着大规模流处理进入高效稳定新阶段。
|
4月前
|
存储 SQL 关系型数据库
RDS DuckDB技术解析一:当 MySQL遇见列式存储引擎
RDS MySQL DuckDB分析实例以​列式存储与向量化计算​为核心,实现​复杂分析查询性能百倍跃升​,为企业在海量数据规模场景下提供​实时分析能力​,加速企业数据驱动型决策效能。​​
|
9月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
1078 1
Apache Flink 2.0.0: 实时数据处理的新纪元