1. 前言
Paimon的前身是Flink-Table-Store,希望提供流批一体的存储,提供一定的OLAP查询能力(基于列式存储),做到毫秒级别的实时流式读取。Flink-Table-Store希望能够支持Flink SQL的全部概念,能够结合Flink SQL提供DB级别体验,并且支持大规模的更新。Flink-Table-Store希望能够结合Flink,实现完整的流批一体体验(计算+存储),同时拓展Flink-Table-Store的生态,升级为Paimon,来支持更多大数据引擎的查询/写入。如果我们希望深度使用Paimon,并充分利用Paimon的特性,那么了解Flilnk写入Paimon的过程十分重要,本文希望通过源码分析的方式带大家充分了解Flink写入Paimon的完整过程。
2. 源码阅读demo
阅读源码最有效的方式就是跟读最简单的代码,由于Paimon的定位是打造类似Database的使用体验,与SQL结合很紧密,通常可以使用Flink SQL进行写入,但这里为了方便跟读代码,我们采用调api的方式进行写入
// 构建表
TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
String tableSql = String.format(
"create table %s (uuid string, name string, age int, ts int, part string, primary key (part,uuid) NOT ENFORCED) partitioned by (part) with ('connector' = 'paimon', 'bucket' = '2', 'bucket-key' = 'uuid', 'path' = '%s')"
, tableName
, filePath
);
tableEnv.executeSql(tableSql);
Catalog catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get();
String database = catalog.getDefaultDatabase();
ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(database, tableName));
ObjectIdentifier tablePath = ObjectIdentifier.of(catalogName, database, tableName);
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(
tablePath,
catalogTable,
Collections.emptyMap(),
Configuration.fromMap(catalogTable.getOptions()),
Thread.currentThread().getContextClassLoader(),
false
);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20000);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 数据源
DataStream<RowData> dataStream = ...
// 将数据源写入Paimon表
((DataStreamSinkProvider) new FlinkTableFactory()
.createDynamicTableSink(context)
.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)))
.consumeDataStream(n -> Optional.empty(), dataStream);
env.execute("PaimonWriteDemoWithFlink");
3. Flink写入Paimon的完整流程
Paimon表的写入是通过FlinkTableSink实现DynamicTableSink接口来写入数据,核心逻辑位于getSinkRuntimeProvider方法中,后面我们会重点跟读getSinkRuntimeProvider方法中的内容。
这里也顺带讲一下通过Flink SQL进行写入的场景,对于Flink SQL而言,首先会通过calcite进行解析优化后生成JobGraph,再提交集群运行。我们希望了解Flink写入Paimon的完整过程,可以通过sql转化成JobGraph的过程中得到的Transformation列表去了解中间具体进行了哪些操作。下图是一条简单Flink SQL转化后的Transformation序列,其中红框内的Transformation序列是Paimon数据写入的完整过程,本质上是通过执行getSinkRuntimeProvider方法生成的。
接着,我们对Flink写入Paimon表的流程进行了梳理,整理出完整的类图,如下图所示
其中,核心入口FlinkTableSink的getSinkRuntimeProvider方法逻辑如下所示,
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
...
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
// 当配置了log.system,则会生成一个log sink的提供器,目前只有kafka的日志存储
logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
}
Options conf = Options.fromMap(table.options());
// 生成log sink函数,当overwrite模式时,不配置log sink
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
(dataStream) ->
// FileStoreTable时FileStore的抽象层,提供InternalRow的读取和写入
new FlinkSinkBuilder((FileStoreTable) table)
.withInput(
new DataStream<>(dataStream.getExecutionEnvironment(),dataStream.getTransformation()))
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.build());
}
继续跟到FlinkSinkBuilder的build方法,会根据Paimon表的bucket模式生成不同类型的Sink
public DataStreamSink<?> build() {
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case FIXED:
// 生成固定bucket个数的Sink,这种方式结构固定,本文着重分析这一路径
return buildForFixedBucket();
case DYNAMIC:
// 生成动态变动Bucket个数的Sink,
return buildDynamicBucketSink();
case UNAWARE:
return buildUnawareBucketSink();
default:
throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
}
}
private DataStreamSink<?> buildForFixedBucket() {
// 首先根据分区信息、bucket字段进行bucket分组
DataStream<RowData> partitioned =
partition(
input,
new RowDataChannelComputer(table.schema(), logSinkFunction != null),
parallelism);
// FileStoreSink实现将记录写入Paimon,FileStoreSink提供了生成写入算子的方法
FileStoreSink sink = new FileStoreSink(table, overwritePartition, logSinkFunction);
return sink.sinkFrom(partitioned);
}
接着继续跟读到FileStoreSink的sinkFrom的方法
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser);
}
public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
// 执行真正的写入操作,在这个阶段不会进行提交,相当于两阶段提交的第一阶段,进行数据写入,不会有snapshot生成
SingleOutputStreamOperator<Committable> written =
doWrite(input, initialCommitUser, input.getParallelism());
// 执行提交操作,会生成snapshot,下游可见,如果日志配置
return doCommit(written, initialCommitUser);
}
public SingleOutputStreamOperator<Committable> doWrite(
DataStream<T> input, String commitUser, Integer parallelism) {
...
// 是否只是writeOnly,如果是,则会忽略compact和snapshot过期,这个配置需要结合专门的compact任务执行,不然会造成小文件剧增,同时降低数据读的性能
Boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " -> " + table.name(),
new CommittableTypeInfo(),
// 生成写入算子,上游的数据会通过这个算子将数据写入到Paimon表
createWriteOperator(
createWriteProvider(env.getCheckpointConfig(), isStreaming), commitUser)
).setParallelism(parallelism == null ? input.getParallelism() : parallelism);
...
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
// Flink会创建一个独立的内存分配器用于merge tree的数据写入操作
// 否则会使用TM的管理内存支持写入操作
MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
written.getTransformation()
.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
}
return written;
}
protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
...
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " -> " + table.name(),
new CommittableTypeInfo(),
// 执行提交操作的算子,该算子会生成snapshot,下游可见
new CommitterOperator<>(
streamingCheckpointEnabled,
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager()))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
至此,我们可以发现,Flink写入Paimon的所有Transformation已经构建完成了,接下来,我们可以跟着写入算子的processElement方法了解数据是如何成为Paimon底层的LSM-tree数据结构的。
4. Flink写入数据
Flink写入Paimon的算子是RowDataStoreWriteOperator,算子是预提交算子,会将数据flush的磁盘,但不会执行commit操作,核心代码如下
public void processElement(StreamRecord<RowData> element) throws Exception {
sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
SinkRecord record;
try {
// 将数据写入Paimon的文件系统
record = write.write(new FlinkRowWrapper(element.getValue()));
} catch (Exception e) {
throw new IOException(e);
}
if (logSinkFunction != null) {
// 将数据双写到日志存储层,目前只支持写入到kafka
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
}
}
上述算子中的write生成是通过FlieStoreTable.store().newWrite()的逻辑生成的
FileStoreTable是对Paimon表的文件存储的抽象层,提供底层数据的读写api,Paimon会根据WriteMode和是否包含主键来决定生成哪一种FileStoreTable:
- 当WriteMode = APPEND_ONLY,则会生成AppendOnlyFileStoreTable,这种表的写入流程很简单,当写入的记录数达到阈值且缓存数据量达到单文件大小时就会进行flush操作;
- 当WriteMode = CHANGE_LOG 且不包含主键,则会生成ChangelogValueCountFileStoreTable,当从内存池拉不到内存块的时候,会进行flush操作,在flush操作的时候,会记录flush的数据的统计值;
- 当WriteMode = CHANGE_LOG 且包含主键,则会生成ChangelogWithKeyFileStoreTable,与ChangelogValueCountFileStoreTable的flush时机一致,只是在flush中会根据主键进行merge操作;
- 当WriteMode = AUTO,则会根据是否有主键生成AppendOnlyFileStoreTable还是ChangelogWithKeyFileStoreTable。
接下来,我们会分析FileStoreTable为ChangelogWithKeyFileStoreTable的情况。根据图3-2的类图会发现,最终会调用到MergeTreeWriter的write()方法。该方法会先将数据缓存在内存缓存中(SortBuffer),该部分缓存数据为排序数据,当缓存满了之后,会将缓存中的数据flush到磁盘。
public void write(KeyValue kv) throws Exception {
long sequenceNumber =
kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
? newSequenceNumber()
: kv.sequenceNumber();
// 将数据put到内存缓存中
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
// 缓存满了,会flush缓存数据到磁盘
flushWriteBuffer(false, false);
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}
flush的代码如下
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
if (writeBuffer.size() > 0) {
if (compactManager.shouldWaitForLatestCompaction()) {
// 判断是否需要进行等待上一次compact结束
waitForLatestCompaction = true;
}
// 根据changelogProducer的类型生成changelog文件写入器
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
changelogProducer == ChangelogProducer.INPUT ? writerFactory.createRollingChangelogFileWriter(0) : null;
// 生成flush数据的MergeTree文件写入器,根据文件大小进行rolling
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter = writerFactory.createRollingMergeTreeFileWriter(0);
try {
// 将缓存中的所有数据flush到磁盘
writeBuffer.forEach(
keyComparator,
mergeFunction,
changelogWriter == null ? null : changelogWriter::write,
dataWriter::write);
} finally {
if (changelogWriter != null) {
changelogWriter.close();
}
dataWriter.close();
}
if (changelogWriter != null) {
// 将changelog新增文件缓存在算子中,供算子在进行checkpoint的时候将所有的flush下发到下游算子(提交算子),下发是在prepareSnapshotPreBarrier()方法中进行的,所以会在下游算子进行checkpoint之前接收所有的flush信息
newFilesChangelog.addAll(changelogWriter.result());
}
for (DataFileMeta fileMeta : dataWriter.result()) {
// 将新增的文件缓存在算子中,供算子在进行checkpoint的时候将所有的提交下发到下游算子(提交算子)
newFiles.add(fileMeta);
compactManager.addNewFile(fileMeta);
}
writeBuffer.clear();
}
// 尝试同步上一次compact结果
trySyncLatestCompaction(waitForLatestCompaction);
// 尝试去触发一次新的compact
compactManager.triggerCompaction(forcedFullCompaction);
}
在writeBuffer进行flush磁盘的时候,会对数据进行merge操作,当FileStoreTable为ChangelogWithKeyFileStoreTable时,会根据merge-engine配置来生成特定的merge引擎,也就是3-2类图中的merge function,具体代码如下,
...
// 获取merge-engine配置
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
KeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
MergeFunctionFactory<KeyValue> mfFactory;
switch (mergeEngine) {
case DEDUPLICATE:
// merge操作时,只会保留最新的一条
mfFactory = DeduplicateMergeFunction.factory();
break;
case PARTIAL_UPDATE:
// 可以更新部分非空字段,可以设置字段级的sequence来进行局部比较更新
// 不支持retract类型的操作
mfFactory = PartialUpdateMergeFunction.factory(conf, rowType);
break;
case AGGREGATE:
// merge是对字段做聚合操作
mfFactory =
AggregateMergeFunction.factory(
conf,
tableSchema.fieldNames(),
rowType.getFieldTypes(),
tableSchema.primaryKeys());
break;
case FIRST_ROW:
// 只保留第一条
mfFactory =
FirstRowMergeFunction.factory(
new RowType(extractor.keyFields(tableSchema)), rowType);
break;
default:
throw new UnsupportedOperationException(
"Unsupported merge engine: " + mergeEngine);
}
if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
// 当change log的生成方式为LOOKUP的时候,merge则为封装
mfFactory =
LookupMergeFunction.wrap(
mfFactory,
new RowType(extractor.keyFields(tableSchema)),
rowType);
}
在写入后,会进行尝试同步上一次compact的结果,同时也会尝试去触发一次新的compact,Paimon的Compact原理争取通过下一篇文章来分析。
这里,与Flink写入Hudi的过程一样,Flink写入Paimon是如何保证Exactly-Once语义的呢?
5. Flink提交写入状态
提交算子CommitterOperator首先会接收上游算子下发的flush信息
public void processElement(StreamRecord<CommitT> element) {
output.collect(element);
// 缓存上游下发的flush信息在算子内部
this.inputs.add(element.getValue());
}
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
// 在算子进行checkpoint完成后进行commit操作
commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
}
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
// 将算子内部缓存的flush信息整合进行commit操作
committer.commit(committables(headMap));
headMap.clear();
}
commiter.commit的逻辑是将所有的flush信息进行一次commit操作,最终flush信息会被序列化成json信息保存在snapshot文件中,当commit成功后,Paimon表会新增一次snapshot。详细的commit过程的代码比较简单,可以直接跟读,这里就不再赘述。
6. 最后
本文通过跟读源码的方式对Flink写入Paimon的核心流程进行了解析,相信通过对Flink写入Paimon流程细节的梳理,对理解Paimon的特性及性能优化都是有极大的助力。由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。
最后总结一下,本文主要解析了Flink写入Paimon的核心流程:
1. 介绍了Flink SQL/api的方式构建写入流程DAG的完整过程;
2. 介绍了写入算子RowDataStoreWriteOperator处理数据的完整流程;
3. 介绍了提交算子CommitterOperator进行snapshot提交的完整流程。
当然,本文由于篇幅有限,没有对Flink和Paimon架构和概念进行详细的介绍,同时对Flink写入Paimon的Compact过程及性能优化也没有涉及,后续会加上这些方面的解析。