Flink Source Code: A Walkthrough of Broadcast State


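Broadcast State is a special kind of Operator State. It was introduced for scenarios where the records of one stream must be broadcast to all downstream tasks, so that every subtask maintains the same state. That broadcast state can then be read while processing the records of a second stream. Broadcast state has a few characteristics of its own:

It must be defined as a Map structure.

It can only be modified on the broadcast side; the non-broadcast side cannot modify it.

At runtime, it can only be kept in memory.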


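At this point you probably have the following questions:

Why must broadcast state be defined as a Map structure? Can't other state types be used?

Why can broadcast state only be modified on the broadcast side, and not on the non-broadcast side?

Why can broadcast state only be kept in memory? Can't the RocksDB state backend be used?

With these three questions in mind, the rest of this article reads through the relevant source code to answer them.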


broadcast source code


/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
 * to every parallel instance of the next operation. In addition, it implicitly as many {@link
 * org.apache.flink.api.common.state.BroadcastState broadcast states} as the specified
 * descriptors which can be used to store the element of the stream.
 *
 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)}
 *     to create a {@link BroadcastConnectedStream} for further processing of the elements.
 */
@PublicEvolving
public BroadcastStream<T> broadcast(
        final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}


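The broadcast method takes MapStateDescriptor arguments, that is, descriptors for Map-structured state. Callers therefore have to define a MapStateDescriptor; passing any other descriptor type does not compile. The main reason is that broadcast state exists to be joined with the non-broadcast stream. Think of it as a two-stream join: a join needs a key, and records only match when their keys are equal, so a Map (key-value) structure fits best. The key holds the join field and the value can be broadcast data of any type. At join time you only need to obtain the broadcast state and call state.get(key) to fetch the broadcast data.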
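The lookup pattern described above can be sketched in plain Java with no Flink dependency. This is only an illustrative model: the class and method names (BroadcastLookupSketch, enrich) are hypothetical, and a HashMap stands in for the broadcast state that a real job would obtain through getBroadcastState(descriptor).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model: a HashMap plays the role of the broadcast state.
class BroadcastLookupSketch {

    // Joins each event key against the broadcast map, the way state.get(key) would.
    static List<String> enrich(Map<String, String> broadcastState, List<String> eventKeys) {
        List<String> out = new ArrayList<>();
        for (String key : eventKeys) {
            String rule = broadcastState.get(key); // null when no broadcast record matches
            out.add(rule == null ? key + " -> (no match)" : key + " -> " + rule);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        state.put("flink", "rule-A");
        state.put("spark", "rule-B");
        enrich(state, List.of("flink", "hive")).forEach(System.out::println);
    }
}
```

The key carries the join field and the value carries whatever was broadcast, which is why a key-value structure is a natural fit for this kind of enrichment.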


process source code


@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) {
    // Extract the type information of the output
    TypeInformation<OUT> outTypeInfo =
            TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);
    return process(function, outTypeInfo);
}


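The process method takes a KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT>. Compared with an ordinary KeyedProcessFunction<K, I, O>, it has one more type parameter: the process call here sits downstream of two connected streams, so it needs two input types. It then calls the overloaded process method.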


process source code (overload)


@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(
            nonBroadcastStream instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
    return transform(function, outTypeInfo);
}


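Apart from two precondition checks (the function must not be null, and the non-broadcast stream must be a KeyedStream), this process overload does nothing itself and simply calls transform.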


transform source code


@Internal
private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
        final TypeInformation<OUT> outTypeInfo) {
    // read the output type of the input Transforms to coax out errors about MissingTypeInfo
    nonBroadcastStream.getType();
    broadcastStream.getType();
    KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
    // Build the KeyedBroadcastStateTransformation
    final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
            new KeyedBroadcastStateTransformation<>(
                    "Co-Process-Broadcast-Keyed",
                    nonBroadcastStream.getTransformation(),
                    broadcastStream.getTransformation(),
                    clean(userFunction),
                    broadcastStateDescriptors,
                    keyedInputStream.getKeyType(),
                    keyedInputStream.getKeySelector(),
                    outTypeInfo,
                    environment.getParallelism());
    @SuppressWarnings({"unchecked", "rawtypes"})
    final SingleOutputStreamOperator<OUT> returnStream =
            new SingleOutputStreamOperator(environment, transformation);
    // Add it to the List<Transformation<?>> collection
    getExecutionEnvironment().addOperator(transformation);
    return returnStream;
}


The transform method mainly does two things:

First, it builds the corresponding KeyedBroadcastStateTransformation object, which is itself a subclass of Transformation.

Then it adds that transformation to the List<Transformation<?>> collection; when the StreamGraph is built later, the Transformations are read from this collection.


getStreamGraph source code


@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}


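The main job of getStreamGraph is to generate the StreamGraph, using the List<Transformation<?>> collection populated in the previous step. Because this article focuses on the source code of Flink broadcast streams, only the broadcast-related parts are covered.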
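The collect-then-consume flow around the transformation list can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's environment class: TransformationListSketch and pendingCount are hypothetical names, and strings stand in for Transformation objects. The empty check and the defensive copy mirror what getStreamGraphGenerator does in the listing that follows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an environment that collects transformations; not Flink's class.
class TransformationListSketch {
    private final List<String> transformations = new ArrayList<>();

    // Counterpart of addOperator(...): each API call appends one transformation.
    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Counterpart of getStreamGraph(clearTransformations): graph generation works
    // on a copy so later additions cannot interfere, then the list is optionally cleared.
    List<String> getStreamGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology.");
        }
        List<String> snapshot = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return snapshot;
    }

    // Number of transformations still waiting to be turned into a graph.
    int pendingCount() {
        return transformations.size();
    }

    public static void main(String[] args) {
        TransformationListSketch env = new TransformationListSketch();
        env.addOperator("source");
        env.addOperator("Co-Process-Broadcast-Keyed");
        System.out.println(env.getStreamGraph(true));
    }
}
```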


getStreamGraphGenerator source code


private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(
                    new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}


getStreamGraphGenerator mainly constructs the StreamGraphGenerator object. Once it is constructed, its generate method can be called to produce the StreamGraph. Before looking at generate, take a look at the static initializer block of StreamGraphGenerator.


StreamGraphGenerator source code


static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}


Before any StreamGraphGenerator instance is created, its static initializer block runs and builds a Map from each Transformation class to its TransformationTranslator. This Map is used later.


transform source code (StreamGraphGenerator)


// Look up the TransformationTranslator for this Transformation
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
if (translator != null) {
    transformedIds = translate(translator, transform);
} else {
    transformedIds = legacyTransform(transform);
}


After the StreamGraphGenerator object is constructed, generate is called, which in turn calls transform. Here the matching TransformationTranslator is fetched from the Map built above, and then translate is called.
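The class-keyed dispatch described above can be illustrated with a small stand-alone model. Everything here is hypothetical (TranslatorDispatchSketch, its nested Transformation types, and the string results); it only mirrors the shape of the mechanism: a map built once in a static initializer, a lookup by getClass(), and a fallback when no translator is registered.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a class-keyed translator registry; none of these types are Flink's.
class TranslatorDispatchSketch {
    static class Transformation {}
    static class KeyedBroadcastTransformation extends Transformation {}
    static class UnregisteredTransformation extends Transformation {}

    private static final Map<Class<? extends Transformation>, Function<Transformation, String>>
            TRANSLATORS;

    static {
        // Built once at class-initialization time, then frozen.
        Map<Class<? extends Transformation>, Function<Transformation, String>> tmp =
                new HashMap<>();
        tmp.put(KeyedBroadcastTransformation.class, t -> "keyed-broadcast translator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    static String transform(Transformation t) {
        // The lookup uses the exact runtime class as the key.
        Function<Transformation, String> translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator.apply(t) : "legacy path";
    }

    public static void main(String[] args) {
        System.out.println(transform(new KeyedBroadcastTransformation()));
        System.out.println(transform(new UnregisteredTransformation()));
    }
}
```

Because the key is the exact runtime class, a transformation type that was never registered falls through to the fallback branch, just as the listing above falls back to legacyTransform.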


translate#translateForStreaming#translateForStreamingInternal source code


@Override
protected Collection<Integer> translateForStreamingInternal(
        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // Build the CoBroadcastWithKeyedOperator
    CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(
                    transformation.getUserFunction(),
                    transformation.getBroadcastStateDescriptors());
    return translateInternal(
            transformation,
            transformation.getRegularInput(),
            transformation.getBroadcastInput(),
            SimpleOperatorFactory.of(operator),
            transformation.getStateKeyType(),
            transformation.getKeySelector(),
            null /* no key selector on broadcast input */,
            context);
}


The translate call eventually reaches the translateForStreamingInternal method of KeyedBroadcastStateTransformationTranslator, which builds a CoBroadcastWithKeyedOperator from the UserFunction (the user code) and the broadcastStateDescriptors (the broadcast state descriptors).


CoBroadcastWithKeyedOperator source code


/**
 * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
 * KeyedBroadcastProcessFunctions}.
 *
 * @param <KS> The key type of the input keyed stream.
 * @param <IN1> The input type of the keyed (non-broadcast) side.
 * @param <IN2> The input type of the broadcast side.
 * @param <OUT> The output type of the operator.
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
    private static final long serialVersionUID = 5926499536290284870L;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient ReadWriteContextImpl rwContext;
    private transient ReadOnlyContextImpl rContext;
    private transient OnTimerContextImpl onTimerContext;
    public CoBroadcastWithKeyedOperator(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }
    @Override
    public void open() throws Exception {
        super.open();
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        collector = new TimestampedCollector<>(output);
        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(
                    descriptor,
                    // Initialize the state implementation instance
                    getOperatorStateBackend().getBroadcastState(descriptor));
        }
        rwContext =
                new ReadWriteContextImpl(
                        getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
    }
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }
    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
        private final ExecutionConfig config;
        private final KeyedStateBackend<KS> keyedStateBackend;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN2> element;
        ReadWriteContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedStateBackend<KS> keyedStateBackend,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.getTimestamp();
        }
        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <VS, S extends State> void applyToKeyedState(
                final StateDescriptor<S, VS> stateDescriptor,
                final KeyedStateFunction<KS, S> function)
                throws Exception {
            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }
    private class ReadOnlyContextImpl extends ReadOnlyContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN1> element;
        ReadOnlyContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.hasTimestamp() ? element.getTimestamp() : null;
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        @SuppressWarnings("unchecked")
        public KS getCurrentKey() {
            return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
        }
    }
    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private TimeDomain timeDomain;
        private InternalTimer<KS, VoidNamespace> timer;
        OnTimerContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        @Override
        public Long timestamp() {
            checkState(timer != null);
            return timer.getTimestamp();
        }
        @Override
        public TimeDomain timeDomain() {
            checkState(timeDomain != null);
            return timeDomain;
        }
        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
    }
}


Before analyzing the CoBroadcastWithKeyedOperator source, take a look at its UML diagram.

[Figure: CoBroadcastWithKeyedOperator UML diagram]

CoBroadcastWithKeyedOperator implements the TwoInputStreamOperator interface. As the name suggests, this is the StreamOperator interface for operators with two input streams; CoBroadcastWithKeyedOperator sits downstream of two streams, so it implements it. Next, look at the TwoInputStreamOperator source.


TwoInputStreamOperator source code


/**
 * Interface for stream operators with two inputs. Use {@link
 * org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if you want to
 * implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    /**
     * Processes one element that arrived on the first input of this two-input operator. This method
     * is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement1(StreamRecord<IN1> element) throws Exception;
    /**
     * Processes one element that arrived on the second input of this two-input operator. This
     * method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement2(StreamRecord<IN2> element) throws Exception;
}


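The TwoInputStreamOperator interface defines two methods. In CoBroadcastWithKeyedOperator, processElement1 handles records from the non-broadcast stream and processElement2 handles records from the broadcast stream.

Back in the open method of CoBroadcastWithKeyedOperator: it first initializes broadcastStates, which holds the MapStateDescriptor -> BroadcastState mapping, and then creates the ReadWriteContextImpl and ReadOnlyContextImpl objects (plus an OnTimerContextImpl for timer callbacks). As the names suggest, ReadWriteContextImpl can both read and write state, while ReadOnlyContextImpl can only read it. One more important thing happens in open: the broadcast state implementation is initialized.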


getBroadcastState source code


public <K, V> BroadcastState<K, V> getBroadcastState(
        final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());
    BackendWritableBroadcastState<K, V> previous =
            (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    if (previous != null) {
        checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        return previous;
    }
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<K> broadcastStateKeySerializer =
            Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    TypeSerializer<V> broadcastStateValueSerializer =
            Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    BackendWritableBroadcastState<K, V> broadcastState =
            (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    if (broadcastState == null) {
        broadcastState =
                new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    } else {
        // has restored state; check compatibility of new state access
        checkStateNameAndMode(
                broadcastState.getStateMetaInfo().getName(),
                name,
                broadcastState.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                broadcastState.getStateMetaInfo();
        // check whether new serializers are incompatible
        TypeSerializerSchemaCompatibility<K> keyCompatibility =
                restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
        if (keyCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new key typeSerializer for broadcast state must not be incompatible.");
        }
        TypeSerializerSchemaCompatibility<V> valueCompatibility =
                restoredBroadcastStateMetaInfo.updateValueSerializer(
                        broadcastStateValueSerializer);
        if (valueCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new value typeSerializer for broadcast state must not be incompatible.");
        }
        broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
    }
    accessedBroadcastStatesByName.put(name, broadcastState);
    return broadcastState;
}


getBroadcastState mainly initializes a HeapBroadcastState object, which is the concrete implementation of broadcast state. Next, look at the HeapBroadcastState source.
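The create-on-first-access behavior of getBroadcastState can be approximated with a name-keyed registry. This is a deliberately reduced sketch under stated assumptions: BroadcastStateRegistrySketch is a hypothetical class, plain HashMaps stand in for HeapBroadcastState, and the serializer compatibility checks that the real method performs for restored state are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a name-keyed state registry; not Flink's backend class.
class BroadcastStateRegistrySketch {
    private final Map<String, Map<String, String>> registeredStates = new HashMap<>();

    // First access under a name creates a heap map; later accesses return the same one.
    Map<String, String> getBroadcastState(String name) {
        return registeredStates.computeIfAbsent(name, n -> new HashMap<>());
    }

    public static void main(String[] args) {
        BroadcastStateRegistrySketch backend = new BroadcastStateRegistrySketch();
        backend.getBroadcastState("rules").put("flink", "rule-A");
        // The second lookup sees the entry written through the first one.
        System.out.println(backend.getBroadcastState("rules").containsKey("flink"));
    }
}
```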


HeapBroadcastState source code


/**
 * A {@link BroadcastState Broadcast State} backed a heap-based {@link Map}.
 *
 * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}.
 * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}.
 */
public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
    /** Meta information of the state, including state name, assignment mode, and serializer. */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
    /** The internal map the holds the elements of the state. */
    private final Map<K, V> backingMap;
    /** A serializer that allows to perform deep copies of internal map state. */
    private final MapSerializer<K, V> internalMapCopySerializer;
    HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this(stateMetaInfo, new HashMap<>());
    }
    private HeapBroadcastState(
            final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo,
            final Map<K, V> internalMap) {
        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.backingMap = Preconditions.checkNotNull(internalMap);
        this.internalMapCopySerializer =
                new MapSerializer<>(
                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
    }
    private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
        this(
                toCopy.stateMetaInfo.deepCopy(),
                toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
    }
    @Override
    public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this.stateMetaInfo = stateMetaInfo;
    }
    @Override
    public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
        return stateMetaInfo;
    }
    @Override
    public HeapBroadcastState<K, V> deepCopy() {
        return new HeapBroadcastState<>(this);
    }
    @Override
    public void clear() {
        backingMap.clear();
    }
    @Override
    public String toString() {
        return "HeapBroadcastState{"
                + "stateMetaInfo="
                + stateMetaInfo
                + ", backingMap="
                + backingMap
                + ", internalMapCopySerializer="
                + internalMapCopySerializer
                + '}';
    }
    @Override
    public long write(FSDataOutputStream out) throws IOException {
        long partitionOffset = out.getPos();
        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        dov.writeInt(backingMap.size());
        for (Map.Entry<K, V> entry : backingMap.entrySet()) {
            getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
            getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
        }
        return partitionOffset;
    }
    @Override
    public V get(K key) {
        return backingMap.get(key);
    }
    @Override
    public void put(K key, V value) {
        backingMap.put(key, value);
    }
    @Override
    public void putAll(Map<K, V> map) {
        backingMap.putAll(map);
    }
    @Override
    public void remove(K key) {
        backingMap.remove(key);
    }
    @Override
    public boolean contains(K key) {
        return backingMap.containsKey(key);
    }
    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return backingMap.entrySet().iterator();
    }
    @Override
    public Iterable<Map.Entry<K, V>> entries() {
        return backingMap.entrySet();
    }
    @Override
    public Iterable<Map.Entry<K, V>> immutableEntries() {
        return Collections.unmodifiableSet(backingMap.entrySet());
    }
}


The HeapBroadcastState code is fairly simple: it consists mainly of read and write operations on the state, which boil down to operations on a HashMap.
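The layout produced by the write method above (entry count first, then each key followed by its value) can be reproduced with the JDK's data streams. This is a minimal sketch under stated assumptions: HeapStateSnapshotSketch is a hypothetical class, keys are Strings and values are Integers, and writeUTF/writeInt stand in for Flink's key and value serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of snapshotting a heap map: size first, then key/value pairs.
class HeapStateSnapshotSketch {

    static byte[] write(Map<String, Integer> backingMap) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(backingMap.size());
            for (Map.Entry<String, Integer> entry : backingMap.entrySet()) {
                out.writeUTF(entry.getKey());   // stand-in for the key serializer
                out.writeInt(entry.getValue()); // stand-in for the value serializer
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Integer> read(byte[] data) {
        Map<String, Integer> restored = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                int value = in.readInt();
                restored.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("flink", 1);
        state.put("spark", 2);
        System.out.println(read(write(state)).equals(state));
    }
}
```

Writing the size up front is what lets a reader know how many pairs to expect without any end marker.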


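Back in CoBroadcastWithKeyedOperator, processElement1 uses ReadOnlyContextImpl and processElement2 uses ReadWriteContextImpl. In other words, state can only be modified on the broadcast side and not on the non-broadcast side, which answers the second question above.

Both the broadcast side and the non-broadcast side can obtain the state, but the return type of getBroadcastState differs: ReadWriteContextImpl returns a BroadcastState, whereas ReadOnlyContextImpl returns a ReadOnlyBroadcastState.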
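The difference in return types can be modeled with two interfaces over one backing object. This is an illustrative sketch, not Flink's type hierarchy: StateViewSketch, ReadOnlyView, WritableView and HeapState are hypothetical names chosen to echo the roles of ReadOnlyBroadcastState, BroadcastState and HeapBroadcastState.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one heap-backed state object exposed through two static types.
class StateViewSketch {
    interface ReadOnlyView<K, V> {
        V get(K key);
        boolean contains(K key);
    }

    interface WritableView<K, V> extends ReadOnlyView<K, V> {
        void put(K key, V value);
    }

    static class HeapState<K, V> implements WritableView<K, V> {
        private final Map<K, V> backingMap = new HashMap<>();

        @Override
        public V get(K key) {
            return backingMap.get(key);
        }

        @Override
        public boolean contains(K key) {
            return backingMap.containsKey(key);
        }

        @Override
        public void put(K key, V value) {
            backingMap.put(key, value);
        }
    }

    private final HeapState<String, String> state = new HeapState<>();

    // The broadcast side sees the writable type.
    WritableView<String, String> broadcastSide() {
        return state;
    }

    // The non-broadcast side sees the same object, but only through read methods.
    ReadOnlyView<String, String> nonBroadcastSide() {
        return state;
    }

    public static void main(String[] args) {
        StateViewSketch operator = new StateViewSketch();
        operator.broadcastSide().put("flink", "rule-A");
        System.out.println(operator.nonBroadcastSide().get("flink"));
        // operator.nonBroadcastSide().put("hive", "x"); // would not compile
    }
}
```

In this model the restriction lives in the static type handed to each side: both views share one object, and only the set of visible methods differs.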


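[Figure: BroadcastState & ReadOnlyBroadcastState UML diagram, including HeapBroadcastState]

The BroadcastState interface extends ReadOnlyBroadcastState, which in turn extends State. The only implementation of BroadcastState is HeapBroadcastState, and the name already tells you that broadcast state is stored on the JVM heap. Underneath it is just a Map: the backingMap field shown in the diagram holds the state data. This answers the third question above.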


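To explain the second question further, here is a concrete scenario.

An example

[Figure: BroadcastStream]

Consider the scenario in the figure. Stream A reads from Kafka and, after keyBy, yields a KeyedStream. Stream B reads from MySQL and is used as the broadcast stream, yielding a BroadcastStream. Stream B contains two records, flink and spark, which are broadcast to every downstream subtask, so subtask-0 and subtask-1 both hold flink and spark in their broadcast state. Now two records, flink and hive, are written to Kafka. After keyBy, flink is routed to subtask-0 and hive to subtask-1. Clearly flink can be joined with the broadcast data, and hive cannot. If the non-broadcast side (stream A) were allowed to modify the state, for example by adding each incoming record (flink on subtask-0, hive on subtask-1) to it, the broadcast state on subtask-0 and subtask-1 would diverge. To keep the broadcast state consistent across all parallel instances of the operator, the design therefore forbids modifying it from the non-broadcast side.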
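The divergence in this scenario can be reproduced with a toy simulation. This is a simplified model under stated assumptions: BroadcastConsistencySketch is a hypothetical class, each subtask's state is a plain Set, and keys are routed with a simple hash modulo rather than Flink's actual key-group assignment.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of per-subtask copies of broadcast state.
class BroadcastConsistencySketch {
    private final List<Set<String>> subtaskStates = new ArrayList<>();

    BroadcastConsistencySketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskStates.add(new HashSet<>());
        }
    }

    // A broadcast element reaches every subtask, so every copy gets the same update.
    void onBroadcastElement(String value) {
        for (Set<String> state : subtaskStates) {
            state.add(value);
        }
    }

    // A keyed element reaches exactly one subtask. Writing to the state here is
    // the operation that the read-only context rules out.
    void onKeyedElementWithWrite(String key) {
        int target = Math.floorMod(key.hashCode(), subtaskStates.size()); // simplified routing
        subtaskStates.get(target).add(key);
    }

    boolean allSubtasksAgree() {
        Set<String> first = subtaskStates.get(0);
        for (Set<String> state : subtaskStates) {
            if (!state.equals(first)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BroadcastConsistencySketch job = new BroadcastConsistencySketch(2);
        job.onBroadcastElement("flink");
        job.onBroadcastElement("spark");
        System.out.println("after broadcast: consistent = " + job.allSubtasksAgree());
        job.onKeyedElementWithWrite("hive");
        System.out.println("after keyed write: consistent = " + job.allSubtasksAgree());
    }
}
```

Updates that arrive by broadcast keep all copies identical, while a write triggered by a keyed record lands on a single subtask and leaves the others behind.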


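Summary

Broadcast State is a special kind of Operator State. It mainly addresses the scenario where a low-throughput stream (a small amount of data) is joined with another, primary data stream. Broadcast state must be defined as a Map structure, and it can only be modified on the broadcast side; the non-broadcast side can read it but not modify it. Broadcast state can only be kept in heap memory, so give the TaskManager (TM) enough memory when using it. This article explained, from the source code, why Flink is designed this way, to give a deeper understanding of broadcast state.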

