
DeserializationSchemaFactory not found in the SQL CLI

Flink version: 1.12.0

I want to configure a table in sql-client-defaults.yaml. The configuration is as follows:

tables:
  - name: t_users
    type: source-table
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ods.userAnalysis.user_profile
      startup-mode: latest-offset
      properties:
        bootstrap.servers: hostname:9092
        group.id: flink-analysis
    format:
      type: debezium-avro-confluent
      property-version: 1
      debezium-avro-confluent.schema-registry.url: http://hostname:8081
      #schema-registry.url: http://hostname:8081
    schema:
      - name: userId
        data-type: STRING
      - name: province
        data-type: STRING
      - name: city
        data-type: STRING
      - name: age
        data-type: INT
      - name: education
        data-type: STRING
      - name: jobType
        data-type: STRING
      - name: marriage
        data-type: STRING
      - name: sex
        data-type: STRING
      - name: interest
        data-type: STRING

I have already put the relevant jars into the lib directory, but starting the SQL CLI fails with the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=henghe66:9092
connector.properties.group.id=flink-analysis
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ods.userAnalysis.user_profile
connector.type=kafka
connector.version=universal
format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081
format.property-version=1
format.type=debezium-avro-confluent
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=userId
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=province
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=INT
schema.3.name=age
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=education
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=jobType
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=marriage
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=sex
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=interest

The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.formats.json.JsonRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
    at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
    at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
    ... 3 more

If I skip the sql-client-defaults.yaml configuration and instead create the table with DDL after starting the SQL CLI, it starts up fine, as shown in the sketch below.
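For reference, the working DDL looks roughly like the following. This is a minimal sketch reconstructed from the YAML above; the WITH option names follow the new-style Kafka connector and debezium-avro-confluent format options documented for Flink 1.12, and the hostname placeholders are the same ones used in the YAML:

CREATE TABLE t_users (
  userId    STRING,
  province  STRING,
  city      STRING,
  age       INT,
  education STRING,
  jobType   STRING,
  marriage  STRING,
  sex       STRING,
  interest  STRING
) WITH (
  -- new-style connector options (resolved via the new factory interface)
  'connector' = 'kafka',
  'topic' = 'ods.userAnalysis.user_profile',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);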

So is my configuration in sql-client-defaults.yaml wrong?

If anyone knows, please let me know.

Best regards!

*From the volunteer-compiled Flink mailing list archive

说了是一只鲳鱼 2021-12-06 11:29:45
1 Answer
Sources and sinks defined in a YAML file are looked up through the old factory interface, while the debezium format only implements the new interface, so it cannot be found. There is also no plan to support the new interface in YAML, because the YAML approach has been deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260

*From the volunteer-compiled Flink mailing list archive

2021-12-06 14:05:53