开发者社区> 问答> 正文

直接使用ddl定义kafka数据源出现了问题怎么办?

大家好,我直接使用ddl定义kafka数据源出现了问题。

kafka里是logstash采上来的json格式数据。

ddl如下:

CREATE TABLE vpn_source (

c_real_ip VARCHAR,

d_real_ip VARCHAR,

c_real_port INT,

d_real_port INT,

logtype INT,

user VARCHAR,

host_ip VARCHAR

) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = 'vpnlog',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.group.id' = 'flink_test',

'format.type' = 'json'

)

报错如下:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:

connector.properties.bootstrap.servers=10.208.0.73:9092

connector.properties.group.id=flink_test

connector.properties.zookeeper.connect=10.208.0.73:2181

connector.topic=vpnlog

connector.type=kafka

connector.version=universal

format.type=json

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=c_real_ip

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=d_real_ip

schema.2.data-type=INT

schema.2.name=c_real_port

schema.3.data-type=INT

schema.3.name=d_real_port

schema.4.data-type=INT

schema.4.name=logtype

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=user

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=host_ip

The following factories have been considered:

org.apache.flink.table.sources.CsvBatchTableSourceFactory

org.apache.flink.table.sources.CsvAppendTableSourceFactory

org.apache.flink.table.filesystem.FileSystemTableFactory

flink环境

本地源码编译的flink1.11,直接通过start-cluster.sh启动的本地环境。*来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:01:40 898 0
1 条回答
写回答
取消 提交回答
  • 你的DDL没有问题,问题应该是你没有把kafka的jar包添加进来。你可以到 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html 这里下载kafaka的universal版本的jar包。关于如何把jar包添加到pyflink里面使用,你可以参考文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files*来自志愿者整理的flink

    2021-12-07 20:23:19
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载