开发者社区> 问答> 正文

pyflink中Transform里的map的使用方法是什么?

pyflink中Transform里的map的使用方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:33:09 256 0
1 条回答
写回答
取消 提交回答
  • class MyMapFunction(MapFunction):
      def open(self, runtime_context: RuntimeContext):
        # 数据量
        self.count_state = runtime_context.get_state(ValueStateDescriptor("count_state", Types.INT()))
    
        def map(self, value: Row):
          self.count_state.update((self.count_state.value() or 0) + 1)  # count_state += 1
          return value
    
    data_stream = data_stream.key_by(lambda x: x["id"]) \
        .map(MyMapFunction(), output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))
    
    
    2021-12-07 15:33:26
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Data Wrangling with PySpark for Data Scientists Who Know Pandas 立即下载
From Python Scikit learn to Scala Spark 立即下载
Building Competing Models using Spark DataFrames 立即下载