Flink报错问题之insert语句报错如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:关于filesystem connector的一点疑问

Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交 现在有这样的场景: 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? 有大佬知道吗,有实际验证过吗 感谢

附上简单sql: CREATE TABLE kafka ( a STRING, b STRING, c BIGINT, process_time BIGINT, e STRING, f STRING, g STRING, h INT, i STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'topic', 'properties.bootstrap.servers' = 'x', 'properties.group.id' = 'test-1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.flink.partition-discovery.interval-millis' = '300000' );

CREATE TABLE filesystem ( day STRING, hour STRING, a STRING, b STRING, c BIGINT, d BIGINT, e STRING, f STRING, g STRING, h INT, i STRING ) PARTITIONED BY (day, hour) WITH ( 'connector' = 'filesystem', 'format' = 'parquet', 'path' = 'hdfs://xx', 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file' );

insert into filesystem select from_unixtime(process_time,'yyyy-MM-dd') as day, from_unixtime(process_time,'HH') as hour, a, b, c, d, e, f, g, h, i from kafka;

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger*来自志愿者整理的flink邮件归档



参考答案:

按照我的理解,partition time提交分区,是会在current watermark > partition time + commit delay 时机触发分区提交,得看你的sink.partition-commit.delay 设置的多久,如果超过之后,应当默认是会丢弃的吧。

https://cloud.tencent.com/developer/article/1707182

这个连接可以看一下 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364541?spm=a2c6h.13066369.question.21.6ad263827fpfa7



问题二:flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

我使用的flink 1.11.0版本 代码如下 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); tableEnvironment.executeSql(" " + " CREATE TABLE mySource ( " + " a bigint, " + " b bigint " + " ) WITH ( " + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " + " 'connector.topic' = 'mytesttopic', " + " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + " 'connector.properties.group.id' = 'flink-test-cxy', " + " 'connector.startup-mode' = 'latest-offset', " + " 'format.type' = 'json' " + " ) "); tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + " id bigint, " + " game_id varchar, " + " PRIMARY KEY (id) NOT ENFORCED " + " ) " + " with ( " + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + " 'connector.username' = 'root' , " + " 'connector.password' = 'root', " + " 'connector.table' = 'mysqlsink' , " + " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + " 'connector.write.flush.interval' = '2s', " + " 'connector.write.flush.max-rows' = '300' " + " )"); tableEnvironment.executeSql("insert into mysqlsink (id,game_id) values (select a,cast(b as varchar) b from mySource)");

问题一 : 上面的insert语句会出现如下错误 Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'

问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'*来自志愿者整理的flink邮件归档



参考答案:

Hi,

看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键, 需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.

Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371059?spm=a2c6h.13066369.question.22.6ad26382Kh6myX



问题三:Flink sql 转义字符问题

SPLIT_INDEX(${xxx}, ';', 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~*来自志愿者整理的flink邮件归档



参考答案:

SPLIT_INDEX(${xxx}, ‘;’, 0)

‘;’ 分号不是特殊字符,编译时应该不会报错的,我在Flink 1.11.1 用DDL 测试了下, 能够work的,不知道你的环境是怎样的。 U&'\003B' 是 ; 的 unicode编码,所以用这个unicode编码是可以的,但一般这种用法是在需要用不可见字符分割时我们这样使用, 比如 \n 对应的s是 U&'\000A’ ,\r 对应的是 U&'\000D’, 对于分号这种可见字符来讲,不需要用unicode编码就可以的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371063?spm=a2c6h.13066369.question.25.6ad26382Ri4TKS



问题四:flink kafka SQL Connectors 传递kerberos 参数

flink v1.11.1 kafka使用了kerberos 下面DDL 是支持 kerberos 参数

CREATETABLEkafkaTable( ... )WITH('connector'='kafka', 'topic'='user_behavior', 'properties.bootstrap.servers'='localhost:9092', 'properties.group.id'='testGroup', 'security.protocol'='SASL_PLAINTEXT', 'sasl.mechanism'='GSSAPI', 'sasl.kerberos.service.name'='kafka', 'format'='csv', 'scan.startup.mode'='earliest-offset' )

是否支持上面的参数?*来自志愿者整理的flink邮件归档



参考答案:

不知道你的问题是能否通过这个解决

我看了下目前文档里缺少了传递kafka properties 的部分,我建了个issue[1]把文档补齐

Best Leonard [1] https://issues.apache.org/jira/browse/FLINK-18768 https://issues.apache.org/jira/browse/FLINK-18768*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371068?spm=a2c6h.13066369.question.26.6ad26382omnRLj



问题五:Flink使用Kafka作为source时checkpoint成功提交offset的机制

大家好,请教一个问题, 当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条offset提交吗? 还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?

另外有大神指路这段代码具体在哪个文件吗? 谢谢!

-- Best Wishes, Shuwen Zhou*来自志愿者整理的flink邮件归档



参考答案:

似乎楼主一开始说的checkpoint成功是指source 算子的checkpoint成功?但notifyCheckpointComplete函数要求的是整个链路的chk成功。 这个时候offset为100的消息必然已经被sink算子处理完成了,因为触发chk的屏障消息必然在offset100的消息之后到达sink算子。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371486?spm=a2c6h.13066369.question.25.6ad26382XYUvkP

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Oracle 关系型数据库 数据库连接
实时计算 Flink版操作报错合集之为什么使用StartupOptions.latest()能够正常启动而切换到StartupOptions.specificOffset时遇到报错
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版