开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

各位, 请问: flink cdc, 用 flink sql 的方式 sink 到 kafka 可以

各位, 请问: flink cdc, 用 flink sql 的方式 sink 到 kafka 可以指定输出 schema 信息嘛? 看到好像只有 api 中可以指定 .deserializer(new JsonDebeziumDeserializationSchema(true)). flink sql 没办法嘛?

展开
收起
雪哥哥 2022-10-30 10:27:13 1336 0
2 条回答
写回答
取消 提交回答
  • flink-cdc-mysql2kafka 建立同步任务,可以使用sql如下: insert into product_view_kafka_sink select * from product_view_source; 这个时候是可以退出flink sql-client的,然后进入flink web-ui,可以看到mysql表数据已经同步到kafka中了,对mysql进行插入,kafka都是同步更新的。 image-20220914171441498 通过kafka控制台消费,可以看到数据已经从mysql同步到kafka了

    2022-11-30 07:47:58
    赞同 展开评论 打赏
  • 通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。除了输出到文件,也可以输出到 Kafka。

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
    
    /**
     * @Package
     * @author 大数据老哥
     * @date 2020/12/18 16:51
     * @version V1.0
     */
    object FlinkSqlSourceFileSinkKafka {
      def main(args: Array[String]): Unit = {
        // 构建运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 构建表运行环境
        val tableEnv = StreamTableEnvironment.create(env)
        // 读取文件数据
        tableEnv.connect(new FileSystem().path("./data/user.txt"))
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("name", DataTypes.STRING())
          ).createTemporaryTable("FileInput")
        // 设置kafka的输出
        tableEnv.connect(new Kafka()
          .version("0.11") // 设置kafka的版本
          .topic("FlinkSqlTest") // 设置要连接的主题
          .property("zookeeper.connect", "node01:2181,node02:2181,node03:2181") //设置zookeeper的连接地址跟端口号
          .property("bootstrap.servers", "node01:9092,node02:9092,node03:9092") //设置kafka的连接地址跟端口号
        ).withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("name", DataTypes.STRING())
          ).createTemporaryTable("outPutKafka")
    
        val res = tableEnv.sqlQuery("select  * from  FileInput")
    
        res.insertInto("outPutKafka")
        env.execute("FlinkSqlSourceFileSinkKafka")
      }
    
    }
    
    
    
    2022-11-24 07:30:26
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载