本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的), sql如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
Catalog catalog = new HiveCatalog("x", "default", "D:\conf", "1.1.0");
tEnv.registerCatalog("x", catalog);
TableResult execute = tEnv.executeSql("select * from x.ods.ods_binlog_test_trip_create_t_order_1");
execute.print();
建表语句如下:
CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
_type STRING,
order_no STRING,
order_time STRING,
dt as TO_TIMESTAMP(order_time),
proctime as PROCTIME(),
WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.properties.bootstrap.servers' = '***',
'connector.properties.zookeeper.connect' = '****',
'connector.version' = 'universal',
'format.type' = 'json',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'connector.topic' = 'test'
)
*来自志愿者整理的flink邮件归档
1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下, 都是exactly once语义,需要配置checkpoint才能得到结果。
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。