Flink 双流 Join 的3种操作示例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:1、join();2、coGroup();3、intervalJoin()

在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:


  • join()
  • coGroup()
  • intervalJoin()


本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。


准备数据


从 Kafka 分别接入点击流和订单流,并转化为 POJO。


DataStream<String> clickSourceStream = env
  .addSource(new FlinkKafkaConsumer011<>(
    "ods_analytics_access_log",
    new SimpleStringSchema(),
    kafkaProps
  ).setStartFromLatest());
DataStream<String> orderSourceStream = env
  .addSource(new FlinkKafkaConsumer011<>(
    "ods_ms_order_done",
    new SimpleStringSchema(),
    kafkaProps
  ).setStartFromLatest());
DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
  .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
  .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));


join()


join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。


640.png

clickRecordStream
  .join(orderRecordStream)
  .where(record -> record.getMerchandiseId())
  .equalTo(record -> record.getMerchandiseId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
      return StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t');
    }
  })
  .print().setParallelism(1);


简单易用。


coGroup()


只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。


以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。


clickRecordStream
  .coGroup(orderRecordStream)
  .where(record -> record.getMerchandiseId())
  .equalTo(record -> record.getMerchandiseId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        boolean isMatched = false;
        for (OrderDoneLogRecord orderRecord : orderRecords) {
          // 右流中有对应的记录
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
          isMatched = true;
        }
        if (!isMatched) {
          // 右流中没有对应的记录
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
        }
      }
    }
  })
  .print().setParallelism(1);


intervalJoin()


join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:


right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]


2.jpg

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。


示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。


clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  })
  .print().setParallelism(1);


由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。


interval join 的实现原理


以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。


public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );
    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}


可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。


private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        LEFT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
    ));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        RIGHT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
    ));
}


其中 Long 表示事件时间戳,List> 表示该时刻到来的数据记录。当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。


@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {
    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();
    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
    }
    if (isLate(ourTimestamp)) {
        return;
    }
    addToBuffer(ourBuffer, ourValue, ourTimestamp);
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
        final long timestamp  = bucket.getKey();
        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }
        for (BufferEntry<OTHER> entry: bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}


这段代码的思路是:


  1. 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。
  2. 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。
  3. 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。


private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
    userFunction.processElement(left, right, context, collector);
}


  1. 调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。


@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();
    logger.trace("onEventTime @ {}", timerTimestamp);
    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}


本文转载自简书,作者:LittleMagic
原文链接:https://www.jianshu.com/p/45ec888332df


社区二维码.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之使用StarRocks作为Lookup Join的表是否合适
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
1月前
|
存储 监控 Oracle
实时计算 Flink版产品使用问题之如何解决双流Join导致的状态膨胀和资源压力问题
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 传感器 数据处理
"揭秘实时流式计算:低延迟、高吞吐量的数据处理新纪元,Apache Flink示例带你领略实时数据处理的魅力"
【8月更文挑战第10天】实时流式计算即时处理数据流,低延迟捕获、处理并输出数据,适用于金融分析等需即时响应场景。其框架(如Apache Flink)含数据源、处理逻辑及输出目标三部分。例如,Flink可从数据流读取信息,转换后输出。此技术优势包括低延迟、高吞吐量、强容错性及处理逻辑的灵活性。
52 4
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL 流计算
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
资源调度 分布式计算 Hadoop
实时计算 Flink版操作报错合集之perjob提交给yarn,报错显示无法连接yarn- Connecting to ResourceManager,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版