Flink version 1.12.0:
I want to configure a table in sql-client-defaults.yaml, as follows:
tables:
  - name: user_profile
    type: source-table
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ods.userAnalysis.user_profile
      startup-mode: latest-offset
      properties:
        bootstrap.servers: hostname:9092
        group.id: flink-analysis
    format:
      type: debezium-avro-confluent
      property-version: 1
      debezium-avro-confluent.schema-registry.url: http://hostname:8081
      #schema-registry.url: http://hostname:8081
    schema:
      - name: userId
        data-type: STRING
      - name: province
        data-type: STRING
      - name: city
        data-type: STRING
      - name: age
        data-type: INT
      - name: education
        data-type: STRING
      - name: jobType
        data-type: STRING
      - name: marriage
        data-type: STRING
      - name: sex
        data-type: STRING
      - name: interest
        data-type: STRING
I have already placed the relevant jars in the lib directory, but when I start the SQL CLI it fails with the following error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=henghe66:9092
connector.properties.group.id=flink-analysis
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ods.userAnalysis.user_profile
connector.type=kafka
connector.version=universal
format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081
format.property-version=1
format.type=debezium-avro-confluent
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=userId
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=province
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=INT
schema.3.name=age
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=education
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=jobType
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=marriage
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=sex
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=interest
The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.formats.json.JsonRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
... 3 more
If I skip the sql-client-defaults.yaml configuration and instead create the table with DDL after starting the SQL CLI, it starts up fine.
So is my configuration in sql-client-defaults.yaml wrong?
Any pointers from someone who knows would be appreciated.
Best regards!

*From the volunteer-curated Flink mailing list archive
Sources and sinks defined in the YAML file are discovered through the old factory interface, and the debezium format only implements the new interface, so it cannot be found. There is also no plan to support the new interface in YAML, because the YAML approach has been deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260
*From the volunteer-curated Flink mailing list archive
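For reference, a minimal sketch of the equivalent DDL, which is the route the reply recommends: DDL is resolved through the new factory stack that debezium-avro-confluent implements. The column names are taken from the schema.N.name entries in the error output above; the table name user_profile is an assumption, and the hostnames are the placeholders from the question, so substitute your own values:

-- Run inside the SQL CLI after startup (Flink 1.12 option names).
-- Table name 'user_profile' is assumed; hostnames are placeholders.
CREATE TABLE user_profile (
  userId    STRING,
  province  STRING,
  city      STRING,
  age       INT,
  education STRING,
  jobType   STRING,
  marriage  STRING,
  sex       STRING,
  interest  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods.userAnalysis.user_profile',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);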