What is the method for file->JDBC stream processing in PyFlink?

Guest qzzytmszf3zhq 2021-12-07 15:30:39
1 Answer
    from pyflink.common import WatermarkStrategy, Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FileSource, StreamFormat, JdbcSink, JdbcConnectionOptions
    
    if __name__ == '__main__':
        # Create the streaming execution environment
        env = StreamExecutionEnvironment.get_execution_environment()
        # Add the required JARs (JDBC connector and MySQL driver; paths are local to the poster's machine)
        env.add_jars("file:///Users/xiangyang/PycharmProjects/jyyc_dp_stream/flink-connector-jdbc_2.11-1.13.0.jar")
        env.add_jars("file:///Users/xiangyang/PycharmProjects/jyyc_dp_stream/mysql-connector-java-8.0.21.jar")
    
        # Create the file source
        file_source = FileSource \
            .for_record_stream_format(StreamFormat.text_line_format(), "./test.log") \
            .build()
    
        # Attach the source to the environment; this yields a DataStream, the object we operate on
        ds = env.from_source(file_source, WatermarkStrategy.for_monotonous_timestamps(), "test")
    
        # transform
        ds = ds.map(lambda x: Row(x), output_type=Types.ROW([Types.STRING()]))
    
        # sink: print to stdout for debugging
        ds.print()
    
        jdbc_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
            .with_user_name("xxxxxx") \
            .with_password("xxxxxx") \
            .with_driver_name("com.mysql.cj.jdbc.Driver") \
            .with_url("jdbc:mysql://localhost:3306/test_db") \
            .build()
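
        # Note (an assumption, not stated in the original answer): the sink below
        # expects a MySQL table roughly like
        #   CREATE TABLE test_table (
        #       id INT AUTO_INCREMENT PRIMARY KEY,
        #       message TEXT
        #   );
        # so that inserting NULL into id auto-generates the primary key.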
    
        ds.add_sink(JdbcSink.sink("insert into test_table(id, message) VALUES(null, ?)",
                                  type_info=Types.ROW([Types.STRING()]),
                                  jdbc_connection_options=jdbc_options))
        # Actually submit and run the job
        env.execute("ANY_NAME")
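
    A note on boundedness: as written, the FileSource reads ./test.log once as a bounded input and the job then finishes. For continuous file -> JDBC streaming, the source builder can be told to keep watching the path for new files. A minimal sketch, assuming the monitor_continuously builder method and Duration available in Flink 1.13+ (the 10-second discovery interval is an arbitrary choice):

        from pyflink.common import Duration

        # Rebuild the source so it keeps scanning the path for newly added files
        # instead of terminating after one pass
        file_source = FileSource \
            .for_record_stream_format(StreamFormat.text_line_format(), "./test.log") \
            .monitor_continuously(Duration.of_seconds(10)) \
            .build()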
    
    
    
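    The JDBC sink also accepts execution options that control batching and retries, which matter for throughput in a streaming job. A minimal sketch, assuming PyFlink 1.13's JdbcExecutionOptions; the interval, batch size, and retry values are illustrative, not from the original answer:

        from pyflink.datastream.connectors import JdbcExecutionOptions

        # Flush buffered rows every second, or every 200 rows, whichever comes first;
        # retry a failed batch up to 5 times before failing the job
        exec_options = JdbcExecutionOptions.builder() \
            .with_batch_interval_ms(1000) \
            .with_batch_size(200) \
            .with_max_retries(5) \
            .build()

        ds.add_sink(JdbcSink.sink("insert into test_table(id, message) VALUES(null, ?)",
                                  type_info=Types.ROW([Types.STRING()]),
                                  jdbc_connection_options=jdbc_options,
                                  jdbc_execution_options=exec_options))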
    2021-12-07 15:30:51