开发者社区> 问答> 正文

pyflink中Transform里的connect的使用方法是什么?

pyflink中Transform里的connect的使用方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:33:40 319 0
1 条回答
写回答
取消 提交回答
  • 当command_stream没有数据进来的时候,http_data_stream的输出无论如何都是还没配置,当command_stream写入数据后,会执行map2的方法,即更新了配置项,对于接下来的http_data_stream来说就会以新的配置项来执行,输出为已经配置
    
    class MyCoMapFunction(CoMapFunction):
      	message = "还没配置"
    
        def map1(self, value):
          return MyCoMapFunction.test
    
        def map2(self, value):
          # 修改某个配置项
          MyCoMapFunction.test = "已经配置"
          return []
    
    http_data_stream = http_data_stream.connect(command_stream).key_by(lambda x: 0, lambda x: 0).map(
    		MyCoMapFunction(), Types.LIST(Types.STRING()))
    
    
    2021-12-07 15:33:56
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Improving Python and Spark Performance and Interoperability with Apache Arrow 立即下载
BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPARK STREAMING 立即下载
Monitoring the Dynamic Resource Usage of Scala and Python Spark Jobs in Yarn 立即下载