各位, 请问: flink cdc, 用 flink sql 的方式 sink 到 kafka 可以指定输出 schema 信息嘛? 看到好像只有 api 中可以指定 .deserializer(new JsonDebeziumDeserializationSchema(true)). flink sql 没办法嘛?
flink-cdc-mysql2kafka 建立同步任务,可以使用sql如下: insert into product_view_kafka_sink select * from product_view_source; 这个时候是可以退出flink sql-client的,然后进入flink web-ui,可以看到mysql表数据已经同步到kafka中了,对mysql进行插入,kafka都是同步更新的。 image-20220914171441498 通过kafka控制台消费,可以看到数据已经从mysql同步到kafka了
通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。除了输出到文件,也可以输出到 Kafka。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
/**
* @Package
* @author 大数据老哥
* @date 2020/12/18 16:51
* @version V1.0
*/
object FlinkSqlSourceFileSinkKafka {
def main(args: Array[String]): Unit = {
// 构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 构建表运行环境
val tableEnv = StreamTableEnvironment.create(env)
// 读取文件数据
tableEnv.connect(new FileSystem().path("./data/user.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
).createTemporaryTable("FileInput")
// 设置kafka的输出
tableEnv.connect(new Kafka()
.version("0.11") // 设置kafka的版本
.topic("FlinkSqlTest") // 设置要连接的主题
.property("zookeeper.connect", "node01:2181,node02:2181,node03:2181") //设置zookeeper的连接地址跟端口号
.property("bootstrap.servers", "node01:9092,node02:9092,node03:9092") //设置kafka的连接地址跟端口号
).withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
).createTemporaryTable("outPutKafka")
val res = tableEnv.sqlQuery("select * from FileInput")
res.insertInto("outPutKafka")
env.execute("FlinkSqlSourceFileSinkKafka")
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。