大家好, 升级到1.9后有几个问题: 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) 但是现在这个类已经找不到了
2.所以我使用了 FlinkKafkaConsumer val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) 不知道这个consumer背后对应的kafka版本是多少
3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) at com.test.StreamingJob$.main(StreamingJob.scala:52) at com.test.StreamingJob.main(StreamingJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
The following properties are requested: batch-mode=false
The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
我的pom文件如下:
org.apache.flink flink-scala_${scala.binary.version} ${flink.version} provided org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} provided
org.scala-lang scala-library ${scala.version} providedorg.apache.flink flink-connector-kafka_2.11 ${flink.version} compile org.apache.flink flink-table-common ${flink.version}
org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-scala-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version}
谢谢大家*来自志愿者整理的flink邮件归档
Hi,
你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。