Hi,all 使用flink版本1.10.0,在hive catalog下建了映射kafka的表: CREATE TABLE x.log.yanfa_log ( dt TIMESTAMP(3), conn_id STRING, sequence STRING, trace_id STRING, span_info STRING, service_id STRING, msg_id STRING, servicename STRING, ret_code STRING, duration STRING, req_body MAP<String,String>, res_body MAP<STRING,STRING>, extra_info MAP<STRING,STRING>, WATERMARK FOR dt AS dt - INTERVAL '60' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'x-log-yanfa_log', 'connector.properties.bootstrap.servers' = '******:9092', 'connector.properties.zookeeper.connect' = '******:2181', 'connector.properties.group.id' = 'testGroup', 'connector.startup-mode' = 'group-offsets', 'update-mode' = 'append', 'format.type' = 'json', 'format.fail-on-missing-field' = 'true' ); 消费表x.log.yanfa_log程序如下: Catalog myCatalog = new HiveCatalog("x", "default", "D:\conf", "1.1.0"); tEnv.registerCatalog("x", myCatalog); Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log"); tEnv.toAppendStream(rs, Row.class).print();
然后针对同一个程序启动了2个job,结果都输出了相同的结果。我的疑问是kafka topic的同一个partition不是只能被group下至多一个consumer消费吗?为什么2个job会输出相同结果呢?
来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档
Flink的Kafka Connector的实现是用的Kafka lower api,也就是会自己去获取当前的partition信息,自己来分配那些subtask读取那个partition。 所以如果有两个任务,他们互相之间是没有关系的,也不会相互感知到。(只有一点,就是如果你配置了相同的group id,他们提交offset可能会互相覆盖。) 你说的那个模式是Kafka high-level api。
来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。