Flink的APIs

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink的APIs

往期内容:Flink入门指引Flink应用

分层API

   Flink提供了三个分层的API。 如下所示,每个API针对不同的使用场景在简洁性和表达性之间提供了不同的权衡。

96447814-120fc980-1245-11eb-938d-6ea408716c72.png

1. ProcessFunctions

   Flink提供了ProcessFunctions来处理来自一个或两个输入流或在一个窗口中分组的事件的单个事件。ProcessFunctions提供对时间和状态的细粒度控制。ProcessFunction可以任意修改其状态并注册计时器,这些计时器将来会触发回调函数。因此,ProcessFunctions可以根据许多有状态事件驱动的应用程序的需要,实现复杂的每事件业务逻辑。

   下面的示例显示一个在KeyedStream上运行并匹配START和END事件的KeyedProcessFunction。接收到START事件时,该函数会记住其时间戳状态,并在四个小时内注册一个计时器。如果在计时器触发之前收到END事件,则该函数将计算END和START事件之间的持续时间,清除状态并返回该值。否则,计时器将触发并清除状态。

/**
 * Matches keyed START and END events and computes the difference between 
 * both elements' timestamps. The first String field is the key attribute, 
 * the second String attribute marks START and END events.
 */
public static class StartEndDuration
    extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
  private ValueState<Long> startTime;
  @Override
  public void open(Configuration conf) {
    // obtain state handle
    startTime = getRuntimeContext()
      .getState(new ValueStateDescriptor<Long>("startTime", Long.class));
  }
  /** Called for each processed event. */
  @Override
  public void processElement(
      Tuple2<String, String> in,
      Context ctx,
      Collector<Tuple2<String, Long>> out) throws Exception {
    switch (in.f1) {
      case "START":
        // set the start time if we receive a start event.
        startTime.update(ctx.timestamp());
        // register a timer in four hours from the start event.
        ctx.timerService()
          .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
        break;
      case "END":
        // emit the duration between start and end event
        Long sTime = startTime.value();
        if (sTime != null) {
          out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
          // clear the state
          startTime.clear();
        }
      default:
        // do nothing
    }
  }
  /** Called when a timer fires. */
  @Override
  public void onTimer(
      long timestamp,
      OnTimerContext ctx,
      Collector<Tuple2<String, Long>> out) {
    // Timeout interval exceeded. Cleaning up the state.
    startTime.clear();
  }
}

2. DataStream API

   DataStream API为许多常见的流处理操作提供了原语,例如窗口化,一次记录转换和通过查询外部数据存储来丰富事件。DataStream API可用于Java和Scala,并且基于诸如map(),reduce()和aggregate()之类的函数。函数可以通过扩展接口定义,也可以定义为Java或Scala lambda函数。

   以下示例显示了如何对点击流进行会话化并计算每个会话的点击次数。

// a stream of website clicks
DataStream<Click> clicks = ...
DataStream<Tuple2<String, Long>> result = clicks
  // project clicks to userId and add a 1 for counting
  .map(
    // define function by implementing the MapFunction interface.
    new MapFunction<Click, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> map(Click click) {
        return Tuple2.of(click.userId, 1L);
      }
    })
  // key by userId (field 0)
  .keyBy(0)
  // define session window with 30 minute gap
  .window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
  // count clicks per session. Define function as lambda function.
  .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

3. SQL & Table API

   Flink具有两个关系API,即表API和SQL。这两个API都是用于批处理和流处理的统一API,即查询在无限制的实时流或有限制的记录流上以相同的语义执行,并产生相同的结果。Table API和SQL利用Apache Calcite进行解析,验证和查询优化。它们可以与DataStream和DataSet API无缝集成,并支持用户定义的标量,聚合和表值函数。

   Flink的关系API旨在简化数据分析,数据管道和ETL应用程序的定义。

   以下示例显示了SQL查询,以对点击流进行会话并计算每个会话的点击次数。这与DataStream API示例中的用例相同。

SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10月前
|
存储 SQL API
Flink教程(23)- Flink高级特性(Streaming File Sink)
Flink教程(23)- Flink高级特性(Streaming File Sink)
480 0
|
4月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
102 0
|
4月前
|
SQL 关系型数据库 Apache
Flink CDC 是一个基于 Apache Flink 的开源库
Flink CDC 是一个基于 Apache Flink 的开源库
151 7
|
4月前
|
流计算
Flink 写es
Flink 写es
60 0
|
11月前
|
消息中间件 Kafka API
Flink-看完就会flink基础API
Flink-看完就会flink基础API
70 1
|
Prometheus 监控 Cloud Native
Flink Metrics 简介
Fink Metrics 是 Flink 内部的一个度量系统,除了可以在 Flink UI 上展示运行时的指标,也可以对外暴露接口集成到第三方系统,本文详述了这两方面的应用
|
消息中间件 NoSQL 关系型数据库
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
|
消息中间件 传感器 NoSQL
【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1
【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1
330 0
|
存储 SQL 传感器
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
585 0
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
|
SQL 消息中间件 分布式计算
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1
326 0
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1