Flink WindowAssigner 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.@PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner);}

当你在使用 Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.


@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}


当有数据流入到 Window Operator 时需要按照一定规则将数据分配给窗口,WindowAssigner 为数据分配窗口。在新版本里已经把 timeWindow 标记为弃用状态,统一改成了 window 方法,该方法接收的输入是一个 WindowAssigner, WindowAssigner  负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。比如 TumblingEventTimeWindows 就是一个基于 eventtime 时间语义的滚动窗口.如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。


我们先来看一下 WindowAssigner 类的源码如下:


/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
   private static final long serialVersionUID = 1L;
   /**
    * Returns a {@code Collection} of windows that should be assigned to the element.
    *
    * @param element The element to which windows should be assigned.
    * @param timestamp The timestamp of the element.
    * @param context The {@link WindowAssignerContext} in which the assigner operates.
    */
   public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
   /**
    * Returns the default trigger associated with this {@code WindowAssigner}.
    */
   public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
   /**
    * Returns a {@link TypeSerializer} for serializing windows that are assigned by
    * this {@code WindowAssigner}.
    */
   public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
   /**
    * Returns {@code true} if elements are assigned to windows based on event time,
    * {@code false} otherwise.
    */
   public abstract boolean isEventTime();
   /**
    * A context provided to the {@link WindowAssigner} that allows it to query the
    * current processing time.
    *
    * <p>This is provided to the assigner by its containing
    * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
    * which, in turn, gets it from the containing
    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
    */
   public abstract static class WindowAssignerContext {
      /**
       * Returns the current processing time.
       */
      public abstract long getCurrentProcessingTime();
   }
}


这是一个抽象类主要有 4 个方法,简单说一下每个方法的作用:


assignWindows 将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合


getDefaultTrigger 返回WindowAssigner默认的 trigger


getWindowSerializer 返回一个类型序列化器用来序列化窗口


isEventTime 是否是 event time


然后再来看一下 WindowAssigner 的实现类 UML 图,如下所示:


image-20210316223805



windowAssigner


这里主要展示了 eventime 语义的, 可以看出 WindowAssigner 有 4 种不同的类型:


Tumbling windows


Sliding windows


Session windows


Global windows


接下来看一下大家用的比较多的 TumblingEventTimeWindows 和 SlidingEventTimeWindows 的源码(processing time 的实现类似) 看下窗口的划分到底是怎么实现的?


TumblingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      if (staggerOffset == null) {
         staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
      }
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
      return Collections.singletonList(new TimeWindow(start, start + size));
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


元素的时间戳肯定是大于 Long.MIN_VALUE 的,所以会走到 if 里面 staggerOffset 默认值是空的,所以会先初始化(这个是一个新特性为了解决同一时间触发大量的窗口计算造成的性能问题),然后根据 timestamp 和 size 计算出窗口的开始时间,最后返回一个存储 TimeWindow 的单例集合.


SlidingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
      long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
      for (long start = lastStart;
         start > timestamp - size;
         start -= slide) {
         windows.add(new TimeWindow(start, start + size));
      }
      return windows;
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


滑动窗口跟上面的滚动窗口最大的不同是数据不是分配到一个窗口,而是分配到 size / slide 个不同的窗口里面,返回的是窗口的集合.


/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows.
  * @return window start
  */
 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
  return timestamp - (timestamp - offset + windowSize) % windowSize;
 }


首先会根据元素的 timestamp offset slide 计算出窗口开始的时间戳,然后循环初始化给定的size内不同slide的窗口对象,最后返回一个 List


Session windows 和 Global windows 的实现相对简单这里就不在展开分析了,感兴趣的同学可以自己去看一下.


总结


这篇文章主要解析了 Window Assigner 的实现原理,结合滚动窗口和滑动窗口的源码分析了具体的实现过程.让大家对窗口的划分有更加深入的理解.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
22天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
10天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
6天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
|
29天前
|
测试技术 Python
python自动化测试中装饰器@ddt与@data源码深入解析
综上所述,使用 `@ddt`和 `@data`可以大大简化写作测试用例的过程,让我们能专注于测试逻辑的本身,而无需编写重复的测试方法。通过讲解了 `@ddt`和 `@data`源码的关键部分,我们可以更深入地理解其背后的工作原理。
25 1
|
1月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
开发者 Python
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
68 1
|
1月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

推荐镜像

更多