开发者社区> 问答> 正文

在追加模式下激活水印和窗口

下面结构化的流媒体代码水印和窗口数据,24小时间隔,15分钟幻灯片。代码在附加模式下仅生成空的批处理0。在更新模式下,结果会正确显示。需要附加模式,因为S3接收器仅在附加模式下工作。

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset sliding24h = rowData

    .withWatermark(eventTimeCol, slideDuration)
    .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
            col(nameCol)).count();

sliding24h

    .writeStream()
    .format("console")
    .option("truncate", false)
    .option("numRows", 1000)
    .outputMode(OutputMode.Append())
    //.outputMode(OutputMode.Complete())
    .start()
    .awaitTermination();

以下是完整的测试代码:

public static void main(String [] args) throws StreamingQueryException {

 SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

 ArrayList<String> rl = new ArrayList<>();
 for (int i = 0; i < 200; ++i) {
     long t = 1512164314L + i * 5 * 60;
     rl.add(t + ",qwer");
 }

 String nameCol = "name";
 String eventTimeCol = "eventTime";
 String eventTimestampCol = "eventTimestamp";

 MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
 input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
 Dataset<Row> stream = input.toDF().selectExpr(
         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
         "cast(split(value,'[,]')[1] as String) as " + nameCol);

 System.out.println("isStreaming: " +  stream.isStreaming());

 Column eventTime = functions.to_timestamp(col(eventTimestampCol));
 Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

 String windowDuration = "24 hours";
 String slideDuration = "15 minutes";
 Dataset<Row> sliding24h = rowData
         .withWatermark(eventTimeCol, slideDuration)
         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                 col(nameCol)).count();

 sliding24h
         .writeStream()
         .format("console")
         .option("truncate", false)
         .option("numRows", 1000)
         .outputMode(OutputMode.Append())
         //.outputMode(OutputMode.Complete())
         .start()
         .awaitTermination();

}

展开
收起
社区小助手 2018-12-11 17:21:20 2663 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    这是2.4.0中已解决的错误请参阅:https : //issues.apache.org/jira/browse/SPARK-26167 https://issues.apache.org/jira/browse/SPARK-24156

    2019-07-17 23:19:55
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载