背景
为什么要做迁移?
我们的项目从2019年第一个版本开始就采用Flink做批处理,而且业务场景一直是纯批处理的场景:从TiDB读取数据并计算之后存入Starrocks和ElasticSearch,没有实时的需求。当时Flink的版本批处理还是基于DataSet API来做,从1.12版本开始Flink逐步放弃DataSet API,推荐转向Table API/SQL或者DataStream API。目前我们在用的是Flink 1.16版本,仍然保留对DataSet API的支持,但是在最新的版本中已经没有了DataSet API,为了以后的版本升级,我们就需要把现在这些基于DataSet API的job逐步迁移到DataStream API。
为什么不迁移到Table API/SQL
官方文档推荐基于DataSet API的批处理job迁移到Table API/SQL,但是我们在做了一些调研后发现Table API/SQL并不适合于我们的场景,主要有以下几个问题:
在DataSet API下我们可以基于InputFormat实现非常灵活的并行数据读取,比如基于date=?
来实现按日期的并发读取,但是在Table API/SQL下没法实现数据的并行读取,我们甚至实现了一套完整的基于JDBC的Catalog都没法达到我们的要求。
对于基于一部分数据读取关联数据的场景(实现类似于SQL中的join操作),我们在DataSet API下实现了丰富的MapFunction/MapPartitionFunction来实现并行批量的读取关联数据,但是在Table API/SQL下实现join要么先把底层的数据全部读进来之后再在Flink内部来实现join,这样会读取大量无用的数据对TiDB造成很大的负载,要么用点查来做的话不支持批量查询,效率太低。
在数据的处理过程中有非常复杂的业务逻辑基于DataSet API可以很方便的实现,但是这些没法转化成SQL的实现。
基于以上的问题迁移到DataStream API是我们唯一的选择。
迁移方案
https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/dataset_migration/ 这篇文章是官方文档中的迁移指南,我们的迁移主要也是参考了这篇文章。但是在迁移的过程中还是会遇到各种各样的困惑,主要是以下的几个方面。
Runtime Mode
DataStream API支持三种Runtime Mode:Streaming、Batch、Auto。Streaming就是流处理,Batch就是批处理,Auto是在有界数据上采用批处理模式,无界数据上采用流处理的模式。关于批处理和流处理模式的具体区别可以看官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/execution_mode/ 。
因为我们是纯批处理的场景,所以Runtime Mode选择的是Batch。但是我其实有一个疑问就是Streaming 模式和Batch模式下的Pipeline Execution Mode有什么区别?因为Pipeline Execution Mode也是一种类似于流式的处理方式。官方同学的回复是:“不一样,只是shuffle实现用了pipeline shuffle,批和流模式下算子的语义不完全相同比如streaming会有回撤流等等。比如 sum 算子,在流模式下每来一条数据都会往下输出一条,但是批模式下每个key才会输出一条”。
数据输入
前面提到了Runtime Mode选择Batch模式,但是选择Batch模式有一个前提是要求所有的输入都是有界的。StreamExecutionEnvironment有一个createInput方法可以接收我们原来在DataSet下使用的各种InputFormat,但是这个方法生成的输入是无界的,使用这个方法提供的输入那还是得采用Streaming模式。所以我们参考StreamExecutionEnvironment的addSource方法自己实现了一个方法把InputFormat转变成一个有界的输入,这样就可以在Batch模式下继续使用原来的InputFormat:
public static <OUT> DataStreamSource<OUT> createStreamSourceFromInputFormat(
StreamExecutionEnvironment environment, InputFormat<OUT, ?> inputFormat, String name) {
if (StringUtils.isBlank(name)) {
name = "Custom Input Format Source";
}
TypeInformation<OUT> inputFormatTypes = TypeExtractor.getInputFormatTypes(inputFormat);
InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, inputFormatTypes);
TypeInformation<OUT> resolvedTypeInfo = inputFormatTypes;
if (resolvedTypeInfo == null && function instanceof ResultTypeQueryable) {
resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
if (resolvedTypeInfo == null) {
try {
resolvedTypeInfo =
TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(name, e);
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
if (environment.getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(function, environment.getConfig().getClosureCleanerLevel(), true);
}
ClosureCleaner.ensureSerializable(function);
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(
environment, resolvedTypeInfo, sourceOperator, isParallel, name, Boundedness.BOUNDED);
}
我们另外也开发了一个实现Source接口的TableSource类,但是本质上跟上面的方法+InputFormat差不多,而且还可以复用之前的InputFormat,所有就没有使用这个TableSource类。
数据输出
DataSet下的数据输出采用的是OutputFormat,DataStream有一个输出的方法writeUsingOutputFormat可以使用原来的OutputFormat,但是这个方法已经标记为废弃了,所以我们还是切换到了addSink上做输出:对TiDB/MySQL的输出就采用JdbcSink,对StarRocks和ElasticSearch的Sink我们也自己实现了一套,基于原来对应的OutputFormat修改起来很方便。
数据处理
其实原来的DataSet 这一套API非常好用,切换到DataStream之后即使有上面的迁移文档,有很多的数据操作实现起来还是没有比原来的API更复杂或者和原来的使用方式不一样。比如比较核心的一点是大部分的操作都需要开窗:先把数据使用keyBy分组然后使用迁移文档里的EndOfStreamWindows开窗再处理,这个需要适应一下。
另外一个比较大的问题是DataStream下没有了mapPartition方法,在DataSet API下我们经常在rebalance之后使用mapPartition进行处理,但是在DataStream下并没有对应的方法。迁移文档里提供了一种方式是先把数据使用AddSubtaskIDMapFunction做一次map之后再根据subtaskId做分组开窗处理,但是这个会改变原来DataStream下的数据类型,强行嵌套一层Tuple在外面,还是很不方便的。我们现在采用的方式是直接找一个分布相对均匀的字段做keyBy再开窗处理。好消息是官方透露在1.20的版本上会提供DataStream的mapPartition方法,还会有sortPatition等方法,具体可以看这个FLIP:https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream 。
可行性
基于上面的这套方案我们已经迁移了10多个job到DataStream API,目前迁移后的job运行都正常,我们也会在后续的版本迭代中逐步的完成所有job的迁移。
参考
除了上面提到的一些文档,还有一些与批处理相关的FLIP可以参考:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API#FLIP134:BatchexecutionfortheDataStreamAPI-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data#FLIP327:Supportswitchingfrombatchtostreammodetoimprovethroughputwhenprocessingbacklogdata-Motivation