Flink报错问题之使用debezium-json format报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql报错 Could not find any factory for identif

各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 请问是什么原因导致的呢?

代码如下:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

tenv.executeSql("CREATE TABLE test_table (\n" + " id INT,\n" + " name STRING,\n" + " age INT,\n" + " create_at TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'test_json',\n" + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset'\n" + ")"); Table table = tenv.sqlQuery("select * from test_table"); tenv.toRetractStream(table, Row.class).print(); env.execute("flink 1.11.0 demo");

pom 文件如下:

<scala.binary.version>2.11</scala.binary.version> <flink.version>1.11.0</flink.version> org.apache.flink

flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink

flink-table-runtime-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-json ${flink.version} org.apache.flink

flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink

flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} org.apache.flink

flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-core ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version}

*来自志愿者整理的flink邮件归档



参考答案:

这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream connector 同时引用是会冲突的,请根据你的需要使用。

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370166?spm=a2c6h.12873639.article-detail.98.6f9243783Lv0fl



问题二:使用Flink Array Field Type

Flink 1.10.0 问题描述:source表中有个test_array_string ARRAY 字段,在DML语句用test_array_string[0]获取数组中的值会报数组越界异常。另外测试过Array 也是相同错误,Array ,Array 等类型也会报数组越界问题。 请问这是Flink1.10的bug吗?

SQL: CREATETABLE source ( …… test_array_string ARRAY ) WITH ( 'connector.type'='kafka', 'update-mode'='append', 'format.type'='json' …… );

CREATETABLE sink( v_string string ) WITH ( …… );

INSERTINTO sink SELECT test_array_string[0] as v_string from source;

kafka样例数据:{"id":1,"test_array_string":["ff”]}

Flink 执行的时候报以下错误: java.lang.ArrayIndexOutOfBoundsException: 33554432 at org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598) at org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590) at org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534) at org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117) at StreamExecCalc$9.processElement(UnknownSource) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SourceConversion$1.processElement(UnknownSource) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:408) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

*来自志愿者整理的flink邮件归档



参考答案:

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where CARDINALITY(arr) >= 5 这种方式防止数组访问越界。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370165?spm=a2c6h.12873639.article-detail.99.6f9243783Lv0fl



问题三:flink 1.11 es未定义pk的sink问题

根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义: 不确定是我配置使用的方式不对,还是确实存在bug。。

CREATE TABLE ES6_SENSORDATA_OUTPUT ( event varchar, user_id varchar, distinct_id varchar, _date varchar, _event_time varchar, recv_time varchar, _browser_version varchar, path_name varchar, _search varchar, event_type varchar, _current_project varchar, message varchar, stack varchar, component_stack varchar, _screen_width varchar, _screen_height varchar ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<ES_YUNTU.SERVERS>', 'index' = 'flink_sensordata_target_event', 'document-type' = 'default', 'document-id.key-delimiter' = '$', 'sink.bulk-flush.interval' = '1000', 'failure-handler' = 'fail', 'format' = 'json' )

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling

*来自志愿者整理的flink邮件归档



参考答案:

如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].

所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。

[1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102

[2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370163?spm=a2c6h.12873639.article-detail.100.6f9243783Lv0fl



问题四:Table options do not contain an option key 'connec

Table options do not contain an option key 'connector' for discovering a connector.flink 1.11 sink hive table的connector设置为什么啊,尝试设置 WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file'); 也报错误 query: streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) streamTableEnv.executeSql( """ | | |CREATE TABLE hive_table ( | user_id STRING, | age INT |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', | 'sink.partition-commit.trigger'='partition-time', | 'sink.partition-commit.delay'='1 h', | 'sink.partition-commit.policy.kind'='metastore,success-file' |) | |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) streamTableEnv.executeSql( """ | |CREATE TABLE kafka_table ( | uid VARCHAR, | -- uid BIGINT, | sex VARCHAR, | age INT, | created_time TIMESTAMP(3), | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND |) WITH ( | 'connector.type' = 'kafka', | 'connector.version' = 'universal', | 'connector.topic' = 'user', | -- 'connector.topic' = 'user_long', | 'connector.startup-mode' = 'latest-offset', | 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', | 'connector.properties.group.id' = 'user_flink', | 'format.type' = 'json', | 'format.derive-schema' = 'true' |) |""".stripMargin)

streamTableEnv.executeSql( """ | |INSERT INTO hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin)

streamTableEnv.executeSql( """ | |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' | |""".stripMargin) .print() 错误栈: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table'.

Table options are:

'hive.storage.file-format'='parquet' 'is_generic'='false' 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' 'sink.partition-commit.delay'='1 h' 'sink.partition-commit.policy.kind'='metastore,success-file' 'sink.partition-commit.trigger'='partition-time' at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65) at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala) Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ... 19 more

*来自志愿者整理的flink邮件归档



参考答案:

你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog

不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370161?spm=a2c6h.12873639.article-detail.101.6f9243783Lv0fl



问题五:flink使用debezium-json format报错

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)

*来自志愿者整理的flink邮件归档



参考答案:

这是 changgelog 里的一个bug[1], 在1.11.1和master上已经修复,1.11.1社区已经在准备中了。

[1] https://issues.apache.org/jira/browse/FLINK-18461 https://issues.apache.org/jira/browse/FLINK-18461

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370159?spm=a2c6h.12873639.article-detail.102.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
JSON Java 关系型数据库
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
在Java中,使用mybatis-plus更新实体类对象到mysql,其中一个字段对应数据库中json数据类型,更新时报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
121 4
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
5月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 数据库连接
实时计算 Flink版操作报错合集之为什么使用StartupOptions.latest()能够正常启动而切换到StartupOptions.specificOffset时遇到报错
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版