开发者社区> 问答> 正文

sql执行的这种模式,是否可以考虑添加job名称的设置呢?

你好! >      我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > 程序: > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > String sourceDDL = "CREATE TABLE datagen (  " + >        " f_random INT,  " + >        " f_random_str STRING,  " + >        " ts AS localtimestamp,  " + >        " WATERMARK FOR ts AS ts  " + >        ") WITH (  " + >        " 'connector' = 'datagen',  " + >        " 'rows-per-second'='10',  " + >        " 'fields.f_random.min'='1',  " + >        " 'fields.f_random.max'='5',  " + >        " 'fields.f_random_str.length'='10'  " + >        ")"; > > bsTableEnv.executeSql(sourceDDL); > Table datagen = bsTableEnv.from("datagen"); > > System.out.println(datagen.getSchema()); > > String sinkDDL = "CREATE TABLE print_table (" + >        " f_random int," + >        " c_val bigint, " + >        " wStart TIMESTAMP(3) " + >        ") WITH ('connector' = 'print') "; > bsTableEnv.executeSql(sinkDDL); > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); > bsTableEnv.executeSql("insert into print_table select * from " + table); 你好! >      我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > 程序: > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > String sourceDDL = "CREATE TABLE datagen (  " + >        " f_random INT,  " + >        " f_random_str STRING,  " + >        " ts AS localtimestamp,  " + >        " WATERMARK FOR ts AS ts  " + >        ") WITH (  " + >        " 'connector' = 'datagen',  " + >        " 'rows-per-second'='10',  " + >        " 'fields.f_random.min'='1',  " + >        " 'fields.f_random.max'='5',  " + >        " 'fields.f_random_str.length'='10'  " + >        ")"; > > bsTableEnv.executeSql(sourceDDL); > Table datagen = bsTableEnv.from("datagen"); > > System.out.println(datagen.getSchema()); > > String sinkDDL = "CREATE TABLE print_table (" + >        " f_random int," + >        " c_val bigint, " + >        " wStart TIMESTAMP(3) " + >        ") WITH ('connector' = 'print') "; > bsTableEnv.executeSql(sinkDDL); > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); > bsTableEnv.executeSql("insert into print_table select * from " + table);

*来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:10:32 542 0
1 条回答
写回答
取消 提交回答
  • 据我所知,这种执行方式目前没法设置 jobName*来自志愿者整理的flink

    2021-12-07 20:31:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载