问题一:flink算子类在多个subtask中是各自初始化1个实例对象吗?
Hi,all:
flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?
我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?
希望有朋友能解释下算子在job运行中初始化的过程。
测试相关代码如下:
// flink 1.10.2版本,并行度为3 @Slf4j public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> { private transient ValueState state;
public PersonFlatMap(){ log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString())); }
@Override public void open(Configuration parameters) throws IOException { //略去无关代码... log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString())); }
@Override
public void flatMap(Tuple2<String, String> t, Collector collector) throws Exception { Person p = JSONUtil.toObject(t.f1,Person.class); collector.collect(p); if(state.value() == null){state.update(0);} state.update(state.value() + 1); log.info("state: "+state.value()); } }
//测试日志输出 ... flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
//此处略去无关日志...
flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory. flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! ...*来自志愿者整理的flink邮件归档
参考答案:
可以这么认为,大体上你可以认为每个并发有自己的环境。
技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM
值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。
一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM
上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。
可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。
Best,
tison.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364551?spm=a2c6h.13066369.question.13.6ad26382p9RmWX
问题二:flink 1.11.2 如何配置时区
你好! 我使用的是flink sql 1.11.2版本,通过proctime()在源上添加处理时间,发现生成的时间为UTC时间,而我需要的是+08的时间;而我通过设计env.java.opts参数设计jvm的时区参数也没有解决,请问我如何配置才可以拿到+08的时间?
我的程序的数据是json格式输出*来自志愿者整理的flink邮件归档
参考答案:
- 现在 proctime() 在设计上确实有问题,目前返回类型是 timestamp, 而不是 timestamp with local time
zone, 所以不会考虑 session time zone,转成 string 会用 utc 时区。这个问题会在 FLINK-20162 1
中修复。
- 可以看下这个文档[2].
Best,
Jark
[2]:
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364550?spm=a2c6h.13066369.question.14.6ad26382jcSTQP
问题三:flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql
您好,请教您一个问题
flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( log_id string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with ( 'connector' = 'kafka', 'topic' = 'kafka_table', 'properties.bootstrap.servers' = '10.2.12.3:9092', 'properties.group.id' = 'tmp-log-consumer003', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip
select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root |-- log_id: STRING |-- process_time: TIMESTAMP(3) NOT NULL PROCTIME |-- ts: TIMESTAMP(3) ROWTIME
输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806*来自志愿者整理的flink邮件归档
参考答案:
重复的问题。我将刚刚的回答也贴在这里。
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
- 保证所有 partition 都有数据。
- 且每个 partition 数据的 event time 都在前进
- 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s
以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364549?spm=a2c6h.13066369.question.15.6ad263820deoHJ
问题四:Re: flink-1.11 使用 application 模式时 jobid 问题
这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗?*来自志愿者整理的flink邮件归档
参考答案:
可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364548?spm=a2c6h.13066369.question.14.6ad26382qM0Gyw
问题五:Flink与Yarn的状态一致性问题
最近在使用Flink-1.11.1 On Yarn Per Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn application仍处于运行状态
疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢*来自志愿者整理的flink邮件归档
参考答案:
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。
当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364547?spm=a2c6h.13066369.question.15.6ad26382asj0Fk