Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(七)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面

一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在open()中建立连接,在map()中读写数据,而在close()中关闭连接。

publicclass MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
@Override
public void open(Configuration configuration) {
  // 做一些初始化工作
  // 例如建立一个和MySQL的连接
}
@Override
public void flatMap(IN in, Collector<OUT out) {
  // 对数据库进行读写
}
@Override
public void close() {
  // 清理工作,关闭和MySQL数据库的连接。
}
}

9.3.4 物理分区算子(Physical Partitioning)

常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

  1. 随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,如图5-9所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

640.png

经过随机分区之后,得到的依然是一个DataStream。

我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。

publicclass ShuffleExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4).setParallelism(1);
    stream.shuffle().print().setParallelism(2);
    env.execute();
  }
}
  1. 轮询分区(Round-Robin)

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图5-10所示。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

640.png

  1. 重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图5-11所示。也就是说,“发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

aa3c1d4d581a119f7363574498f2f1b2.png

从底层实现上看,rebalance和rescale的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而rescale仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

  1. 广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

  1. 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的。

  1. 自定义分区(Custom)

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

9.4  输出算子(Sink)

5a458a5b3957d229e300b2d72d7aafcf.png

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

9.4.1 连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

与Source算子非常类似,除去一些Flink预实现的Sink,一般情况下Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(newSinkFunction(…));

addSource的参数需要实现一个SourceFunction接口;类似地,addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

当然,SinkFuntion多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如图5-13所示,列出了Flink官方目前支持的第三方系统连接器:

d443729366fea9d42f50be8d71c1b2ed.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、文件系统(FileSystem)、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir作为给Spark和Flink提供扩展支持的项目,也实现了一些其他第三方系统与Flink的连接器,如图所示。

0dd8e18db222e13bde78c2045bef4e35.png

除此以外,就需要用户自定义实现sink连接器了。

9.4.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:StreamingFileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

StreamingFileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink的静态方法:

行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。

下面我们就以行编码为例,将一些测试数据直接写入文件:

publicclass SinkFile {
 public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   DataStream<String> input = env.fromElements("hello world", "hello flink");
   final StreamingFileSink<String> sink = StreamingFileSink
       .forRowFormat(new Path("./output"), 
new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(
           DefaultRollingPolicy.builder()
             .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
             .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
             .withMaxPartSize(1024 * 1024 * 1024)
             .build())
       .build();
   input.addSink(sink);
   env.execute();
 }
}

这里我们创建了一个简单的文件Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下3种情况下,我们就会滚动分区文件:

至少包含15分钟的数据
最近5分钟没有收到新的数据
文件大小已达到1 GB

9.4.3 输出到Kafka

  • 添加Kafka 连接器依赖

由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

  • 启动Kafka集群
  • 编写输出到Kafka的示例代码

我们可以直接将用户行为数据保存为文件clicks.csv,读取后不做转换直接写入Kafka,主题(topic)命名为“clicks”。

publicclass SinkKafka {
 public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   Properties properties = new Properties();
   properties.put("bootstrap.servers", "hadoop102:9092");
   DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
   stream
       .addSink(new FlinkKafkaProducer<String>(
           "clicks",
           new SimpleStringSchema(),
           properties
       ));
   env.execute();
 }
}
  • 运行代码,在Linux主机启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic clicks

9.4.4 输出到Redis

Flink没有直接提供官方的Redis连接器,不过Bahir项目还是担任了合格的辅助角色,为我们提供了Flink-Redis的连接工具。但版本升级略显滞后,目前连接器版本为1.0,支持的Scala版本最新到2.11。由于我们的测试不涉及到Scala的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。

具体测试步骤如下:

  • 导入的Redis连接器依赖
<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
  • 启动Redis集群

这里我们为方便测试,只启动了单节点Redis。

  • 编写输出到Redis的示例代码

连接器为我们提供了一个RedisSink,它继承了抽象类RichSinkFunction,这就是已经实现好的向Redis写入数据的SinkFunction。我们可以直接将Event数据输出到Redis:

publicclass SinkRedis {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop200").build();
    env.addSource(new ClickSource())
        .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
    env.execute();
  }
}

这里RedisSink的构造方法需要传入两个参数:

JFlinkJedisConfigBase:Jedis的连接配置
 RedisMapper:Redis映射类接口,说明怎样将数据转换成可以写入Redis的类型

接下来主要就是定义一个Redis的映射类,实现RedisMapper接口。

publicstaticclass MyRedisMapper implements RedisMapper<Event> {
  @Override
  public String getKeyFromData(Event e) {
    return e.user;
  }
  @Override
  public String getValueFromData(Event e) {
    return e.url;
  }
  @Override
  public RedisCommandDescription getCommandDescription() {
    returnnew RedisCommandDescription(RedisCommand.HSET, "clicks");
  }
}

在这里我们可以看到,保存到Redis时调用的命令是HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以user为key,以url为value,每来一条数据就会做一次转换。

  • 运行代码,Redis查看是否收到数据。

9.4.5 输出到Elasticsearch

Flink为ElasticSearch专门提供了官方的Sink 连接器。

写入数据的ElasticSearch的测试步骤如下。

  • 添加Elasticsearch 连接器依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
  • 启动Elasticsearch集群
  • 编写输出到Elasticsearch的示例代码
publicclass SinkToEs {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ArrayList<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
    ElasticsearchSink.Builder<Event> esBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Event>() {
          @Override
          public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            HashMap<String, String> data = new HashMap<>();
            data.put(event.user, event.url);
            IndexRequest indexRequest = Requests
                .indexRequest()
                .index("clicks")
               .type("type")
                .source(data);
            requestIndexer.add(indexRequest);
          }
        }
    );
    DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L));
    stream.addSink(esBuilder.build());
    env.execute();
  }
}

与RedisSink类似,连接器也为我们实现了写入到Elasticsearch的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink的Builder内部静态类,调用它的build()方法才能创建出真正的SinkFunction。

而Builder的构造方法中又有两个参数:

httpHosts:连接到的Elasticsearch集群主机列表
 elasticsearchSinkFunction:这并不是我们所说的SinkFunction,而是用来说明具体处理逻辑、准备数据向Elasticsearch发送请求的函数

具体的操作需要重写中elasticsearchSinkFunction中的process方法,我们可以将要发送的数据放在一个HashMap中,包装成IndexRequest向外部发送HTTP请求。

  • 运行代码,访问Elasticsearch查看是否收到数据。

9.4.6 输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。

  • 添加依赖
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.47</version>
</dependency>
  • 启动MySQL,在test库下建表clicks
mysql> createtable clicks(
  -> uservarchar(20) notnull,
  -> urlvarchar(100) notnull);
  • 编写输出到MySQL的示例代码
publicclass SinkToMySQL {
public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  env
      .fromElements(
          new Event("Mary", "./home", 1000L),
          new Event("Bob", "./cart", 2000L)
      )
      .addSink(
          JdbcSink.sink(
              "INSERT INTO clicks (user, url) VALUES (?, ?)",
              (statement, r) -> {
                statement.setString(1, r.user);
                statement.setString(2, r.url);
              },
              JdbcExecutionOptions.builder()
                  .withBatchSize(1000)
                  .withBatchIntervalMs(200)
                  .withMaxRetries(5)
                  .build(),
              new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                  .withUrl("jdbc:mysql://localhost:3306/test")
                  // 使用MySQL 5.7的话,没有cj
                  .withDriverName("com.mysql.cj.jdbc.Driver")
                  .withUsername("username")
                  .withPassword("password")
                  .build()
          )
      );
  env.execute();
}
}
  • 运行代码,用客户端连接MySQL,查看是否成功写入数据。

9.4.7 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

十、Flink中的时间和窗口

在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。

10.1 时间语义

10.1.1 Flink中的时间语义

da511c701734b7f91cc378fb3ccd2625.png

如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统中的Source算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。

很明显,这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。

  1. 处理时间(Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。处理时间是最简单的时间语义。

  1. 事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)。

10.1.2 哪种时间语义更重要

  1. 从《星球大战》说起为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。

a63415710a52edf283e7b0c0d78f1e05.png

如图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。

  1. 数据处理系统中的时间语义

在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是数据本身产生的时间。

所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。

10.2 水位线(Watermark)

10.2.1 事件时间和窗口

ded9add8a71fe974cd2e52257b9caf6d.png

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。

10.2.2 什么是水位线

在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

d377bb95e85407e387edbd11584edd6e.png

  1. 有序流中的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。

02e68bc5aa7b46128754406dae9df7f7.png

  1. 乱序流中的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图6-7所示,一个7秒时产生的数据,生成时间自然要比9秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后7秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

266119b99882674eb606a255bb091062.png

解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图6-8所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

9ad861f4b39da0849fd1184f3b0519a8.png

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如下图所示。

cb2a2617e1e5f2dd66c10c88948202ec.png

但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳,如下图所示。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。

979c535937d22f2c786fff687888d5cb.png

  1. 水位线的特性

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。

总结一下水位线的特性:

水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
水位线可以通过设置延迟,来保证正确处理乱序数据
一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

10.2.3 如何生成水位线

水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。

  1. 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

  1. 水位线生成策略(Watermark Strategies)

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy)

具体使用时,直接用DataStream调用该方法即可,与普通的transform方法完全一样。

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(*<watermark strategy>*);

.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。

publicinterface WatermarkStrategy<T> 
  extends TimestampAssignerSupplier<T>,
      WatermarkGeneratorSupplier<T>{
  @Override
  TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
  @Override
  WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。
在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
  1. Flink内置水位线生成器

WatermarkStrategy这个接口是一个生成水位线策略的抽象,而Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。

这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

  • 有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
          @Override
          public long extractTimestamp(Event element, long recordTimestamp) {
            return element.timestamp;
          }
        })
);

上面代码中我们调用.withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。

这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。

  • 乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

代码示例如下:

publicclass WatermarkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
        .addSource(new ClickSource())
        // 插入水位线的逻辑
        .assignTimestampsAndWatermarks(
             // 针对乱序流插入水位线,延迟时间设置为5s
              WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
       .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                  // 抽取时间戳的逻辑
                  @Override
                  public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                  }
                })
        )
        .print();
    env.execute();
  }
}

上面代码中,我们同样提取了timestamp字段作为时间戳,并且以5秒的延迟时间创建了处理乱序流的水位线生成器。

  1. 自定义水位线策略

一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,就必须自定义实现水位线策略WatermarkStrategy了。

在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

WatermarkGenerator接口中有两个方法,onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。

  • 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

下面是一段自定义周期性生成水位线的代码:

import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// 自定义水位线的产生
publicclass CustomPeriodicWatermarkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
        .addSource(new ClickSource())
        .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
        .print();
    env.execute();
  }
  publicstaticclass CustomWatermarkStrategy implements WatermarkStrategy<Event> {
    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
      returnnew SerializableTimestampAssigner<Event>() {
        @Override
        public long extractTimestamp(Event element, long recordTimestamp) {
          return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
        }
      };
    }
    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
      returnnew CustomBoundedOutOfOrdernessGenerator();
    }
  }
  publicstaticclass CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
    private Long delayTime = 5000L; // 延迟时间
    private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
      // 每来一条数据就调用一次
      maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
      // 发射水位线,默认200ms调用一次
      output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }
  }
}

我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。

  • 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。

自定义的断点式水位线生成器代码如下:

publicclass PunctuatedGenerator implements WatermarkGenerator<Event> {
  @Override
  public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的itemId时,才发出水位线
    if (r.user.equals("Mary")) {
      output.emitWatermark(new Watermark(r.timestamp - 1));
    }
  }
  @Override
  public void onPeriodicEmit(WatermarkOutput output) {
    // 不需要做任何事情,因为我们在onEvent方法中发射了水位线
  }
}

我们在onEvent()中判断当前事件的user字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

  1. 在自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;
publicclass EmitWatermarkInSourceFunction {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.addSource(new ClickSource()).print();
    env.execute();
  }
  // 泛型是数据源中的类型
  publicstaticclass ClickSource implements SourceFunction<Event> {
    privateboolean running = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
      Random random = new Random();
      String[] userArr = {"Mary", "Bob", "Alice"};
      String[] urlArr  = {"./home", "./cart", "./prod?id=1"};
      while (running) {
        long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
        String username = userArr[random.nextInt(userArr.length)];
        String url    = urlArr[random.nextInt(urlArr.length)];
        Event event = new Event(username, url, currTs);
        // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
        sourceContext.collectWithTimestamp(event, event.timestamp);
        // 发送水位线
        sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
        Thread.sleep(1000L);
      }
    }
    @Override
    public void cancel() {
      running = false;
    }
  }
}

在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。

10.2.4 水位线的传递

55544acffd1a5d5563310ab6cfae3896.png

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收刀多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:

(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
(2)当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的3,于是当前任务时钟就推进到了3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
(3)再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
(4)同样道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
95 0
|
28天前
|
SQL 存储 数据库
【赵渝强老师】基于Flink的流批一体架构
本文介绍了Flink如何实现流批一体的系统架构,包括数据集成、数仓架构和数据湖的流批一体方案。Flink通过统一的开发规范和SQL支持,解决了传统架构中的多套技术栈、数据链路冗余和数据口径不一致等问题,提高了开发效率和数据一致性。
100 7
|
2月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
75 1
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1077 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
24天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
142 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
851 7
阿里云实时计算Flink在多行业的应用和实践
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。
|
2月前
|
运维 搜索推荐 数据安全/隐私保护
阿里云实时计算Flink版测评报告
阿里云实时计算Flink版在用户行为分析与标签画像场景中表现出色,通过实时处理电商平台用户行为数据,生成用户兴趣偏好和标签,提升推荐系统效率。该服务具备高稳定性、低延迟、高吞吐量,支持按需计费,显著降低运维成本,提高开发效率。
77 1