Apache Flink 2.2.0: 推动实时数据与人工智能融合,赋能AI时代的流处理

简介: Apache Flink 2.2.0 发布!新增 ML_PREDICT 与 VECTOR_SEARCH 实时 AI 函数,增强物化表、Delta Join 及连接器能力,优化批处理与 PyFlink。73 位贡献者共建,9 大 FLIP,220+ 修复改进,助力智能低延迟数据管道。

Apache Flink PMC 很高兴地宣布 Apache Flink 2.2.0 版本发布了。Flink 2.2.0 版本进一步增强了 AI 函数 和 向量检索功能,改进了物化表和连接器框架,并优化了批处理和 PyFlink 支持。Flink 2.2.0 版本总共由来自全球的 73 位贡献者参与,累计推进了 9 个 FLIP(Flink 重要改进提案),完成了 220 多项缺陷修复和改进。

Flink 2.2.0 版本无缝集成实时数据处理与人工智能,开启了人工智能时代。该版本增强了用于大规模语言模型推理的 ML_PREDICT 和用于实时向量搜索的 VECTOR_SEARCH,从而增强了流式人工智能应用的能力。重点功能包括:物化表增强、Delta Join优化、均衡任务调度和更多连接器优化(包括限流框架和均匀分片),显著提升了处理性能、可扩展性和可靠性,为构建智能、低延迟的数据管道奠定了坚实的基础。我们衷心感谢所有贡献者的宝贵支持!

接下来让我们深入了解Flink 2.2.0版本的重点内容。

Flink SQL 改进

实时AI函数

从 Flink 2.1 版本起,Apache Flink 通过 Flink SQL 中的 ML_PREDICT 函数支持使用 LLM 功能,用户能够以简单高效的方式执行语义分析。在 Flink 2.2.0 版本中,Table API 支持了模型推理操作,允许将机器学习模型直接集成到数据处理中,并使用特定提供商(例如 OpenAI)的模型对数据进行预测处理。

使用示例:

  • 创建并使用模型
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to make predictions
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable, 
    ColumnList.of("text"), 
    Map.of("async", "true")
);

更多信息

向量搜索

Apache Flink 通过 ML_PREDICT 函数和大模型进行了无缝衔接,已在情感分析、实时问答系统等场景中得到技术验证。然而目前的架构仅允许 Flink 使用嵌入模型将非结构化文本数据转换为高维向量特征,然后将这些特征持久化到下游存储系统,缺乏对向量空间进行实时在线查询和相似性分析的能力。

Flink 2.2.0 提供了 VECTOR_SEARCH 函数,使用户能够直接在 Flink 中执行流式向量相似性搜索和实时上下文检索。

以下SQL语句为例:

-- Basic usage
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10
);

-- With configuration options
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10,
  MAP['async', 'true', 'timeout', '100s']
);

-- Using named parameters
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 10,
  CONFIG => MAP['async', 'true', 'timeout', '100s']
);

-- Searching with contant value
SELECT * 
FROM VECTOR_SEARCH(
  TABLE vector_table,
  ARRAY[10, 20],
  DESCRIPTOR(index_column),
  10,
);

更多信息

物化表

物化表是 Flink SQL 中引入的一种新的表类型,用于简化批处理和流式数据管道,提供一致的开发体验。创建物化表时,只需指定数据新鲜度和查询条件,引擎即可自动生成物化表的模式,并创建相应的数据管道以保证指定的数据新鲜度。

从 Flink 2.2.0 开始,FRESHNESS 不再是 CREATE MATERIALIZED TABLE 和 CREATE OR ALTER MATERIALIZED TABLE DDL 语句的必要组成部分。Flink 2.2.0 引入了一个新的 MaterializedTableEnricher 接口,该接口为自定义的默认逻辑提供了一个正式的扩展方式,允许高级用户实现“智能”的默认行为(例如,从上游表推断数据新鲜度)。

此外,用户可以使用 DISTRIBUTED BYDISTRIBUTED INTO 来支持物化表的分桶。用户还可以使用 SHOW MATERIALIZED TABLES 来展示所有物化表。

使用方式如下:

CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    DISTRIBUTED INTO 5 BUCKETS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
        ds
    FROM
     ...

更多信息

SinkUpsertMaterializer V2

SinkUpsertMaterializer是 Flink 中的一个算子,在乱序的变更事件发送到 upsert 接收器之前对其进行协调。在某些情况下,这个算子的性能会呈指数级下降。Flink 2.2.0 引入了一种针对此类情况优化的新版本实现。

更多信息

Delta Join

Apache Flink 2.1 版本引入了新的Delta Join算子,以缓解regular join中由于庞大状态带来的问题。通过双向查找 join 取代了regular join维护的大量状态,直接重用源表中的数据。

Flink 2.2.0 增加了对更多 SQL 模式转换为Delta Join的支持。Delta Join现在支持在不使用 DELETE 操作的情况下应用 CDC 数据源,并允许在数据源之后进行投影和过滤操作。此外,Delta Join还支持缓存,这有助于减少对外部存储的请求。

目前,Apache Fluss (Incubating) 源表可以用作Delta Join的源表,可以在Fluss相关文档查看对应表的定义方式和使用案例。

更多信息

SQL类型

在 Flink 2.2 版本前,SQL 中定义的ROW类型(例如 SELECT CAST(f AS ROW<i NOT NULL>))会忽略 NOT NULL 约束。这虽然更符合 SQL 标准,但在处理嵌套数据时会导致许多类型不一致和晦涩难懂的错误信息。例如,这阻止了在计算列或join key中使用ROW类型。Flink 2.2.0 版本修改了该行为,考虑ROW的可空性。配置项 table.legacy-nested-row-nullability 允许在需要开启来恢复旧行为,建议更新之前忽略约束的已有查询。

Flink 2.2.0 转换为 TIME 类型时会考虑正确的精度(0-3),将不正确的字符串转换为时间类型(例如,小时部分大于 24)现在会导致运行时异常。BINARY 和 VARBINARY 之间的类型转换现在会正确考虑长度。

更多信息

使用 UniqueKeys 进行状态管理

Flink 2.2 版本对 StreamingMultiJoinOperator 进行了优化和变更,使用 UniqueKeys 而不是 UpsertKeys 来进行状态管理。该算子在 Flink 2.1 中以实验状态发布,后续会持续进行优化,这些优化可能会导致不兼容的变化。

更多信息

Runtime

均衡任务调度

Flink 2.2.0 引入了一种均衡的任务调度策略,以实现任务管理器的任务负载均衡并减少作业瓶颈。

更多信息

增强HistoryServer工作历史保留策略

在 Flink 2.2.0 版本前,HistoryServer 仅支持基于数量的作业归档保留策略,这不足以满足需要基于时间保留或组合规则的场景。用户在 Flink 2.2.0 可以使用新的配置项 historyserver.archive.retained-ttl 并结合 historyserver.archive.retained-jobs来满足更多场景需求。

更多信息

Metrics

自 Flink 2.2.0 版本起,用户可以为作业中使用的每个算子/转换分配自定义指标变量。这些变量随后会被指标报告器转换为标签,允许用户为特定运算符的指标添加标签。例如,您可以使用此功能来命名和区分数据源。

用户现在可以通过 traces.checkpoint.span-detail-level 控制检查点span的详细级别。最高级别会报告每个任务和子任务的 span 树。报告的自定义 span 现在可以包含子 span。更多详情请参阅Traces

更多信息

Connectors

Scan数据源限流功能

Flink 作业频繁地与外部系统交换数据,这会消耗网络带宽和 CPU 资源。当这些资源稀缺时,过于频繁地拉取数据可能会干扰其他工作负载,例如 Kafka/MySQL CDC 连接器。在 Flink 2.2 中,我们引入了 RateLimiter 接口,为Scan数据源提供请求速率限制,连接器开发人员可以将其与限流框架集成,以实现自己的限流策略。此功能仅在 DataStream API 中可用。

更多信息

支持均匀分片

SplitEnumerator 负责分配分片工作,但它无法了解这些分片的实际运行时状态或分布情况。这使得 SplitEnumerator 无法保证分片均匀分布,并且极易出现数据倾斜。从 Flink 2.2 开始,SplitEnumerator 获得了分片分布信息,并提供了在运行时均匀分配分片的能力。例如,此功能可用于解决 Kafka 连接器中的数据倾斜问题。

更多信息

其他内容

PyFlink

在 Flink 2.2 中,我们为 Python DataStream API 添加了异步函数支持。这使得 Python 用户能够在 Flink 作业中高效地查询外部服务,例如通常部署在独立 GPU 集群中的 LLM 服务等。

此外,我们还提供了全面的支持,以确保外部服务访问的稳定性。一方面,我们支持限制发送到外部服务的并发请求数量,以避免服务过载。另一方面,我们也添加了重试机制,以应对可能由网络抖动或其他瞬态问题导致的临时服务不可用情况。

以下是一个简单的使用示例:

from typing import List
from ollama import AsyncClient

from pyflink.common import Types, Time, Row
from pyflink.datastream import (
    StreamExecutionEnvironment,
    AsyncDataStream,
    AsyncFunction,
    RuntimeContext,
    CheckpointingMode,
)


class AsyncLLMRequest(AsyncFunction[Row, str]):

    def __init__(self, host, port):
        self._host = host
        self._port = port

    def open(self, runtime_context: RuntimeContext):
        self._client = AsyncClient(host='{}:{}'.format(self._host, self._port))

    async def async_invoke(self, value: Row) -> List[str]:
        message = {
   "role": "user", "content": value.question}
        question_id = value.id
        ollam_response = await self._client.chat(model="qwen3:4b", messages=[message])
        return [
            f"Question ID {question_id}: response: {ollam_response['message']['content']}"
        ]

    def timeout(self, value: Row) -> List[str]:
        # return a default value in case timeout
        return [f"Timeout for this question: {value.a}"]


def main(output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)
    ds = env.from_collection(
        [
            ("Who are you?", 0),
            ("Tell me a joke", 1),
            ("Tell me the result of comparing 0.8 and 0.11", 2),
        ],
        type_info=Types.ROW_NAMED(["question", "id"], [Types.STRING(), Types.INT()]),
    )

    result_stream = AsyncDataStream.unordered_wait(
        data_stream=ds,
        async_function=AsyncLLMRequest(),
        timeout=Time.seconds(100),
        capacity=1000,
        output_type=Types.STRING(),
    )

    # define the sink
    result_stream.print()

    # submit for execution
    env.execute()


if __name__ == "__main__":
    main(known_args.output)

更多信息

升级 commons-lang3 依赖到 3.18.0

将 commons-lang3 从 3.12.0 升级到 3.18.0 以解决 CVE-2025-48924。

更多信息

protobuf-java 从 3.x 升级到 4.32.1

Flink 2.2 从protobuf-java 3.21.7(Protocol Buffers 版本 21)升级到 protobuf-java 4.32.1(对应 Protocol Buffers 版本 32)。此次升级实现了以下功能:

  • Protobuf 版本支持:完全支持 Protocol Buffers v27 及更高版本中引入的 edition = "2023"edition = "2024" 语法。版本提供了一种统一的方法,将 proto2 和 proto3 的功能与细粒度的特性控制相结合。
  • 改进 Proto3 字段存在性检查:更好地处理 proto3 中的可选字段,不再受限于旧版 protobuf 的限制,无需将 protobuf.read-default-values 设置为 true 来进行字段存在性检查。
  • 性能提升:利用了 11 个 Protocol Buffers 版本(版本 22-32)中的性能改进和错误修复。
  • 现代 Protobuf 特性:可访问更新的 protobuf 功能,包括 Edition 2024 特性和改进的运行时行为。

使用 proto2 和 proto3 .proto 文件的用户可以兼容使用,无需更改。

更多信息

升级注意事项

Flink 社区致力于确保版本升级过程尽可能顺畅。但某些变更可能需要用户在升级到 2.2 版本时,对程序的某些部分进行调整。请参阅发版说明以获取升级过程中需要进行的调整和需要检查的问题完整列表。

贡献者列表

Apache Flink 社区衷心感谢所有为此次版本发布作出贡献的开发者:

Alan Sheinberg, Aleksandr Iushmanov, AlexYinHan, Arvid Heise, CuiYanxiang, David Hotham, David Radley, Dawid Wysakowicz, Dian Fu, Etienne Chauchot, Ferenc Csaky, Gabor Somogyi, Gustavo de Morais, Hang Ruan, Hao Li, Hongshun Wang, Jackeyzhe, Jakub Stejskal, Jiaan Geng, Jinkun Liu, Juntao Zhang, Kaiqi Dong, Khaled Hammouda, Kumar Mallikarjuna, Kunni, Laffery, Mario Petruccelli, Martijn Visser, Mate Czagany, Maximilian Michels, Mika Naylor, Mingliang Liu, Myracle, Naci Simsek, Natea Eshetu Beshada, Niharika Sakuru, Pan Yuepeng, Piotr Nowojski, Poorvank,Ramin Gharib, Roc Marshal, Roman Khachatryan, Ron, Rosa Kang, Rui Fan, Sergey Nuyanzin, Shengkai, Stefan Richter, Stepan Stepanishchev, Swapnil Aher, Timo Walther, Xingcan Cui, Xuyang, Yuepeng Pan, Yunfeng Zhou, Zakelly, Zhanghao Chen, dylanhz, gong-flying, hejufang, lincoln lee, lincoln-lil, mateczagany, morvenhuang, noorall, r-sidd, sxnan, voonhous, xia rui, xiangyu0xf, yangli1206, yunfengzhou-hub, zhou

更多内容


活动推荐

复制下方链接或者扫描二维码
即可快速体验 “一体化的实时数仓联合解决方案”
了解活动详情:https://www.aliyun.com/solution/tech-solution/flink-hologres

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
数据采集 监控 数据挖掘
企业级Data Agent: 从数据开始,以数据领先
在数字化转型背景下,数据被视为“新时代的石油”,但多数企业仍面临数据价值难以高效挖掘的困境。文章深入剖析了当前数据分析中存在的“被动响应”模式及其带来的四大挑战,并提出通过Data Agent实现主动智能与数据分析民主化的新路径。Data Agent基于大语言模型和强化学习技术,具备理解、思考与行动能力,能够从“人找数据”转变为“数据找人”,推动数据洞察从专业人员走向全员参与。
|
8天前
|
存储 人工智能 自然语言处理
LlamaIndex 深度实战:用《长安的荔枝》学会构建智能问答系统
本文深入浅出地讲解了RAG(检索增强生成)原理与LlamaIndex实战,通过《长安的荔枝》案例,从AI如何“读书”讲起,详解三大关键参数(chunk_size、top_k、overlap)对问答效果的影响,并结合真实实验展示不同配置下的回答质量差异。内容兼顾新手引导与进阶优化,帮助读者快速构建高效的文档问答系统。
LlamaIndex 深度实战:用《长安的荔枝》学会构建智能问答系统
|
16天前
|
人工智能 前端开发 算法
大厂CIO独家分享:AI如何重塑开发者未来十年
在 AI 时代,若你还在紧盯代码量、执着于全栈工程师的招聘,或者仅凭技术贡献率来评判价值,执着于业务提效的比例而忽略产研价值,你很可能已经被所谓的“常识”困住了脚步。
960 78
大厂CIO独家分享:AI如何重塑开发者未来十年
|
11天前
|
JSON 监控 数据可视化
云监控 UModel Explorer:用“图形化”重新定义可观测数据建模
阿里云 UModel Explorer 正式发布:告别复杂配置,拖拽即建模,点击即洞察,实现建模、探索、分析一体化,让可观测真正高效协同,开启可视化运维新时代!
131 16
|
29天前
|
消息中间件 存储 Kafka
流、表与“二元性”的幻象
本文探讨流与表的“二元性”本质,指出实现该特性需具备主键、变更日志语义和物化能力。强调Kafka与Iceberg因缺乏更新语义和主键支持,无法真正实现二元性,唯有统一系统如Flink、Paimon或Fluss才能无缝融合流与表。
130 7
流、表与“二元性”的幻象
|
2月前
|
存储 分布式计算 运维
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
零跑科技基于Flink构建一体化实时计算平台,应对智能网联汽车海量数据挑战。从车机信号实时分析到故障诊断,实现分钟级向秒级跃迁,提升性能3-5倍,降低存储成本。通过Flink+Hologres+MaxCompute技术栈,打造高效、稳定、可扩展的实时数仓,支撑100万台量产车背后的数据驱动决策,并迈向流批一体与AI融合的未来架构。
219 2
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
|
4月前
|
存储 消息中间件 人工智能
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
本文整理自 Lazada Group EVP 及供应链技术负责人陈立群在 Flink Forward Asia 2025 新加坡实时分析专场的分享。作为东南亚领先的电商平台,Lazada 面临在六国管理数十亿商品 SKU 的挑战。为实现毫秒级数据驱动决策,Lazada 基于阿里云实时计算 Flink 和 Hologres 打造端到端实时商品选品平台,支撑日常运营与大促期间分钟级响应。本文深入解析该平台如何通过流式处理与实时分析技术重构电商数据架构,实现从“事后分析”到“事中调控”的跃迁。
458 55
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
327 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式

热门文章

最新文章