怎么让flink 部分算子跑批 部分跑流模式? ALL_EXCHANGES_BLOCKING
阿里云实时计算 Flink 支持在 Flink 程序中同时使用批处理算子和流处理算子,即混合流批模式。在混合流批模式下,可以通过 ALL_EXCHANGES_BLOCKING 选项来控制批处理算子和流处理算子之间的交互方式。
ALL_EXCHANGES_BLOCKING 选项表示所有交换操作都会阻塞,即批处理算子和流处理算子之间的数据交换会按照批处理的方式进行,直到批处理算子处理完所有数据后才会进行下一轮数据交换。这种方式可以保证数据的有序性和完整性,但可能会导致一些数据的延迟。
要让 Flink 部分算子跑批部分跑流模式,可以按照以下步骤进行设置:
DataStreamSource<String> source = env.fromElements("1", "2", "3");
BatchOperator<String> batch = BatchOperator.fromDataStream(source);
batch.map(new MyBatchMapFunction())
.returns(Types.STRING)
.print();
DataStreamSource<String> source = env.fromElements("1", "2", "3");
DataStream<String> stream = source.map(new MyStreamMapFunction())
.returns(Types.STRING);
stream.print();
DataStreamSource<String> source = env.fromElements("1", "2", "3");
BatchOperator<String> batch = BatchOperator.fromDataStream(source);
DataStream<String> stream = source.map(new MyStreamMapFunction())
.returns(Types.STRING);
ConnectedStreams<String, String> connected = batch.connect(stream);
connected.flatMap(new MyFlatMapFunction())
.setParallelism(1)
.startNewChain()
.name("MyConnectedFunction")
.setConnectionType(new AllEdgesBlocking())
.print();
在上述代码中,使用 connect() 方法将 BatchOperator 和 DataStream 进行连接,设置 ALL_EXCHANGES_BLOCKING 选项来控制批处理算子和流处理算子之间的交互方式。需要注意的是,使用 ALL_EXCHANGES_BLOCKING 选项可能会导致一定的延迟和性能损失,需要根据具体业务场景进行权衡和测试。
在 Flink 中,可以通过设置算子的执行模式,来控制部分算子跑批(Batch)模式,部分算子跑流(Streaming)模式。其中,跑批模式是将数据一批一批地处理,跑流模式是将数据流式处理。
要实现部分跑批、部分跑流的效果,可以使用 Flink 提供的 ALL_EXCHANGES_BLOCKING
执行模式。该执行模式要求所有的数据交换都使用阻塞方式,从而将数据流转为批处理形式。
具体实现步骤如下:
在 Flink 中,可以使用 DataStream.transform()
方法添加算子,并在该算子上设置 setExecutionMode()
方法来设置执行模式。例如:
DataStream<MyData> dataStream = ...;
DataStream<MyData> batchStream = dataStream.transform(
"batchOperator",
TypeInformation.of(MyData.class),
new MyBatchOperator()).setExecutionMode(ExecutionMode.BATCH);
DataStream<MyData> streamStream = dataStream.transform(
"streamOperator",
TypeInformation.of(MyData.class),
new MyStreamOperator()).setExecutionMode(ExecutionMode.PIPELINED);
在上述代码中,通过设置 ExecutionMode.BATCH
或 ExecutionMode.PIPELINED
,将算子划分为批次模式或流模式。其中,MyBatchOperator
和 MyStreamOperator
分别为自定义的跑批算子和跑流算子。
当部分算子跑批、部分算子跑流时,需要使用 ALL_EXCHANGES_BLOCKING
执行模式来控制数据流的转换。在该执行模式中,数据交换和处理都会使用阻塞方式,从而将数据流转为批处理形式。
例如,可以使用以下方式来设置算子的执行模式:
batchStream.setConnectionType(StreamPartitionerType.ALL_EXCHANGES_BLOCKING);
streamStream.setConnectionType(StreamPartitionerType.ALL_EXCHANGES_BLOCKING);
在上述代码中,通过设置 StreamPartitionerType.ALL_EXCHANGES_BLOCKING
,将数据流转为批处理形式。
在 Flink 中,批处理模式和流处理模式是可以混合使用的,我们可以根据实际情况选择使用不同的算子来实现批处理或流处理。其中,一些算子(如 group by、join 等)既可以作为批处理也可以作为流处理,而另外一些算子则只能用于流处理(如 window、filter 等)。
如果希望部分算子运行在批处理模式,而其他算子运行在流处理模式,可以使用 Flink 的 DataStream API 和 DataSet API 的混合编程模式。例如,可以使用 DataSet API 中的 groupBy 算子,并结合 DataStream API 中的 window 算子来实现一些批处理和流处理混合运算。在这种情况下,Flink 会根据运行时情况自动选择运行模式,以提高计算效率。
此外,在 Flink 1.12.0 版本中,还引入了 "ALL_EXCHANGES_BLOCKING" 机制,可以有效地实现混合批流计算。通过将某些算子标记为 ALL_EXCHANGES_BLOCKING,Flink 将在其前后插入一个特殊的 exchange 算子,从而实现该算子的全局排序和一致性。这种机制可以在一些特殊的应用场景中提高程序性能和稳定性。
可以通过设置 DataStream 的 Exchange 模式来控制算子的执行模式。在 Flink 的数据流中,数据的传输主要分为两种模式:Blocking Exchange 和 Pipelined Exchange( Pipeline 交换)。两种模式的主要区别在于数据在传输时如何缓存和响应处理。
对于 Blocking Exchange,会在数据交换的边界(即相邻算子之间)强制使用 Blocking 内存缓冲区。这意味着相邻算子中的一个算子将产生流量并强制等待缓冲区中的空间到达时才能继续执行,从而引入阻塞并减慢整个数据流的进程。Blocking Exchange 适用于需要对流数据进行排序和聚合等场景。
对于 Pipeline Exchange,流数据像管道一样流动,算子之间不会有止境的缓冲区。在 Pipeline Exchange 模式下,所有算子都可以在以流水线方式处理数据的同一时间内运行,并最终导致更少的局部缓存,更好的处理性能和更低的延迟。Pipeline Exchange 模式适用于需要低延迟但不需要排序和聚合的场景。
为了让某些算子运行在 blocking 形式下,而另一些算子运行在 pipeline 形式下,您可以将 blocking 算子和 pipeline 算子分别放置在不同的 operator chain 中,并使用 setAllStrategy() 方法分别设置它们的 Exchange 模式。在设置 Exchange 模式时,可以使用 ALL_EXCHANGES_BLOCKING 常量表示所有 Exchange 模式均为 Blocking Exchange,即使此常量被应用于 pipeline 元素。
例如,假设有两个算子 A 和 B,其中 A 运行于 Blocking Exchange 模式,B 运行于 Pipeline Exchange 模式:
DataStream<X> input = ...;
DataStream<Y> block = input
.map(new A())
.setConnectionType(REDISTRIBUTE)
.slotSharingGroup("blocking")
.name("blockingMap")
.setAllStrategy(new ALL_EXCHANGES_BLOCKING<>())
.map(new B())
.setConnectionType(REDISTRIBUTE)
.slotSharingGroup("pipeline")
.name("pipelineMap");
在这个例子中,两个算子 A 和 B 都是具有一定 compute 能力的算子,但它们在数据的 Exchange 方式上存在差异。A 算子所有的 Exchange 模式都设置为 Blocking Exchange,B 算子所有的 Exchange 模式都设置为 Pipeline Exchange。
通过这样的设置,可以让 A 算子运行在 blocking 形式下,B 算子运行在 pipeline 形式下,从而使数据的传输更加高效。
需要注意的是,Blocking Exchange 模式可能会在处理大量数据时对应用程序的吞吐量产生负面影响,因此需要根据应用程序的实际需求来选择合适的 Exchange 模式。
Flink 的批处理和流处理都是基于相同的分布式流式数据处理引擎实现的,所以可以实现部分算子跑批,部分算子跑流模式。
可以通过以下几种方式实现:
使用Flink的DataSet API实现批处理,使用DataStream API实现流处理,然后通过connect或union方法将两个数据流进行连接或合并操作。
在算子中使用判断逻辑,根据输入数据的特征或时间戳等信息,选择对该批数据进行批处理或流处理。
将算子划分为两部分,一部分实现批处理,一部分实现流处理,然后通过Flink的迭代计算或迭代流计算实现算子的复合操作。
需要注意的是,由于批处理和流处理的数据特征和处理方式不同,部分算子跑批部分跑流模式可能会存在性能和数据准确性方面的问题,所以在实现时需要进行详细的测试和优化。
要让Flink部分算子跑批部分跑流模式,可以使用ALL_EXCHANGES_BLOCKING模式,该模式下所有的数据交换都会被阻塞,直到所有的数据都到达目标算子才会继续执行。这种模式可以让部分算子以批处理的方式运行,而其他算子则以流处理的方式运行。
要使用ALL_EXCHANGES_BLOCKING模式,可以在Flink程序中设置ExecutionConfig的setExecutionMode方法为ExecutionMode.BATCH_FORCED,然后在需要批处理的算子上调用forceNonParallel()方法。这样,这些算子就会以批处理的方式运行,而其他算子则会以流处理的方式运行。
以下是一个示例代码:
ExecutionConfig config = env.getConfig(); config.setExecutionMode(ExecutionMode.BATCH_FORCED); DataStream stream = env.fromElements(1, 2, 3, 4, 5); DataStream batchedStream = stream .map(new MyMapper()) .forceNonParallel(); DataStream streamedStream = stream .filter(new MyFilter()); DataStream finalStream = batchedStream.union(streamedStream); finalStream.print(); 在上述示例中,MyMapper算子会以批处理的方式运行,而MyFilter算子会以流处理的方式运行。forceNonParallel()方法会强制将该算子设置为非并行运行,从而使其以批处理的方式运行。最后,使用union()方法将两个算子的输出合并为最终的输出流。
Flink 既可以在流模式下运行,也可以在批模式下运行。根据不同的场景来选择适合的模式运行 Flink 可以提高程序的性能和效率。
在 Flink 中,要让部分算子跑批、部分算子跑流模式,需要使用到 Exchange 的 Blocking 策略——ALL_EXCHANGES_BLOCKING。ALL_EXCHANGES_BLOCKING 是一种特殊的 Exchange 策略,它可以将整个 Flink 程序分为多个任务 (Task),并通过 Barrier 消息进行协调。Barrier 消息被看作是一个事件点,在产生这些消息的算子之前执行的所有操作都会被称为 Batch。而后续操作会被视为 Streaming,因此 ALL_EXCHANGES_BLOCKING 将 Batch 操作与 Streaming 操作统一起来,实现了部分算子跑批、部分算子跑流模式的功能。
具体来说,当设置了 ALL_EXCHANGES_BLOCKING 后,发送数据时由于拦截器的存在,数据无法直接发出去。只有等到内部的 Trigger 触发或者收到 Acknowledgment 或 Timeout 来响应才会把数据真正发送出去。因此,采用 ALL_EXCHANGES_BLOCKING 集群中的每一个 Task 都必须等待其它 Task 经过 Sync 计数相等的时间之后,全部完成任务再进行下一个阶段。
举个例子:假设 Flink 程序中有 TwoStream(数据流)、BatchA、BatchB 三个算子,其中 BatchA 和 BatchB 是批处理任务,TwoStream 是流式任务。如果使用 ALL_EXCHANGES_BLOCKING 将程序分为两个 Task,则 BatchA 和 BatchB 所在的 Task 触发 Batch 操作,TwoStream 所在的 Task 触发 Streaming 操作。
需要注意的是,在使用 ALL_EXCHANGES_BLOCKING 时,要将等待时间 Sync Duration 设置得足够长,以确保所有 Task 的计算都能完成。同时还需要确保 BatchA、BatchB 两个算子产生的结果不会对 TwoStream 进行负反馈,否则可能会导致该阶段任务无法正常结束而一直阻塞。
总之,通过合理地设置 Exchange 的 Blocking 策略和等待时间 Sync Duration,我们可以让部分算子跑批、部分算子跑流模式,并达到更好的性能和效率。
在Flink作业中混合批处理和流处理模式,可以通过以下几个方式实现: 1. 在DataStream环境中使用ProcessFunction进行小批量(micro-batch)处理。可以在ProcessFunction中以小批处理方式消费流数据,实现近似批处理的效果。 2. 在DataStream环境中使用KeyedProcessFunction,并设置超时定时器。当定时器触发时,可以消费当前key下全部积累的数据,实现小批处理。 3. 在流作业的某些部分使用匿名批处理作业(batch.execute())。这会创建一个单独的批处理环境,在其中执行批处理逻辑。 4. 将流作业输出至批处理作业。可以在流作业的某个节点创建批处理环境,并使用addSink将数据输出至该批处理作业。批处理作业会按间隔消费这些数据进行处理。 5. 在StreamGraph中设置部分Operator为批处理模式。通过调用setUid(UIDs.BATCH_PROCESSING_STRATEGY)和设置类型为BatchStrategies.ALL_WORKERS_BLOCKING或BatchStrategies.ALL_EXCHANGES_BLOCKING,可以将Operator转换为批处理模式。 那么,具体实现方式的选择就需要根据场景的实际需要进行权衡: 1. ProcessFunction方式简单易用,但是无法利用Flink的批处理优势如批量排序等。同时无法设置checkpoint,处理长窗口数据时可能会遇到数据丢失的问题。 2. KeyedProcessFunction可以对指定key的数据进行小批处理,但需要维护定时器和keyed state,实现较为复杂。 3. batch.execute()方式可以充分利用Flink批处理的功能,实现 checkpoint续跑等,但是需要独立管理批处理环境和作业,较为繁琐。 4. 将流输出至批,可以很方便地实现流批融合,但是需要两套环境一起调度和部署,稍显麻烦。 5. 设置Operator为批处理模式,可以在一个作业中很容易地切换流批模式,但是目前仅支持source和exchange两种Operator类型,适用场景较为局限。 所以,可以看出每种实现方式都有其优点和局限性。在实际应用中,需要结合作业的复杂度和场景需要进行方式的选择和设计,才能最好地实现混合流批处理的效果。
在 Flink 中,可以通过设置算子的执行模式来控制算子的执行方式。Flink 提供了两种执行模式:批处理模式(Batch Execution Mode)和流处理模式(Streaming Execution Mode)。默认情况下,Flink 的算子会根据上下游算子的执行模式自动选择执行模式。如果上下游算子都是批处理算子,那么该算子会以批处理模式执行;如果上下游算子都是流处理算子,那么该算子会以流处理模式执行。如果上下游算子的执行模式不一致,那么该算子会以 ALL_EXCHANGES_BLOCKING 模式执行,即所有数据交换都会被阻塞,直到批处理算子的所有数据都被处理完毕后才会流动到下游流处理算子。
因此,如果您需要让部分算子以批处理模式执行,部分算子以流处理模式执行,可以考虑按照以下方式设置算子的执行模式:
对于需要以批处理模式执行的算子,可以将算子的执行模式设置为 BatchExecutionMode.EXPLICIT
,并将其放置在一个 BatchSlotSharingGroup
中。这样,该算子就会以批处理模式执行,不会被流处理算子阻塞。
对于需要以流处理模式执行的算子,可以将算子的执行模式设置为 StreamingExecutionMode.PIPELINED
,并将其放置在一个 StreamingSlotSharingGroup
中。这样,该算子就会以流处理模式执行,不会阻塞批处理算子。
另外,如果上下游算子的数据交换方式是 keyBy,那么该数据交换方式不受算子执行模式的影响。即使上下游算子的执行模式不一致,也会按照 keyBy 的方式进行数据交换。
要让 Apache Flink 中的部分算子运行批处理模式,部分算子运行流处理模式,可以通过以下两种方法来实现:
根据数据量大小切换算子执行模式
可以根据输入数据量的大小来判断是使用批处理还是流处理模式。当数据规模较小时,选择批处理模式;当数据规模较大,选择流处理模式。具体实现是使用 ExecutionConfig 中的 setAutoWatermarkInterval(0) 方法来将 Flink 的流处理模式关闭,这样就可以将算子设置为批处理模式。
举个例子,在以下代码中,我们定义了一个基于时间窗口的聚合算子,当窗口间隔小于等于 5 分钟时,使用批处理模式,否则使用流处理模式:
java
DataStream<Tuple2<String, Integer>> dataStream = // 输入数据流
dataStream.keyBy(data -> data.f0) .timeWindow(Time.minutes(5)) .process(new AggregationFunction()) .setExecutionConfig(new ExecutionConfig()..setAutoWatermarkInterval(0)); // 关闭自动水印
public class AggregationFunction extends ProcessWindowFunction<...> { @Override public void process(...) { // 批处理逻辑 } }
使用两个算子分别处理批处理和流处理数据
使用两个算子分别处理批处理和流处理数据。在实际应用中,可以根据数据来源、业务需求等条件选择不同的算子来处理。当数据规模较小时,选择批处理算子;当数据规模较大时,选择流处理算子。
举个例子,在以下代码中,我们定义了两个算子:BatchFunction 和 StreamFunction,分别对批处理和流处理数据进行处理:
java
DataStream<Tuple2<String, Integer>> dataStream = // 输入数据流
dataStream.keyBy(data -> data.f0) .flatMap(new BatchFunction()) .union(dataStream) .process(new StreamFunction());
public class BatchFunction extends FlatMapFunction<Tuple2<String, Integer>, ...> { @Override public void flatMap(...) { // 批处理逻辑 } }
public class StreamFunction extends ProcessFunction<Tuple2<String, Integer>, ...> { @Override public void processElement(...) { // 流处理逻辑 } }
在 Flink 中,批处理和流处理的本质区别是数据的处理方式,因此要想部分算子跑批,部分算子跑流模式,需要考虑如何根据数据特点,采用不同的处理方式。
具体来说,如果您想要让某些算子在批处理模式下运行,可以考虑采用以下方法:
使用 BatchExecutionEnvironment:Flink 提供了 BatchExecutionEnvironment 和 StreamExecutionEnvironment 两种不同的执行环境,分别用于批处理和流处理。如果您想要让某些算子在批处理模式下运行,可以使用 BatchExecutionEnvironment 创建相应的数据源和算子。
使用 DataSet:在 Flink 中,DataSet 表示批处理数据,DataStream 表示流处理数据。如果您想要让某些算子在批处理模式下运行,可以使用 DataSet 对数据进行处理。
使用 ExecutionConfig:Flink 的 ExecutionConfig 提供了一些配置选项,可以控制 Flink 任务的执行方式。例如,可以通过设置 ExecutionConfig.setAutoWatermarkInterval() 方法来控制水印的生成间隔,从而调整流处理任务的执行方式。
如果您想要让某些算子在流处理模式下运行,可以考虑采用以下方法:
使用 DataStream:在 Flink 中,DataStream 表示流处理数据,DataSet 表示批处理数据。如果您想要让某些算子在流处理模式下运行,可以使用 DataStream 对数据进行处理。
使用 ExecutionConfig:同样地,通过调整 ExecutionConfig 的相关配置选项,可以控制 Flink 任务的执行方式。例如,可以通过设置 ExecutionConfig.setAutoWatermarkInterval() 方法来控制水印的生成间隔,从而调整流处理任务的执行方式。
需要注意的是,批处理和流处理的本质区别在于数据的处理方式,因此无法对同一数据同时采用不同的处理方式。如果您需要在批处理和流处理之间切换,需要通过改变数据的处理方式或者调整 Flink 任务的执行方式来实现。同时,建议在任务开发和部署时,根据数据特点和处理需求,选择合适的处理方式和执行环境,以最大化 Flink 任务的性能和效率。
在 Flink 中,可以通过将算子分为不同的 Task 来实现批处理和流处理混合的需求。具体做法是将需要批处理的算子放在 Batch Task 中,将需要流处理的算子放在 Stream Task 中,然后再将 Batch Task 和 Stream Task 组合起来构建一个完整的 Flink 任务。
Batch Task 和 Stream Task 的主要区别在于它们的输入数据集合不同:
因此,在设计 Flink 任务时,需要针对不同的算子选择合适的 Task 类型。一般来说,Aggregate、Reduce、Window 等算子比较适合使用 Batch Task 进行计算,而Map、Filter、Join 等算子则比较适合使用 Stream Task 进行计算。
下面是一个简单的示例,演示如何将 Batch Task 和 Stream Task 结合起来构建一个 Flink 任务。假设我们有一个数据源是 Kafka,其中包含多个 topic,每个 topic 都包含批量数据和流式数据。我们需要对这些数据进行处理,最终将结果输出到 MySQL 数据库中。
首先,我们可以创建一个 Batch Task,用于处理批量数据:
DataStream<BatchData> batchStream = env.addSource(new FlinkKafkaConsumer<>(...))
.flatMap(new BatchDataMapper());
DataStream<BatchResult> batchResults = batchStream
.keyBy(BatchData::getKey)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new BatchAggregator());
batchResults.addSink(new JdbcSink<>(...));
上述代码中,我们从 Kafka 中读取批量数据,并使用BatchDataMapper
将数据转换为BatchData
对象。然后,对BatchData
对象进行分组和窗口聚合操作,并最终将结果输出到 MySQL 数据库中。
接下来,我们再创建一个 Stream Task,用于处理流式数据:
DataStream<StreamData> streamStream = env.addSource(new FlinkKafkaConsumer<>(...))
.flatMap(new StreamDataMapper());
DataStream<StreamResult> streamResults = streamStream
.keyBy(StreamData::getKey)
.process(new StreamProcessor());
streamResults.addSink(new JdbcSink<>(...));
在这里,我们从 Kafka 中读取流式数据,并使用StreamDataMapper
将数据转换为StreamData
对象。然后,对StreamData
对象进行分组和流式计算操作,并最终将结果输出到 MySQL 数据库中。
最后,我们可以将 Batch Task 和 Stream Task 组合起来构建一个完整的 Flink 任务:
env.setParallelism(2);
JobExecutionResult result = env.execute("MyFlinkJob");
需要注意的是,上述代码中设置了并行度为 2,表示同时运行两个 Task,其中一个是 Batch Task,另一个是 Stream Task。如果需要更改 Task 的并行度,可以通过调整setParallelism
方法的参数来修改。
在 Flink 中,可以使用 GlobalDataExchangeMode 来控制算子之间数据交换的模式。其中,ALL_EXCHANGES_BLOCKING 模式表示所有交换都是阻塞同步的方式进行。
如果您想让部分算子以批量模式运行,部分算子以流模式运行,可以通过以下步骤来实现:
将数据流划分为批处理和流处理 首先,需要对数据流进行划分,将需要批处理的数据和需要流处理的数据分别发送到不同的算子中。这可以通过 SplitStream 操作来实现。
例如,假设您有一个包含 sensor 数据的 DataStream(其中 Sensor 是自定义的数据类型),您希望对其中的某些传感器数据进行批量处理,而其他传感器数据则采用流处理方式。在这种情况下,可以使用 filter() 操作将要进行批处理的数据划分出来,并使用 split() 操作将它们分配给一个新的数据流。例如:
DataStream source = ...;
// 划分出需要批处理的数据 DataStream batchInput = source.filter(new BatchFilter());
// 将需要批处理的数据分配给一个新的数据流 SplitStream splitStream = source.split(new OutputSelector() { @Override public Iterable select(Sensor value) { if (value.isBatch()) { return Collections.singleton("batch"); } else { return Collections.singleton("stream"); } } });
// 获取需要流处理的数据流 DataStream streamInput = splitStream.select("stream"); 其中,BatchFilter 是一个自定义的函数,用于过滤需要批量处理的传感器数据。在这里,您可以根据数据的特征、时间戳等信息来判断哪些数据需要进行批量处理。
对不同的数据流应用不同的 GlobalDataExchangeMode 接下来,需要为批处理和流处理分别设置不同的 GlobalDataExchangeMode。例如,如果您想让批处理使用 ALL_EXCHANGES_BLOCKING 模式,而流处理使用 PIPELINED 模式,可以像这样做:
DataStream batchOutput = batchInput .setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EXCHANGES_BLOCKING) .map(new BatchProcessingFunction());
DataStream streamOutput = streamInput .setGlobalDataExchangeMode(GlobalDataExchangeMode.PIPELINED) .map(new StreamProcessingFunction()); 其中,BatchProcessingFunction 和 StreamProcessingFunction 分别是对批处理和流处理数据进行处理的自定义函数。在这里,您可以编写相应的数据转换逻辑并将它们应用于不同的数据流中。
请注意,在设置 GlobalDataExchangeMode 时,只有当使用 DataStream API 中的 setGlobalDataExchangeMode() 方法时才会生效。如果您将算子作为 Flink Job 中的 Task 进行部署,则需要在启动任务时使用 -Dexecution.runtime-mode=STREAMING/BATCH 参数来指定运行模式。
Flink 的全局配置 ALL_EXCHANGES_BLOCKING
是对应着 Task 的 Input Gate 和 Output Gate 的 Blocking Queues。
如果该值为 true
,则所有 Exchange Execution Mode 都被设置为 PIPELINED
,意味着输入和输出缓冲区都是无界的;如果该值为 false
,则所有 Exchange Execution Mode 都被设置为 ALL_BATCH
,意味着输入和输出缓冲区都是有界的,每次按照固定的大小进行批量处理。
如果你想让任务的部分算子跑批,部分跑流模式,可以通过任务图自定义 Connection 可以达到此目的。你可以手动设定输入和输出的 Exchange Execution Mode,使其兼具批和流的特点。这一般通过在 operator 之间设置具有不同 Exchange Execution Mode 的 Connnection 来完成。具体而言,可以使用 InputTransformation
和 OutputTransformation
对输入和输出的 Execution Mode 进行定制化配置。
例如,你可以通过代码编写的方式,并手动设置 InputTransformation 和 OutputTransformation,来自定义一个 Exchange Execution Mode 组合的任务,示例如下:
DataStream<Tuple2<String, Integer>> source = ...;
DataStream<String> streamOp = source
.keyBy(in -> in.f0)
.transform("BatchPart", Types.STRING, new BatchModeTransformer())
.setParallelism(1)
.name("BatchPart");
DataStreamSink<String> streamSink = streamOp
.addSink(new CustomFileSink())
.setParallelism(1)
.name("CustomSink");
// 流模式
DataStream<String> streamOp2 = source
.flatMap(new StatefulMapper())
.name("StreamPart");
DataStreamSink<String> streamSink2 = streamOp2
.addSink(new CustomFileSink())
.name("CustomStreamSink");
// 对两个算子之间的连接进行手动定制
// 部分使用 PIPELINED,部分使用 BATCH,从而兼具流和批的特点
streamOp.connect(streamOp2)
.flatMap(new CustomCoFlatMap())
.setConnectionType(new InputTransformation().withRegularInput())
.setConnectionType(new OutputTransformation().withBatchOutput())
.name("CustomConnect");
env.execute("BatchStreamJob");
在这个例子中,我们将源数据按照 key 进行了分区,并将该部分算子设置为批处理,即 BatchModeTransformer
是一个批处理算子;而后又将其与另一个算子使用了 connect
方法进行连接,并手动设置该连接的 Exchange Execution Mode,使其连接处兼具流处理和批处理的特点。
需要说明的是,你可以根据需要自由地拓展和组合这些算子,从而实现各种 Exchange Execution Mode 的组合方式。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。