The code runs fine in the local IDE and produces normal output.
After packaging it into a fat jar and submitting it to a yarn-session, the job fails with:

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.

What could be causing this?
The files in the Flink lib directory are:

flink-dist_2.11-1.9.1.jar
flink-json-1.9.0-sql-jar.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
flink-sql-connector-kafka_2.11-1.9.0.jar
flink-table_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar
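As a purely diagnostic sketch (the object name below is made up; the class name is copied from the exception), one way to narrow this down is to resolve the shaded deserializer by name from the current classloader, once from the IDE and once from a small job submitted to the cluster, and see on which side it actually goes missing:

```
object CheckShadedDeserializer extends App {
  // Class name copied from the exception; the object name is hypothetical.
  val shadedClass =
    "org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer"
  try {
    val clazz = Class.forName(shadedClass, false, Thread.currentThread().getContextClassLoader)
    println(s"Resolved ${clazz.getName} from ${clazz.getProtectionDomain.getCodeSource}")
  } catch {
    case _: ClassNotFoundException =>
      println(s"$shadedClass is NOT visible to this classloader")
  }
}
```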
Code:

```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row
object StreamingTable2 extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
  env.setParallelism(2)

  val sourceDDL1 =
    """create table kafka_json_source(
      |  `timestamp` BIGINT,
      |  id int,
      |  name varchar
      |) with (
      |  'connector.type' = 'kafka',
      |  'connector.version' = '0.11',
      |  'connector.topic' = 'hbtest2',
      |  'connector.startup-mode' = 'earliest-offset',
      |  'connector.properties.0.key' = 'bootstrap.servers',
      |  'connector.properties.0.value' = '192.168.1.160:19092',
      |  'connector.properties.1.key' = 'group.id',
      |  'connector.properties.1.value' = 'groupId1',
      |  'connector.properties.2.key' = 'zookeeper.connect',
      |  'connector.properties.2.value' = '192.168.1.160:2181',
      |  'update-mode' = 'append',
      |  'format.type' = 'json',
      |  'format.derive-schema' = 'true'
      |)""".stripMargin

  tEnv.sqlUpdate(sourceDDL1)
  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
  env.execute("table-example2")
}
```
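Since the job only breaks after it is packaged as a fat jar, another hedged check is to list the fat jar's entries and confirm that both the shaded deserializer class and the META-INF/services files (where Flink 1.9 SQL connectors register their org.apache.flink.table.factories.TableFactory implementations) survived the packaging. The jar path below is just a placeholder:

```
import java.util.jar.JarFile
import scala.collection.JavaConverters._

object InspectFatJar extends App {
  // The default path is a placeholder; pass the real fat-jar path as the first argument.
  val jarPath = args.headOption.getOrElse("target/my-job-assembly.jar")
  val jar = new JarFile(jarPath)
  try {
    // Print entries related to the missing class and to SPI service registrations.
    jar.entries().asScala
      .map(_.getName)
      .filter(name => name.contains("ByteArrayDeserializer") || name.startsWith("META-INF/services/"))
      .foreach(println)
  } finally {
    jar.close()
  }
}
```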
*From the volunteer-curated Flink mailing list archive