Flink问题之程序直接结束如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink1.11执行sql当判空使用<> null,程序直接结束怎么办?


环境:flink1.11: 代码如下: val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv val sql = """SELECT CASE WHEN kafka_table.log_type = 'detect' AND kafka_table.event_level = 3 THEN 3 ELSE 0 END as weight, kafka_table.src_ip as kafka_table_src_ip_0, kafka_table.dev_type as kafka_table_dev_type_0 FROM kafka_table WHERE kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5 AND kafka_table.src_ip <> null AND kafka_table.event_level > 0 AND kafka_table.dev_type = 1

val data:Table = tableEnv.sqlQuery(sql) val result = tableEnv.toRetractStreamRow result.print("====>") """

现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip is not null 可以正常运行并一直产生数据。

疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。*来自志愿者整理的flink邮件归档


参考回答:

这是因为kafka_table.src_ip <>

null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。

关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1].

简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown,

然后这个在filter条件里面会被视作false。

[1] https://modern-sql.com/concept/three-valued-logic


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359292?spm=a2c6h.13262185.0.0.51804c79pMhZZN


问题二:关于statement输出结果疑问


from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

table_env.create_temporary_view("simple_source", table)

table_env.execute_sql("""

CREATE TABLE first_sink_table (

id BIGINT,

data VARCHAR

) WITH (

'connector' = 'print'

)

""")

table_env.execute_sql("""

CREATE TABLE second_sink_table (

id BIGINT,

data VARCHAR

) WITH (

'connector' = 'print'

)

""")

‘# 创建一个statement对象

statement_set = table_env.create_statement_set()

‘# 使用TABLE API 将table表插进first_sink_table表里面

statement_set.add_insert("first_sink_table", table)

‘# 使用SQL将table表插进second_sink_table表里面

statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

’# 执行查询

statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi)

4> +I(1,Hi)

4> +I(2,Hello)

4> +I(2,Hello)*来自志愿者整理的flink邮件归档


参考回答:

奥,那你理解错了。这里面其实细分成2种情况: - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。

但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359295?spm=a2c6h.13262185.0.0.51804c79pMhZZN


问题三:pyflink UDTF有问题想要求助


定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 毫无头绪、有大佬遇到过吗?谢谢! 好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢

class myKerasMLP(ScalarFunction):

def eval(self, *args): ...

返回预测结果

return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())

class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1]

注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")

==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)

==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")


+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958)

====================== 简单可复现的例子 ========================

=======================SQL 源================= /* Navicat MySQL Data Transfer

Source Server : localhost Source Server Version : 50717 Source Host : localhost:3306 Source Database : nufront-nt

Target Server Type : MYSQL Target Server Version : 50717 File Encoding : 65001

Date: 2021-03-13 14:23:41 */

SET FOREIGN_KEY_CHECKS=0;


-- Table structure for test


DROP TABLE IF EXISTS test; CREATE TABLE test ( hotime varchar(5) DEFAULT NULL, before_ta varchar(5) DEFAULT NULL, before_rssi varchar(10) DEFAULT NULL, after_ta varchar(5) DEFAULT NULL, after_rssil varchar(10) DEFAULT NULL, nb_tath varchar(5) DEFAULT NULL, nb_rssith varchar(10) DEFAULT NULL, predict varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- Records of test


INSERT INTO test VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907'); INSERT INTO test VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464'); INSERT INTO test VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158');

================== 程序 ===================

-- coding: utf-8 -

import logging import os

from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

设置该参数以使用 UDF

t_env.get_config().get_configuration().set_boolean("python.fn-execution.4memory.managed", True) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")

class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]

splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]) t_env.register_function("splitStr", splitStr)

class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]

class myKerasMLP(ScalarFunction): def eval(self, *args):

拼接参数

a = '' for u in args: a += u + "|" return a

myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) t_env.register_function('train_and_predict', myKerasMLP)

t_env.execute_sql(""" CREATE TABLE print_table ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'print' ) """)

t_env.execute_sql(""" CREATE TABLE source ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/nufront-nt', 'table-name' = 'test', 'username' = 'root', 'password' = '123456' ) """)

t_env.execute_sql(""" CREATE TABLE predict_sink ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , nbr_rssi STRING , nbr_ta STRING ) WITH ( 'connector' = 'print' ) """)

#######################################

可执行

#######################################

t_env.sql_query("""

select A.hotime ,

A.before_ta ,

A.before_rssi ,

A.after_ta ,

A.after_rssil ,

A.nb_tath ,

A.nb_rssith ,

nbr_rssi ,

nbr_ta

from (SELECT

hotime ,

before_ta ,

before_rssi ,

after_ta ,

after_rssil ,

nb_tath ,

nb_rssith ,

predict

FROM

source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)

""").insert_into("predict_sink")

#######################################

执行报错

#######################################

t_env.sql_query("""

select A.hotime ,

A.before_ta ,

A.before_rssi ,

A.after_ta ,

A.after_rssil ,

A.nb_tath ,

A.nb_rssith ,

nbr_rssi ,

nbr_ta

from (SELECT

hotime ,

before_ta ,

before_rssi ,

after_ta ,

after_rssil ,

nb_tath ,

nb_rssith ,

train_and_predict(nb_tath, nb_rssith) predict

FROM

source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)

""").insert_into("predict_sink")

####################################### ##可执行 ####################################### t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table")

t_env.execute('pyflink UDTF')*来自志愿者整理的flink邮件归档


参考回答:

经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1] 来记录这个问题了。目前的workaroud方案是使用Table API。 具体可以参考下面的代码:

a = t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(nb_tath, nb_rssith) predict FROM source """) result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)") [1] https://issues.apache.org/jira/browse/FLINK-21856


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359298?spm=a2c6h.13262185.0.0.51804c79pMhZZN


问题四:Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计?


Hi 大家好 现在想对5分钟的kafka数据开窗,因为是DTS同步消息数据,会有update 和 delete,所以需要对相同user_id的数据根据事件时间倒序第一条,统计最后一次status(状态字段)共有多少人。 marketingMapDS: DataStream[(String, String, Long)] | tEnv.createTemporaryView("test", marketingMapDS,"status","status","status", "upd_user_id", $"upd_time".rowtime) val resultSQL = """ |SELECT t.status, | COUNT(t.upd_user_id) as num |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY upd_user_id ORDER BY upd_time DESC) as row_num | FROM test |) t |WHERE t.row_num = 1 |GROUP BY t.status, TUMBLE(t.upd_time, INTERVAL '5' MINUTE) |""".stripMargin val table2 = tEnv.sqlQuery(resultSQL) val resultDS = tEnv.toRetractStreamRow |

这样写后会报以下错: | Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[upd_user_id], orderBy=[upd_time DESC], select=[status, upd_user_id, upd_time]) |

所以想实现该需求,请问还可以怎么实现。。。

TABLE API 可以实现 类似 ROW_NUMBER() OVER 这样功能吗? | val table = tEnv.fromDataStream(marketingMapDS, "status","status","status", "upd_user_id", "updtime".rowtime).window(Tumbleover5.millison"updtime".rowtime).window(Tumbleover5.millison"upd_time".rowtime) .window(Tumble over 5.millis on "upd_time" as "w") .groupBy($"w") ??? |

Flink新手一个。。。请大佬指点~*来自志愿者整理的flink邮件归档


参考回答:

GroupWindowAggregate不支持update或者delete的datasource。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359300?spm=a2c6h.13262185.0.0.51804c79pMhZZN


问题五:Flink sql 如何实现全局row_number()分组排序?


在做实时数仓的时候,有需求要使用flink sql实现全局的row_number(),请教下各位有啥方案吗?

目前想的是,将流进行row number处理后存储到hbase中,然后每次处理流数据都和hbase进行关联,row_number处理后将最新结果存入hbase中,即通过对hbase的实时读写实现全局row_number(). 请问以上方法可行不,,实时读hbase关联,然后在写入最新数据到hbase,效率会有问题吗,这样能满足实时的需求吗?*来自志愿者整理的flink邮件归档


参考回答:

直接 SQL Top-N 即可: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359314?spm=a2c6h.13262185.0.0.54e839c0D2mgIx

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
48 0
|
2月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
2月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
54 0
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在程序因故停掉后能从之前的Binlog位置继续读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
45 0
|
7月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之程序初始化mysql没有完成就报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
291 58
|
5月前
|
SQL Java 数据库
实时计算 Flink版产品使用问题之Spring Boot集成Flink可以通过什么方式实现通过接口启动和关闭Flink程序
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
Java 调度 流计算
基于多线程的方式优化 FLink 程序
这篇内容介绍了线程的基本概念和重要性。线程是程序执行的最小单位,比进程更细粒度,常用于提高程序响应性和性能。多线程可以实现并发处理,利用多核处理器,实现资源共享和复杂逻辑。文章还讨论了线程的五种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED)以及如何在Java中创建和停止线程。最后提到了两种停止线程的方法:使用标识和中断机制。
186 5
|
7月前
|
流计算
实时计算 Flink版操作报错之程序在idea跑没问题,打包在服务器跑就一直报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
消息中间件 NoSQL MongoDB
实时计算 Flink版产品使用合集之抽取数据时,表结构发生变化,从而导致程序抛出异常,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版