【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1

文章目录


一、前言


二、代码部分


三、构建数据源


四、操作数据流

4.1 flatMap 转换

4.2 keyBy 转换

4.3 timeWindow 转换

4.4 reduce 转换


五、输出统计结果


一、前言


本文主要是根据一段简单的 SocketWindowWordCount 代码,进而对 Flink 的执行过程进行剖析。话不多说,直接上代码 …


二、代码部分


public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        /** 需要连接的主机名和端口 */
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.get("hostname");
            port = params.getInt("port");
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Please run 'SocketWindowWordCount --host <host> --port <port>'");
            return;
        }
        /** 获取{@link StreamExecutionEnvironment}的具体实现的实例 */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /** 通过连接给定地址和端口, 获取数据流的数据源 */
        DataStream<String> text = env.socketTextStream(hostname, port);
        /** 对数据流中的数据进行解析、分组、窗口、以及聚合 */
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        /** 打印出分析结果 */
        windowCounts.print();
        /** 懒加载,执行处理程序 */
        env.execute("Socket Window WordCount");
    }
    /** 单词和统计次数的数据结构 */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

对于上述实现,接下来要分析的内容有:


如何创建从指定host和port接收数据的数据源;

如何对创建好的数据源进行一系列操作来实现所需功能;

如何将分析结果打印出来。


三、构建数据源


数据源的构建是通过 StreamExecutionEnviroment 的具体实现的实例来构建的。


DataStream<String> text = env.socketTextStream(hostname, port);

在 StreamExecutionEnviroment 中:在指定的 host 和 port 上构建了一个接受网络数据的数据源。


public DataStreamSource<String> socketTextStream(String hostname, int port) {
   return socketTextStream(hostname, port, "\n");
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
   return socketTextStream(hostname, port, delimiter, 0);
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
   return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
         "Socket Stream");
}


可以看到会根据传入的hostname、port,以及默认的行分隔符”\n”,和最大尝试次数0,构造一个SocketTextStreamFunction实例,并采用默认的数据源节点名称为”Socket Stream”。


SocketTextStreamFunction 的类继承图如下所示,可以看出其是 SourceFunction 的一个子类,而 SourceFunction 是Flink中数据源的基础接口。


image.png


SourceFunction 内部方法:

@Public
public interface SourceFunction<T> extends Function, Serializable {
   void run(SourceContext<T> ctx) throws Exception;
   void cancel();
   @Public
   interface SourceContext<T> {
      void collect(T element);
      @PublicEvolving
      void collectWithTimestamp(T element, long timestamp);
      @PublicEvolving
      void emitWatermark(Watermark mark);
      @PublicEvolving
      void markAsTemporarilyIdle();
      Object getCheckpointLock();
      void close();
   }
}


run(SourceContex) 方法:就是实现数据获取逻辑的地方,并可以通过传入的参数ctx进行向下游节点的数据转发。


cancel() 方法:则是用来取消数据源的数据产生,一般在run方法中,会存在一个循环来持续产生数据,而cancel方法则可以使得该循环终止。


其内部接口SourceContex则是用来进行数据发送的接口。了解了SourceFunction这个接口的功能后,来看下SocketTextStreamFunction的具体实现,也就是主要看其run方法的具体实现。


public void run(SourceContext<String> ctx) throws Exception {
   final StringBuilder buffer = new StringBuilder();
   long attempt = 0;
   /** 这里是第一层循环,只要当前处于运行状态,该循环就不会退出,会一直循环 */
   while (isRunning) {
      try (Socket socket = new Socket()) {
         /** 对指定的hostname和port,建立Socket连接,并构建一个BufferedReader,用来从Socket中读取数据 */
         currentSocket = socket;
         LOG.info("Connecting to server socket " + hostname + ':' + port);
         socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         char[] cbuf = new char[8192];
         int bytesRead;
         /** 这里是第二层循环,对运行状态进行了双重校验,同时对从Socket中读取的字节数进行判断 */
         while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
            buffer.append(cbuf, 0, bytesRead);
            int delimPos;
            /** 这里是第三层循环,就是对从Socket中读取到的数据,按行分隔符进行分割,并将每行数据作为一个整体字符串向下游转发 */
            while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
               String record = buffer.substring(0, delimPos);
               if (delimiter.equals("\n") && record.endsWith("\r")) {
                  record = record.substring(0, record.length() - 1);
               }
               /** 用入参ctx,进行数据的转发 */
               ctx.collect(record);
               buffer.delete(0, delimPos + delimiter.length());
            }
         }
      }
      /** 如果由于遇到EOF字符,导致从循环中退出,则根据运行状态,以及设置的最大重试尝试次数,决定是否进行 sleep and retry,或者直接退出循环 */
      if (isRunning) {
         attempt++;
         if (maxNumRetries == -1 || attempt < maxNumRetries) {
            LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
            Thread.sleep(delayBetweenRetries);
         }
         else {
            break;
         }
      }
   }
   /** 在最外层的循环都退出后,最后检查下缓存中是否还有数据,如果有,则向下游转发 */
   if (buffer.length() > 0) {
      ctx.collect(buffer.toString());
   }
}


run 方法的逻辑如上,逻辑很清晰,就是从指定的hostname和port持续不断的读取数据,按行分隔符划分成一个个字符串,然后转发到下游。


cancel 方法的实现如下,就是将运行状态的标识isRunning属性设置为false,并根据需要关闭当前socket。


public void cancel() {
   isRunning = false;
   Socket theSocket = this.currentSocket;
   /** 如果当前socket不为null,则进行关闭操作 */
   if (theSocket != null) {
      IOUtils.closeSocket(theSocket);
   }
}


对SocketTextStreamFunction的实现清楚之后,回到 StreamExecutionEnvironment 中,看 addSource 方法。


public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
   return addSource(function, sourceName, null);
}
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
   /** 如果传入的输出数据类型信息为null,则尝试提取输出数据的类型信息 */
   if (typeInfo == null) {
      if (function instanceof ResultTypeQueryable) {
         /** 如果传入的function实现了ResultTypeQueryable接口, 则直接通过接口获取 */
         typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
      } else {
         try {
            /** 通过反射机制来提取类型信息 */
            typeInfo = TypeExtractor.createTypeInfo(
                  SourceFunction.class,
                  function.getClass(), 0, null, null);
         } catch (final InvalidTypesException e) {
            /** 提取失败, 则返回一个MissingTypeInfo实例 */
            typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
         }
      }
   }
   /** 根据function是否是ParallelSourceFunction的子类实例来判断是否是一个并行数据源节点 */
   boolean isParallel = function instanceof ParallelSourceFunction;
   /** 闭包清理, 可减少序列化内容, 以及防止序列化出错 */
   clean(function);
   StreamSource<OUT, ?> sourceOperator;
   /** 根据function是否是StoppableFunction的子类实例, 来决定构建不同的StreamOperator */
   if (function instanceof StoppableFunction) {
      sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
   } else {
      sourceOperator = new StreamSource<>(function);
   }
   /** 返回一个新构建的DataStreamSource实例 */
   return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}


通过对 addSource 重载方法的依次调用,最后得到了一个 DataStreamSource 的实例。


TypeInformation 是Flink的类型系统中的核心类,用作函数输入和输出的类型都需要通过TypeInformation来表示,TypeInformation可以看做是数据类型的一个工具,可以通过它获取对应数据类型的序列化器和比较器等。


由于SocketTextStreamFunction不是继承自ParallelSourceFunction,且实现stoppableFunction接口,isParallel的值为false,以及sourceOperator变量对应的是一个StreamSource实例。


StreamSource 的类继承图如下所示:


image.png


上图可以看出 StreamSource 是 StreamOperator 接口的一个具体实现类,其构造函数的入参就是一个 SourceFunction 的子类实例,这里就是前面介绍过的


SocketTextStreamFunciton 的实例,构造过程如下:
public StreamSource(SRC sourceFunction) {
   super(sourceFunction);
   this.chainingStrategy = ChainingStrategy.HEAD;
}
public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}
private void checkUdfCheckpointingPreconditions() {
   if (userFunction instanceof CheckpointedFunction && userFunction instanceof ListCheckpointed) {
      throw new IllegalStateException("User functions are not allowed to implement AND ListCheckpointed.");
   }
}


把传入的 userFunction 赋值给自己的属性变量,并对传入的 userFunction 做了校验工作,然后将链接策略设置为HEAD。


Flink中为了优化执行效率,会对数据处理链中的相邻节点会进行合并处理,链接策略有三种:


ALWAYS —— 尽可能的与前后节点进行链接;

NEVER —— 不与前后节点进行链接;

HEAD —— 只能与后面的节点链接,不能与前面的节点链接。

作为数据源的源头,是最顶端的节点了,所以只能采用HEAD或者NEVER,对于StreamSource,采用的是HEAD策略。


StreamOperator 是Flink中流操作符的基础接口,其抽象子类 AbstractStreamOperator 实现了一些公共方法,用户自定义的数据处理逻辑会被封装在 StreamOperator 的具体实现子类中。


在 sourceOperator 变量被赋值后,即开始进行 DataStreamSource 的实例构建,并作为数据源构造调用的返回结果。


return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);

DataStreamSource 的类继承图如下所示,是具有一个预定义输出类型的 DataStream。


image.png


在Flink中,DataStream描述了一个具有相同数据类型的数据流,其提供了数据操作的各种API,如map、reduce等,通过这些API,可以对数据流中的数据进行各种操作,DataStreamSource的构建过程如下:


public DataStreamSource(StreamExecutionEnvironment environment,
      TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
      boolean isParallel, String sourceName) {
   super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
   this.isParallel = isParallel;
   if (!isParallel) {
      setParallelism(1);
   }
}
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
   super(environment, transformation);
}
public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
   this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
   this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
}


可见构建过程就是初始化了DataStream中的environment和transformation这两个属性。


其中 transformation 赋值的是 SourceTranformation 的一个实例,SourceTransformation是 StreamTransformation 的子类,而StreamTransformation则描述了创建一个DataStream的操作。对于每个DataStream,其底层都是有一个StreamTransformation的具体实例的,所以在DataStream在构造初始时会为其属性transformation设置一个具体的实例。并且DataStream的很多接口的调用都是直接调用的StreamTransformation的相应接口,如并行度、id、输出数据类型信息、资源描述等。


通过上述过程,根据指定的hostname和port进行数据产生的数据源就构造完成了,获得的是一个DataStreamSource的实例,描述的是一个输出数据类型是String的数据流的源。


在上述的数据源的构建过程中,出现 Function(SourceFunction)、StreamOperator、StreamTransformation、DataStream 这四个接口:


Function接口:用户通过继承该接口的不同子类来实现用户自己的数据处理逻辑,如上述中实现了SourceFunction这个子类,来实现从指定hostname和port来接收数据,并转发字符串的逻辑;


StreamOperator接口:数据流操作符的基础接口,该接口的具体实现子类中,会有保存用户自定义数据处理逻辑的函数的属性,负责对userFunction的调用,以及调用时传入所需参数,比如在StreamSource这个类中,在调用SourceFunction的run方法时,会构建一个SourceContext的具体实例,作为入参,用于run方法中,进行数据的转发;


StreamTransformation接口:该接口描述了构建一个DataStream的操作,以及该操作的并行度、输出数据类型等信息,并有一个属性,用来持有StreamOperator的一个具体实例;


DataStream:描述的是一个具有相同数据类型的数据流,底层是通过具体的StreamTransformation来实现,其负责提供各种对流上的数据进行操作转换的API接口。


通过上述的关系,最终用户自定义数据处理逻辑的函数,以及并行度、输出数据类型等就都包含在了DataStream中,而DataStream也就可以很好的描述一个具体的数据流了。


上述四个接口的包含关系是这样的:Function –> StreamOperator –> StreamTransformation –> DataStream。


通过数据源的构造,理清Flink数据流中的几个接口的关系后,接下来再来看如何在数据源上进行各种操作,达到最终的数据统计分析的目的。



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
341 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
943 13
Apache Flink 2.0-preview released
|
3月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
143 3
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
106 1
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
271 0
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1548 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
5天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
117 0
Flink CDC 在阿里云实时计算Flink版的云上实践
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
186 56

热门文章

最新文章

推荐镜像

更多