Dear All,
I am getting an error when using Flink SQL to consume from Kafka and write into Hive; the code and error message are below.
The Flink version is 1.11.2, and the Kafka connector is flink-sql-connector-kafka. I have searched for a long time without finding a fix. The same SQL statement runs fine in the Flink SQL Client, but when the Java program is submitted to the Hadoop cluster it fails with the error below.
Any advice would be appreciated.
Java Code
TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS user_behavior_kafka_table");
tableResult.print();

TableResult tableResult2 = tableEnvironment.executeSql(
        "CREATE TABLE user_behavior_kafka_table (\r\n"
        + "  user_id STRING,\r\n"
        + "  item_id STRING\r\n"
        + ") WITH (\r\n"
        + "  'connector' = 'kafka',\r\n"
        + "  'topic' = 'TestTopic',\r\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\r\n"
        + "  'properties.group.id' = 'consumerTest',\r\n"
        + "  'scan.startup.mode' = 'earliest-offset',\r\n"
        + "  'format' = 'json'\r\n"
        + ")");
tableResult2.print();

// Write the Kafka rows into Hive
tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnvironment.executeSql(
        "INSERT INTO user_behavior_hive_table SELECT user_id, item_id FROM user_behavior_kafka_table");
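The snippet above assumes a tableEnvironment that was created with the blink planner and already has a Hive catalog registered (the INSERT targets user_behavior_hive_table through that catalog). A minimal sketch of that setup on Flink 1.11 follows; the catalog name and hive-conf directory are placeholders, not values from the original post:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Blink planner in streaming mode, as needed for a Kafka-to-Hive pipeline on Flink 1.11.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

// Hypothetical catalog name and hive-site.xml location; adjust to the actual cluster.
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/etc/hive/conf";

HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tableEnvironment.registerCatalog(catalogName, hiveCatalog);
tableEnvironment.useCatalog(catalogName);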
POM File
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
Error Message
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
    at com.newegg.flink.sqlkafka.Main.main(Main.java:66) ~[flink-0.0.1-SNAPSHOT.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_191-ojdkbuild]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_191-ojdkbuild]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191-ojdkbuild]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191-ojdkbuild]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Reply

Maybe you are missing the Kafka connector dependency in your pom; you could refer to this URL: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
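The POM above does already declare flink-sql-connector-kafka_${scala.binary.version} in compile scope, so one further thing worth checking (an assumption on my side, not something confirmed in this thread) is how the job jar is packaged: when the maven-shade-plugin builds a fat jar, the SPI files under META-INF/services must be merged, otherwise Flink's factory discovery cannot see the Kafka factory and fails with exactly this ValidationException. A sketch of the relevant shade configuration:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merge META-INF/services entries from all dependencies so that
                         Flink's factory discovery (Java SPI) can find the Kafka factory. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>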