Flink 1.4.2 版本踩过的坑

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

0x1 摘要

最近业务要实时统计半小时维度的UV、PV数据,经过调研准备用Flink时间窗来实现,主要是Flink对eventTime的支持,可以做到更精准的统计,由于第一次尝试使用Flink,所以过程中遇到不少问题,记录下来方便后续查阅。

0x2 执行计划输出JSON问题

Flink对执行计划分析提供了支持,可以通过代码将执行计划打出来,并利用官网提供的图生成工具可以方便分析,通过env.getExecutionPlan()方法可以获取JSON格式的执行计划,将JSON字符串拷贝到http://flink.apache.org/visualizer/网站文本框就可以查看。
但我们在项目中调用env.getExecutionPlan()方法后报以下异常信息:

Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.util.TimSort.mergeLo(TimSort.java:777)
    at java.util.TimSort.mergeAt(TimSort.java:514)
    at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
    at java.util.TimSort.sort(TimSort.java:254)
    at java.util.Arrays.sort(Arrays.java:1512)
    at java.util.ArrayList.sort(ArrayList.java:1454)
    at java.util.Collections.sort(Collections.java:175)
    at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61)
    at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663)
    ... 2 more

通过异常信息可以知道问题发生在TimSort排序算法,意思就是比较方法违反约束,具体约束规范大家可以自行网上查阅:自反性、传递性、对称性。
我们来看一下Flink的源码,只要看JSONGenerator61行就可以:

public String getJSON() throws JSONException {
    JSONObject json = new JSONObject();
    JSONArray nodes = new JSONArray();
    json.put("nodes", nodes);
    List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
    Collections.sort(operatorIDs, new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            // put sinks at the back
            if (streamGraph.getSinkIDs().contains(o1)) {
                return 1;
            } else if (streamGraph.getSinkIDs().contains(o2)) {
                return -1;
            } else {
                return o1 - o2;
            }
        }
    });
    visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
    return json.toString();
}

从源码可以看在对操作ID做排序时Flink自己实现compare方法,具体这个方法的实际意义不是很明白,有明白的赐教一下。后来通过网上查阅已经有人提过此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498,但状态是关闭的,也没有回复什么时候解决,但我们通过查看Flink GitHub源码发现,此处实现已经发生变更,源码地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新实现源码:

public String getJSON() {
    ObjectNode json = mapper.createObjectNode();
    ArrayNode nodes = mapper.createArrayNode();
    json.put("nodes", nodes);
    List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
    Collections.sort(operatorIDs, new Comparator<Integer>() {
        @Override
        public int compare(Integer idOne, Integer idTwo) {
            boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne);
            boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo);
            // put sinks at the back
            if (isIdOneSinkId == isIdTwoSinkId) {
                return idOne.compareTo(idTwo);
            } else if (isIdOneSinkId) {
                return 1;
            } else {
                return -1;
            }
        }
    });
    visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
    return json.toString();
}

我猜测是因为违反了自反性导致的错误,那这个问题怎么解决呢?有两种方案:

  • 方案一:去掉env.getExecutionPlan()不打印执行计划
  • 方案二:设置JVM参数-Djava.util.Arrays.useLegacyMergeSort=true

0x3 不能连续split问题

场景描述,直接看拓扑图:
screenshot
希望达到上图的流拆分,但我开开心心把代码写后发布线上运行没有任何异常,等到验证数据时才发现最终统计数据不准A-1A-2的结果都是一样的,
issues地址:https://issues.apache.org/jira/browse/FLINK-5031

0x4 不能先process操作再split

在发现不能连续split后,只能想其他办法,将拓扑图改为:
screenshot
改为此方案后,线下运行直接报错,异常信息:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.api.graph.StreamGraph.addOutputSelector(StreamGraph.java:444)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSplit(StreamGraphGenerator.java:267)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:176)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSelect(StreamGraphGenerator.java:282)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:178)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformPartition(StreamGraphGenerator.java:241)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:184)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:527)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1528)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1540)
    ... 2 more

网上查阅发现存在issues,地址:https://issues.apache.org/jira/browse/FLINK-9141
最终改为先splitprocess方法搞定,拓扑图如下:
screenshot

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
SQL 流计算
Flink CDC 1.12版本引入了对SQL Server的支持
【1月更文挑战第26天】【1月更文挑战第124篇】Flink CDC 1.12版本引入了对SQL Server的支持
84 1
|
7月前
|
Java 数据库连接 数据库
,从Flink 1.13版本开始,Flink Connector JDBC已经被移到了一个独立的仓库
,从Flink 1.13版本开始,Flink Connector JDBC已经被移到了一个独立的仓库
344 1
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
7月前
|
SQL 算法 关系型数据库
实时计算 Flink版产品使用合集之哪个版本支持使用不锁表功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
关系型数据库 Java 分布式数据库
实时计算 Flink版操作报错合集之在使用 Python UDF 时遇到 requests 包的导入问题,提示 OpenSSL 版本不兼容如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
114 5
|
7月前
|
算法 关系型数据库 MySQL
实时计算 Flink版产品使用合集之哪个版本可以做增量快照算法
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之2.2.1版本同步mysql数据写入doris2.0 ,同步完了之后增量的数据延迟能达到20分钟甚至一直不写入如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
124 1
|
7月前
|
消息中间件 资源调度 分布式计算
实时计算 Flink版产品使用合集之1.13版本上部署一个flink1.17为什么任务启动一直accepted状态yarn的,有什么排查方向吗资源什么的都是充足的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Java Maven 流计算
在Docker跑通Flink分布式版本的WordCount
在Docker跑通Flink分布式版本的WordCount
63 0
|
7月前
|
关系型数据库 MySQL 数据库连接
实时计算 Flink版产品使用合集之是否支持MySQL 5.7以下的版本
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。