Flink进行Paimon写入源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 本文主要解析了Flink写入Paimon的核心流程。

1. 前言

Paimon的前身是Flink-Table-Store,希望提供流批一体的存储,提供一定的OLAP查询能力(基于列式存储),做到毫秒级别的实时流式读取。Flink-Table-Store希望能够支持Flink SQL的全部概念,能够结合Flink SQL提供DB级别体验,并且支持大规模的更新。Flink-Table-Store希望能够结合Flink,实现完整的流批一体体验(计算+存储),同时拓展Flink-Table-Store的生态,升级为Paimon,来支持更多大数据引擎的查询/写入。如果我们希望深度使用Paimon,并充分利用Paimon的特性,那么了解Flilnk写入Paimon的过程十分重要,本文希望通过源码分析的方式带大家充分了解Flink写入Paimon的完整过程。

2. 源码阅读demo

阅读源码最有效的方式就是跟读最简单的代码,由于Paimon的定位是打造类似Database的使用体验,与SQL结合很紧密,通常可以使用Flink SQL进行写入,但这里为了方便跟读代码,我们采用调api的方式进行写入

// 构建表
TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
String tableSql = String.format(
    "create table %s (uuid string, name string, age int, ts int, part string, primary key (part,uuid) NOT ENFORCED)  partitioned by (part) with ('connector' = 'paimon', 'bucket' = '2', 'bucket-key' = 'uuid', 'path' = '%s')"
    , tableName
    , filePath
);
tableEnv.executeSql(tableSql);
Catalog catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get();
String database = catalog.getDefaultDatabase();
ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(database, tableName));
ObjectIdentifier tablePath = ObjectIdentifier.of(catalogName, database, tableName);
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(
    tablePath, 
    catalogTable,
    Collections.emptyMap(),
    Configuration.fromMap(catalogTable.getOptions()),
    Thread.currentThread().getContextClassLoader(), 
    false
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 数据源
DataStream<RowData> dataStream = ...
// 将数据源写入Paimon表
((DataStreamSinkProvider) new FlinkTableFactory()
    .createDynamicTableSink(context)
    .getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)))
    .consumeDataStream(n -> Optional.empty(), dataStream);

env.execute("PaimonWriteDemoWithFlink");

3. Flink写入Paimon的完整流程

Paimon表的写入是通过FlinkTableSink实现DynamicTableSink接口来写入数据,核心逻辑位于getSinkRuntimeProvider方法中,后面我们会重点跟读getSinkRuntimeProvider方法中的内容。

这里也顺带讲一下通过Flink SQL进行写入的场景,对于Flink SQL而言,首先会通过calcite进行解析优化后生成JobGraph,再提交集群运行。我们希望了解Flink写入Paimon的完整过程,可以通过sql转化成JobGraph的过程中得到的Transformation列表去了解中间具体进行了哪些操作。下图是一条简单Flink SQL转化后的Transformation序列,其中红框内的Transformation序列是Paimon数据写入的完整过程,本质上是通过执行getSinkRuntimeProvider方法生成的。

图3-1 Flink SQL中Paimon表写入转化成Transformation序列

接着,我们对Flink写入Paimon表的流程进行了梳理,整理出完整的类图,如下图所示

图3-2 Flink写入Paimon完整流程类图

其中,核心入口FlinkTableSink的getSinkRuntimeProvider方法逻辑如下所示,

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
   
   
    ...
    LogSinkProvider logSinkProvider = null;
    if (logStoreTableFactory != null) {
   
   
        // 当配置了log.system,则会生成一个log sink的提供器,目前只有kafka的日志存储
        logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
    }

    Options conf = Options.fromMap(table.options());
    // 生成log sink函数,当overwrite模式时,不配置log sink
    final LogSinkFunction logSinkFunction =
            overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
    return new PaimonDataStreamSinkProvider(
            (dataStream) ->
                    // FileStoreTable时FileStore的抽象层,提供InternalRow的读取和写入
                    new FlinkSinkBuilder((FileStoreTable) table)
                            .withInput(
                                    new DataStream<>(dataStream.getExecutionEnvironment(),dataStream.getTransformation()))
                            .withLogSinkFunction(logSinkFunction)
                            .withOverwritePartition(overwrite ? staticPartitions : null)
                                            .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                            .build());
}

继续跟到FlinkSinkBuilder的build方法,会根据Paimon表的bucket模式生成不同类型的Sink

public DataStreamSink<?> build() {
   
   
    BucketMode bucketMode = table.bucketMode();
    switch (bucketMode) {
   
   
        case FIXED:
            // 生成固定bucket个数的Sink,这种方式结构固定,本文着重分析这一路径
            return buildForFixedBucket();
        case DYNAMIC:
            // 生成动态变动Bucket个数的Sink,
            return buildDynamicBucketSink();
        case UNAWARE:
            return buildUnawareBucketSink();
        default:
            throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }
}

private DataStreamSink<?> buildForFixedBucket() {
   
   
    // 首先根据分区信息、bucket字段进行bucket分组
    DataStream<RowData> partitioned =
            partition(
                    input,
                    new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                    parallelism);
    // FileStoreSink实现将记录写入Paimon,FileStoreSink提供了生成写入算子的方法
    FileStoreSink sink = new FileStoreSink(table, overwritePartition, logSinkFunction);
    return sink.sinkFrom(partitioned);
}

接着继续跟读到FileStoreSink的sinkFrom的方法

public DataStreamSink<?> sinkFrom(DataStream<T> input) {
   
   
    String initialCommitUser = UUID.randomUUID().toString();
    return sinkFrom(input, initialCommitUser);
}

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
   
   
    // 执行真正的写入操作,在这个阶段不会进行提交,相当于两阶段提交的第一阶段,进行数据写入,不会有snapshot生成
    SingleOutputStreamOperator<Committable> written =
                doWrite(input, initialCommitUser, input.getParallelism());
    // 执行提交操作,会生成snapshot,下游可见,如果日志配置
    return doCommit(written, initialCommitUser);
}

public SingleOutputStreamOperator<Committable> doWrite(
    DataStream<T> input, String commitUser, Integer parallelism) {
   
   
    ...
    // 是否只是writeOnly,如果是,则会忽略compact和snapshot过期,这个配置需要结合专门的compact任务执行,不然会造成小文件剧增,同时降低数据读的性能
    Boolean writeOnly = table.coreOptions().writeOnly();
    SingleOutputStreamOperator<Committable> written =
        input.transform(
           (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " -> " + table.name(),
            new CommittableTypeInfo(),
            // 生成写入算子,上游的数据会通过这个算子将数据写入到Paimon表
            createWriteOperator(
                createWriteProvider(env.getCheckpointConfig(), isStreaming), commitUser)
        ).setParallelism(parallelism == null ? input.getParallelism() : parallelism);
    ...
    Options options = Options.fromMap(table.options());
    if (options.get(SINK_USE_MANAGED_MEMORY)) {
   
   
        // Flink会创建一个独立的内存分配器用于merge tree的数据写入操作
        // 否则会使用TM的管理内存支持写入操作
        MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
        written.getTransformation()
            .declareManagedMemoryUseCaseAtOperatorScope(
                ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
    }
    return written;
}

protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
   
   
    ...
    SingleOutputStreamOperator<?> committed =
        written.transform(
                GLOBAL_COMMITTER_NAME + " -> " + table.name(),
                new CommittableTypeInfo(),
                // 执行提交操作的算子,该算子会生成snapshot,下游可见
                new CommitterOperator<>(
                    streamingCheckpointEnabled,
                    commitUser,
                    createCommitterFactory(streamingCheckpointEnabled),
                    createCommittableStateManager()))
            .setParallelism(1)
            .setMaxParallelism(1);
    return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

至此,我们可以发现,Flink写入Paimon的所有Transformation已经构建完成了,接下来,我们可以跟着写入算子的processElement方法了解数据是如何成为Paimon底层的LSM-tree数据结构的。

4. Flink写入数据

Flink写入Paimon的算子是RowDataStoreWriteOperator,算子是预提交算子,会将数据flush的磁盘,但不会执行commit操作,核心代码如下

public void processElement(StreamRecord<RowData> element) throws Exception {
   
   
    sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;

    SinkRecord record;
    try {
   
   
        // 将数据写入Paimon的文件系统
        record = write.write(new FlinkRowWrapper(element.getValue()));
    } catch (Exception e) {
   
   
        throw new IOException(e);
    }

    if (logSinkFunction != null) {
   
   
        // 将数据双写到日志存储层,目前只支持写入到kafka
        SinkRecord logRecord = write.toLogRecord(record);
        logSinkFunction.invoke(logRecord, sinkContext);
    }
}

上述算子中的write生成是通过FlieStoreTable.store().newWrite()的逻辑生成的

FileStoreTable是对Paimon表的文件存储的抽象层,提供底层数据的读写api,Paimon会根据WriteMode和是否包含主键来决定生成哪一种FileStoreTable:

  • 当WriteMode = APPEND_ONLY,则会生成AppendOnlyFileStoreTable,这种表的写入流程很简单,当写入的记录数达到阈值且缓存数据量达到单文件大小时就会进行flush操作;
  • 当WriteMode = CHANGE_LOG 且不包含主键,则会生成ChangelogValueCountFileStoreTable,当从内存池拉不到内存块的时候,会进行flush操作,在flush操作的时候,会记录flush的数据的统计值;
  • 当WriteMode = CHANGE_LOG 且包含主键,则会生成ChangelogWithKeyFileStoreTable,与ChangelogValueCountFileStoreTable的flush时机一致,只是在flush中会根据主键进行merge操作;
  • 当WriteMode = AUTO,则会根据是否有主键生成AppendOnlyFileStoreTable还是ChangelogWithKeyFileStoreTable。

接下来,我们会分析FileStoreTable为ChangelogWithKeyFileStoreTable的情况。根据图3-2的类图会发现,最终会调用到MergeTreeWriter的write()方法。该方法会先将数据缓存在内存缓存中(SortBuffer),该部分缓存数据为排序数据,当缓存满了之后,会将缓存中的数据flush到磁盘。

public void write(KeyValue kv) throws Exception {
   
   
    long sequenceNumber =
            kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
                    ? newSequenceNumber()
                    : kv.sequenceNumber();
    // 将数据put到内存缓存中
    boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
    if (!success) {
   
   
        // 缓存满了,会flush缓存数据到磁盘
        flushWriteBuffer(false, false);
        success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
   
   
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }
}

flush的代码如下

private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
   
   
    if (writeBuffer.size() > 0) {
   
   
        if (compactManager.shouldWaitForLatestCompaction()) {
   
   
            // 判断是否需要进行等待上一次compact结束
            waitForLatestCompaction = true;
        }
        // 根据changelogProducer的类型生成changelog文件写入器
        final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
            changelogProducer == ChangelogProducer.INPUT ? writerFactory.createRollingChangelogFileWriter(0) : null;
        // 生成flush数据的MergeTree文件写入器,根据文件大小进行rolling
        final RollingFileWriter<KeyValue, DataFileMeta> dataWriter = writerFactory.createRollingMergeTreeFileWriter(0);
        try {
   
   
            // 将缓存中的所有数据flush到磁盘
            writeBuffer.forEach(
                keyComparator,
                mergeFunction,
                changelogWriter == null ? null : changelogWriter::write,
                dataWriter::write);
        } finally {
   
   
            if (changelogWriter != null) {
   
   
                changelogWriter.close();
            }
            dataWriter.close();
        }

        if (changelogWriter != null) {
   
   
            // 将changelog新增文件缓存在算子中,供算子在进行checkpoint的时候将所有的flush下发到下游算子(提交算子),下发是在prepareSnapshotPreBarrier()方法中进行的,所以会在下游算子进行checkpoint之前接收所有的flush信息
            newFilesChangelog.addAll(changelogWriter.result());
        }

        for (DataFileMeta fileMeta : dataWriter.result()) {
   
   
            // 将新增的文件缓存在算子中,供算子在进行checkpoint的时候将所有的提交下发到下游算子(提交算子)
            newFiles.add(fileMeta);
            compactManager.addNewFile(fileMeta);
        }

        writeBuffer.clear();
    }
    // 尝试同步上一次compact结果
    trySyncLatestCompaction(waitForLatestCompaction);
    // 尝试去触发一次新的compact
    compactManager.triggerCompaction(forcedFullCompaction);
}

在writeBuffer进行flush磁盘的时候,会对数据进行merge操作,当FileStoreTable为ChangelogWithKeyFileStoreTable时,会根据merge-engine配置来生成特定的merge引擎,也就是3-2类图中的merge function,具体代码如下,

...
// 获取merge-engine配置
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
KeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;

MergeFunctionFactory<KeyValue> mfFactory;
switch (mergeEngine) {
   
   
    case DEDUPLICATE:
        // merge操作时,只会保留最新的一条
        mfFactory = DeduplicateMergeFunction.factory();
        break;
    case PARTIAL_UPDATE:
        // 可以更新部分非空字段,可以设置字段级的sequence来进行局部比较更新
        // 不支持retract类型的操作
        mfFactory = PartialUpdateMergeFunction.factory(conf, rowType);
        break;
    case AGGREGATE:
        // merge是对字段做聚合操作
        mfFactory =
            AggregateMergeFunction.factory(
                conf,
                tableSchema.fieldNames(),
                rowType.getFieldTypes(),
                tableSchema.primaryKeys());
        break;
    case FIRST_ROW:
        // 只保留第一条
        mfFactory =
            FirstRowMergeFunction.factory(
                new RowType(extractor.keyFields(tableSchema)), rowType);
            break;
    default:
        throw new UnsupportedOperationException(
            "Unsupported merge engine: " + mergeEngine);
}

if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
   
   
    // 当change log的生成方式为LOOKUP的时候,merge则为封装
    mfFactory =
        LookupMergeFunction.wrap(
            mfFactory,
            new RowType(extractor.keyFields(tableSchema)),
            rowType);
}

在写入后,会进行尝试同步上一次compact的结果,同时也会尝试去触发一次新的compact,Paimon的Compact原理争取通过下一篇文章来分析。

这里,与Flink写入Hudi的过程一样,Flink写入Paimon是如何保证Exactly-Once语义的呢?

5. Flink提交写入状态

提交算子CommitterOperator首先会接收上游算子下发的flush信息

public void processElement(StreamRecord<CommitT> element) {
   
   
    output.collect(element);
    // 缓存上游下发的flush信息在算子内部
    this.inputs.add(element.getValue());
}

public void notifyCheckpointComplete(long checkpointId) throws Exception {
   
   
    super.notifyCheckpointComplete(checkpointId);
    // 在算子进行checkpoint完成后进行commit操作
    commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
}

private void commitUpToCheckpoint(long checkpointId) throws Exception {
   
   
    NavigableMap<Long, GlobalCommitT> headMap =
            committablesPerCheckpoint.headMap(checkpointId, true);
    // 将算子内部缓存的flush信息整合进行commit操作
    committer.commit(committables(headMap));
    headMap.clear();
}

commiter.commit的逻辑是将所有的flush信息进行一次commit操作,最终flush信息会被序列化成json信息保存在snapshot文件中,当commit成功后,Paimon表会新增一次snapshot。详细的commit过程的代码比较简单,可以直接跟读,这里就不再赘述。

6. 最后

本文通过跟读源码的方式对Flink写入Paimon的核心流程进行了解析,相信通过对Flink写入Paimon流程细节的梳理,对理解Paimon的特性及性能优化都是有极大的助力。由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。

最后总结一下,本文主要解析了Flink写入Paimon的核心流程:

1. 介绍了Flink SQL/api的方式构建写入流程DAG的完整过程;
2. 介绍了写入算子RowDataStoreWriteOperator处理数据的完整流程;
3. 介绍了提交算子CommitterOperator进行snapshot提交的完整流程。

当然,本文由于篇幅有限,没有对Flink和Paimon架构和概念进行详细的介绍,同时对Flink写入Paimon的Compact过程及性能优化也没有涉及,后续会加上这些方面的解析。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL 消息中间件 Kafka
流数据湖平台Apache Paimon(二)集成 Flink 引擎
流数据湖平台Apache Paimon(二)集成 Flink 引擎
933 0
|
3月前
|
存储 数据采集 OLAP
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
饿了么的实时数仓经历了多个阶段的演进。初期通过实时ETL、报表应用、联动及监控构建基础架构,随后形成了涵盖数据采集、加工和服务的整体数据架构。1.0版本通过日志和Binlog采集数据,但在研发效率和数据一致性方面存在问题。2.0版本通过Dataphin构建流批一体化系统,提升了数据一致性和研发效率,但仍面临新业务适应性等问题。最终,饿了么选择Paimon和StarRocks作为实时湖仓方案,显著降低了存储成本并提高了系统稳定性。未来,将进一步优化带宽瓶颈、小文件问题及权限控制,实现更多场景的应用。
382 7
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
|
5月前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18518 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
6月前
|
分布式计算 Serverless 调度
EMR Serverless Spark:结合实时计算 Flink 基于 Paimon 实现流批一体
本文演示了使用实时计算 Flink 版和 Serverless Spark 产品快速构建 Paimon 数据湖分析的流程,包括数据入湖 OSS、交互式查询,以及离线Compact。Serverless Spark完全兼容Paimon,通过内置的DLF的元数据实现了和其余云产品如实时计算Flink版的元数据互通,形成了完整的流批一体的解决方案。同时支持灵活的作业运行方式和参数配置,能够满足实时分析、生产调度等多项需求。
60810 107
|
7月前
|
SQL 存储 数据库
Flink + Paimon 数据 CDC 入湖最佳实践
Flink + Paimon 数据 CDC 入湖最佳实践
1438 59
|
5月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7832 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
5月前
|
分布式计算 Apache 流计算
【邀请函】相约CommunityOverCode Asia 2024,共探Flink、Paimon、Celeborn开源新境界!
相约 CommunityOverCode Asia 2024,共探 Flink、Paimon、Celeborn 开源新境界!让我们在技术的浩瀚星海中,携手航行,共创辉煌!
628 7
【邀请函】相约CommunityOverCode Asia 2024,共探Flink、Paimon、Celeborn开源新境界!
|
4月前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
284 3
|
6月前
|
SQL 存储 NoSQL
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
本文投稿自贝壳家装数仓团队,在结合家装业务场景下所探索出的一种基于 Flink+Paimon 的排序方案。这种方案可以在实时环境对全量数据进行准确的分组排序,同时减少对内存资源的消耗。在这一方案中,引入了“事件时间分段”的概念,以避免 Flink State 中冗余数据对排序结果的干扰,在保证排序结果准确性的同时,减少了对内存的消耗。并且基于数据湖组件 Paimon 的聚合模型和 Audit Log 数据在数据湖内构建了拉链表,为排序结果提供了灵活的历史数据基础。
28797 8
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
|
7月前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
640 2
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案