四、操作数据流
进行具体的转换操作:
DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } });
这段逻辑中,对数据流做了四次操作,分别是flatMap、keyBy、timeWindow、reduce,接下来分别介绍每个转换都做了些什么操作。
4.1 flatMap 转换
flatMap的入参是一个FlatMapFunction的具体实现,功能就是将接收到的字符串,按空格切割成不同单词,然后每个单词构建一个WordWithCount实例,然后向下游转发,用于后续的数据统计。然后调用DataStream的flatMap方法,进行数据流的转换,如下:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); /** 根据传入的flatMapper这个Function,构建StreamFlatMap这个StreamOperator的具体子类实例 */ return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper))); } public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { /** 读取输入转换的输出类型, 如果是MissingTypeInfo, 则及时抛出异常, 终止操作 */ transformation.getOutputType(); OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism()); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
整个构建过程,与构建数据源的过程相似。
a、先根据传入的flatMapper这个Function构建一个StreamOperator的具体子类StreamFlatMap的实例;
b、根据a中构建的StreamFlatMap的实例,构建出OneInputTransFormation这个StreamTransformation的子类的实例;
c、再构建出DataStream的子类SingleOutputStreamOperator的实例。
除了构建出了 SingleOutputStreamOperator 这个实例为并返回外,还有代码:
getExecutionEnvironment().addOperator(resultTransform); public void addOperator(StreamTransformation<?> transformation) { Preconditions.checkNotNull(transformation, "transformation must not be null."); this.transformations.add(transformation); }
就是将上述构建的OneInputTransFormation的实例,添加到了StreamExecutionEnvironment的属性transformations这个类型为List。
4.2 keyBy 转换
这里的keyBy转换,入参是一个字符串”word”,意思是按照WordWithCount中的word字段进行分区操作。
public KeyedStream<T, Tuple> keyBy(String... fields) { return keyBy(new Keys.ExpressionKeys<>(fields, getType())); }
先根据传入的字段名数组,以及数据流的输出数据类型信息,构建出用来描述key的ExpressionKeys的实例,ExpressionKeys有两个属性:
/** key字段的列表, FlatFieldDescriptor 描述了每个key, 在所在类型中的位置以及key自身的数据类信息 */ private List<FlatFieldDescriptor> keyFields; /** 包含key的数据类型的类型信息, 与构造函数入参中的字段顺序一一对应 */ private TypeInformation<?>[] originalKeyTypes;
在获取key的描述之后,继续调用keyBy的重载方法:
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) { return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); }
这里首先构建了一个KeySelector的子类ComparableKeySelector的实例,作用就是从具体的输入实例中,提取出key字段对应的值(可能是多个key字段)组成的元组(Tuple)。
对于这里的例子,就是从每个WordWithCount实例中,提取出word字段的值。
然后构建一个KeyedStream的实例,KeyedStream也是DataStream的子类。构建过程如下:
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType())); } public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) { super( dataStream.getExecutionEnvironment(), new PartitionTransformation<>( dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; this.keyType = validateKeyType(keyType); }
在进行父类构造函数调用之前,先基于keySelector构造了一个KeyGroupStreamPartitioner的实例,再进一步构造了一个PartitionTransformation实例。
这里与flatMap的转换略有不同:
a、flatMap中,根据传入的flatMapper这个Function构建的是StreamOperator这个接口的子类的实例,而keyBy中,则是根据keySelector构建了ChannelSelector接口的子类实例;
b、keyBy中构建的StreamTransformation实例,并没有添加到StreamExecutionEnvironment的属性transformations这个列表中。
ChannelSelector只有一个接口,根据传入的数据流中的具体数据记录,以及下个节点的并行度来决定该条记录需要转发到哪个通道。
public interface ChannelSelector<T extends IOReadableWritable> { int[] selectChannels(T record, int numChannels); } KeyGroupStreamPartitioner中该方法的实现如下: public int[] selectChannels( SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { K key; try { /** 通过keySelector从传入的record中提取出对应的key */ key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } /** 根据提取的key,最大并行度,以及输出通道数,决定出record要转发到的通道编号 */ returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels); return returnArray; }
再进一步看一下KeyGroupRangerAssignment的assignKeyToParallelOperator方法的实现逻辑。
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; } public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { return keyGroupId * parallelism / maxParallelism; }
a、先通过key的hashCode,算出maxParallelism的余数,也就是可以得到一个[0, maxParallelism)的整数;
b、在通过公式 keyGroupId * parallelism / maxParallelism ,计算出一个[0, parallelism)区间的整数,从而实现分区功能。
4.3 timeWindow 转换
这里timeWindow转换的入参是两个时间,第一个参数表示窗口长度,第二个参数表示窗口滑动的时间间隔。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } }
根据环境配置的数据流处理时间特征构建不同的WindowAssigner的具体实例。
WindowAssigner的功能就是对于给定的数据流中的记录,决定出该记录应该放入哪些窗口中,并提供触发器等供。默认的时间特征是ProcessingTime,所以这里会构建一个SlidingProcessingTimeWindow实例,来看下SlidingProcessingTimeWindow类的assignWindows方法的实现逻辑。
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { /** 根据传入的WindowAssignerContext获取当前处理时间 */ timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); /** 获取最近一次的窗口的开始时间 */ long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); /** 循环找出满足条件的所有窗口 */ for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; }
看一下根据给定时间戳获取最近一次的窗口的开始时间的实现逻辑。
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
通过一个例子来解释上述代码的逻辑。比如:
a、timestamp = 1520406257000 // 2018-03-07 15:04:17
b、offset = 0
c、windowSize = 60000
d、(timestamp - offset + windowSize) % windowSize = 17000
e、说明在时间戳 1520406257000 之前最近的窗口是在 17000 毫秒的地方
f、timestamp - (timestamp - offset + windowSize) % windowSize = 1520406240000 // 2018-03-07 15:04:00
g、这样就可以保证每个时间窗口都是从整点开始, 而offset则是由于时区等原因需要时间调整而设置。
通过上述获取WindowAssigner的子类实例后,调用window方法:
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner); }
比keyBy转换的逻辑还简单,就是构建了一个WindowedStream实例,然后返回,就结束了。而WindowedStream是一个新的数据流,不是DataStream的子类。
WindowedStream描述一个数据流中的元素会基于key进行分组,并且对于每个key,对应的元素会被划分到多个时间窗口内。然后窗口会基于触发器,将对应窗口中的数据转发到下游节点。