开发者社区> 问答> 正文

pyflink创建KafkaSource的方法是什么?

pyflink创建KafkaSource的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:27:41 418 0
1 条回答
写回答
取消 提交回答
  • env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/bitekong/PycharmProjects/dp_stream/flink-sql-connector-kafka_2.12-1.13.0.jar")
    kafka_consumer = FlinkKafkaConsumer(
      topics='test',
      deserialization_schema=SimpleStringSchema(),
      properties={'bootstrap.servers': 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
                  'auto.offset.reset': 'earliest'})
    ds = env.add_source(kafka_consumer)
    
    
    
    2021-12-07 15:28:00
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Monitoring the Dynamic Resource Usage of Scala and Python Spark Jobs in Yarn 立即下载
R&D To Product Pipeline Using Apache Spark in Adtech 立即下载
Building Competing Models using Spark DataFrames 立即下载