开发者社区> 问答> 正文

使用datagen connector生成无界数据,一秒时间窗口的聚合操作,一直没有数据输出打印

以下程序运行,控制台一直没有数据输出1. 程序package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

public static void main(String[] args) {

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE datagen ( " + " f_random INT, " + " f_random_str STRING, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='20', " + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='10', " + " 'fields.f_random_str.length'='10' " + ")";

bsTableEnv.executeSql(sourceTableDDL);

bsTableEnv.executeSql("SELECT f_random, count(1) " + "FROM datagen " + "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

}

}2. 控制台,log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. +-------------+----------------------+ | f_random | EXPR$1 | +-------------+----------------------+*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 09:58:28 623 0
1 条回答
写回答
取消 提交回答
  • TableResult.print() 方法目前只支持了 exactly-once 语义,在 streaming 模式下必须设置checkpoint才能work, 你配置下checkpoint之后再试下,支持 At Least Once 的方法在1.12里应该会支持,支持后可以不用设置 checkpoint。*来自志愿者整理的flink邮件归档

    2021-12-07 11:45:40
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载