Question 1: JobManager responsible for xxx lost the leadership
JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the leadership.
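What causes this error? It makes the job restart. The job was already under heavy load, and the sudden restart resumed from a checkpoint taken 10 minutes earlier, which made the load even heavier. *Compiled by volunteers from the Flink mailing list archive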
Reference answer:
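Are you running in on-YARN mode? The JobManager is not a worker; it only coordinates checkpoints, receives TM heartbeats, and so on. Check the other log entries that precede this one. Also check whether ZK is healthy. On YARN, you can also look at the NM logs to see how it handled this AM.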
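As one way to follow the advice about reading the log entries that precede the error, here is a minimal sketch that prints the lines just before the first "lost the leadership" message. The function name `lines_before` and the log path in the usage note are hypothetical and not part of the original thread.

```python
from collections import deque


def lines_before(lines, needle, count=20):
    """Return up to `count` lines that precede the first line containing `needle`."""
    window = deque(maxlen=count)  # keeps only the most recent `count` lines
    for line in lines:
        if needle in line:
            return list(window)
        window.append(line)
    return []  # needle never appeared
```

For example, `lines_before(open("jobmanager.log", encoding="utf-8"), "lost the leadership")` would return the 20 lines leading up to the event, where `jobmanager.log` stands in for wherever your deployment writes the JobManager log.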
More answers to this question are available at:
https://developer.aliyun.com/ask/371626?spm=a2c6h.13066369.question.79.6ad26382lyqdWM
Question 2: Data balance with keyBy
I have run into a rather strange situation.
(1) Statistics for one full day of data:
sid+subid+browser+ip: 13068577
sid+subid+browser+uid: 2962237
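Here sid and subid are the channel and sub-channel, browser is the browser, and ip and uid are both dimensions with relatively high variability.
The numbers are distinct counts.
(2) Job logic
Stream A is aggregated separately by the sid+subid+browser+ip and sid+subid+browser+uid key combinations. Both window operators have a parallelism of 10. After the job has run for a while, the subtasks of the sid+subid+browser+ip window operator receive roughly balanced amounts of data (1.09 GB to 1.48 GB), but those of the sid+subid+browser+uid operator are heavily unbalanced (230 MB to 6.84 GB).
I understand that keyBy cannot balance data perfectly, but the gap between 230 MB and 6.84 GB is far too large.
The statistics do show that uid is less discriminating than ip. Still, sid+subid+browser+uid has 2.96 million distinct combinations and the parallelism is only 10. Should the skew really be this bad?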
Reference answer:
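That seems to make sense, haha.
To analyze: assume sid+subid+browser+uid has about 3 million combinations in total, and sid+subid+browser has about 300.
Then uid=0 appears in 300 combinations; that is, out of the 3 million combinations, 300 (those with uid=0) occur with relatively high probability.
If those 300 high-probability combinations happen to be spread unevenly, some subtasks of the window operator become unbalanced.
I had considered the uid issue before, but assumed that because the hash is computed over all the fields together, skew in uid alone would not cause a problem. Based on the analysis above, it apparently does: the number of combinations with uid=0 is too small (300). If that number were somewhat larger, the skew in uid probably would not cause this problem.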
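The effect described above can be reproduced with a small simulation. This is only a sketch under stated assumptions: the key strings, record weights, and function names are made up for illustration, and MD5 stands in for Flink's real hash function. What it does mirror is Flink's two-step assignment, where a key is hashed into one of `max_parallelism` key groups (128 is the default for a parallelism of 10) and the key group is then mapped to a subtask.

```python
import hashlib

MAX_PARALLELISM = 128  # Flink's default max parallelism when parallelism is 10


def subtask_for(key, parallelism, max_parallelism=MAX_PARALLELISM):
    """Map a key to a subtask index via a key group (MD5 is a stand-in hash)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    key_group = int.from_bytes(digest[:4], "big") % max_parallelism
    return key_group * parallelism // max_parallelism


def simulate(channels, hot_weight, cold_keys, parallelism=10):
    """Return the record count per subtask for a skewed key distribution.

    Each of the `channels` sid+subid+browser combinations has one hot key
    (uid=0) carrying `hot_weight` records; `cold_keys` other keys carry one
    record each.
    """
    loads = [0] * parallelism
    for c in range(channels):
        loads[subtask_for(f"channel-{c}|uid=0", parallelism)] += hot_weight
    for i in range(cold_keys):
        key = f"channel-{i % channels}|uid={i + 1}"
        loads[subtask_for(key, parallelism)] += 1
    return loads


if __name__ == "__main__":
    loads = simulate(channels=300, hot_weight=10_000, cold_keys=30_000)
    print("records per subtask:", loads)
    print("max/min ratio:", max(loads) / max(min(loads), 1))
```

Because only 300 heavy keys are spread over 10 subtasks, the per-subtask totals depend mostly on how many heavy keys each subtask happens to receive. Making `hot_weight` uneven across channels, which is likely in real traffic, widens the gap further.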
More answers to this question are available at:
https://developer.aliyun.com/ask/371627?spm=a2c6h.13066369.question.78.6ad26382oTAPra
Question 3: union all loses part of the data
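Hello developers: my scenario is to compute the total income of each department and the total income of all departments. I planned to combine the two SQL queries with union all, but in practice union all loses part of the data: either the per-department rows are short, or the all-department total is short. If I split the two halves of the union all into separate statements that insert into the same table, the data is correct. I am not sure whether this is a bug or incorrect usage.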
Original SQL:
insert into dws_XXXX
select 0 as id,
       cast(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime,
       case when dept_name like '%XX%' then 'X1'
            when dept_name = 'xXX' then 'X2'
            else 'X3' end as paytype,
       count(orderid) as paynum_h,
       round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'),
         case when dept_name like '%XX%' then 'X1'
              when dept_name = 'xXX' then 'X2'
              else 'X3' end ;
union all
select 0 as id,
       cast(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime,
       'all' as paytype,
       count(orderid) as paynum_h,
       round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;
Reference answer:
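Which Flink version are you using? Judging from your SQL, if the version is <= 1.10, the planner's MetadataHandler identifies the keys after your group by as the upsert key, and rows can then overwrite each other. Could you check whether your result matches this?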
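The overwrite mechanism suggested in the answer can be illustrated with a toy upsert sink. This is a sketch only: the row values are invented, `write_upsert` is a hypothetical helper rather than a Flink API, and which key the planner actually derives for the union is an assumption here (ftime alone, the group by key the two branches share).

```python
def write_upsert(rows, key_fields):
    """Simulate an upsert sink: a later row with the same key replaces the earlier one."""
    table = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        table[key] = row
    return list(table.values())


# Made-up results of the two branches of the union all for one hour.
per_dept = [
    {"ftime": 2020110516, "paytype": "X1", "paymoney_h": 100},
    {"ftime": 2020110516, "paytype": "X2", "paymoney_h": 50},
]
total = [{"ftime": 2020110516, "paytype": "all", "paymoney_h": 150}]

# Assumed upsert key: ftime only. All three rows collide and only one survives.
keyed_by_ftime = write_upsert(per_dept + total, ["ftime"])

# With paytype in the key, every row keeps its own slot.
keyed_by_ftime_paytype = write_upsert(per_dept + total, ["ftime", "paytype"])

print(len(keyed_by_ftime), len(keyed_by_ftime_paytype))
```

If the sink table ends up with whichever row arrived last for each hour, that matches the symptom in the question, where sometimes the per-department rows and sometimes the total go missing.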
More answers to this question are available at:
https://developer.aliyun.com/ask/371628?spm=a2c6h.13066369.question.81.6ad26382TqkdaS
Question 4: Checkpoint timeout on flink 1.11.0
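http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165123.jpg
http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165200.jpg
The checkpoint history is shown in the screenshots above: the third subtask never acked, and the only related entries I can find in the TM log are the following: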
2020-11-05 13:13:38,101 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6).
2020-11-05 13:13:38,143 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0).
2020-11-05 13:14:37,779 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -3 could not be established. Broker may not be available.
2020-11-05 13:14:37,786 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -2 could not be established. Broker may not be available.
2020-11-05 13:33:38,115 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> Process -> (Sink: ***, Sink: *** sink, Sink: ***) (3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).
Could anyone passing by suggest how to troubleshoot this? Many thanks. (The log level cannot be changed to debug for now.)
Reference answer:
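The cause of a checkpoint timeout generally depends on the job. Judging from your 2 screenshots, it is stuck on subtask3 of the second operator. Is the connection between the upstream and downstream operators forward or rebalance? If it is forward, check for data skew, that is, whether subtask3 has more data to process. If it is rebalance, then because subtask1 and subtask2 both succeeded, the upstream barriers should all have been sent downstream, so subtask3 has received them too; and since rebalance gives every subtask the same amount of data, check whether the sink is writing out too slowly. To investigate a job, you can usually look at its logs and GC, take thread dumps, draw flame graphs, and so on.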
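The forward versus rebalance reasoning can be sketched with a toy model of how record counts reach the downstream subtasks. The function names and record counts are hypothetical, and rebalance is modeled as plain round-robin starting at channel 0, which simplifies how Flink picks the first channel.

```python
def forward(upstream_counts):
    """Forward: upstream subtask i sends everything to downstream subtask i."""
    return list(upstream_counts)


def rebalance(upstream_counts, downstream_parallelism):
    """Rebalance: each upstream subtask spreads its records round-robin."""
    downstream = [0] * downstream_parallelism
    for count in upstream_counts:
        base, extra = divmod(count, downstream_parallelism)
        for channel in range(downstream_parallelism):
            # The first `extra` channels receive one additional record.
            downstream[channel] += base + (1 if channel < extra else 0)
    return downstream


# Made-up upstream load in which the third subtask is skewed.
upstream = [100, 100, 1000]
print("forward:  ", forward(upstream))       # skew is passed straight through
print("rebalance:", rebalance(upstream, 3))  # load is evened out
```

With forward, a heavy third upstream subtask means a heavy third downstream subtask, so skew is the first suspect. With rebalance the input is nearly even, so a single slow subtask points instead at something local to it, such as a slow sink.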
More answers to this question are available at:
https://developer.aliyun.com/ask/371633?spm=a2c6h.13066369.question.82.6ad26382EgonTS
Question 5: Error writing to hive with flink-1.11
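The job reads from kafka and writes to hive with flink, and it used to run fine. It also runs normally in IDEA, but after packaging it as a jar and submitting it to the flink cluster, it fails with the error below. What is the cause?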
2020-11-05 15:34:36
org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
    at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at StreamExecCalc$43.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at StreamExecCalc$19.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
    at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
    at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
    ... 40 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
    ... 41 more
Caused by: java.lang.NoSuchFieldError: IGNORE_CLIENT_LOCALITY
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:204)
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:247)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:313)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1182)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1161)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1099)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:464)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:461)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:475)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:402)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:218)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:312)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:67)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:126)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:115)
    at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284)
    ... 45 more
Reference answer:
This looks like a dependency conflict. Which Hive and Hadoop versions are you using?
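One way to test the dependency-conflict hypothesis is to list every jar that bundles the class named in the stack trace. The sketch below uses only the Python standard library; the function name `jars_containing` and the directory in the usage note are hypothetical, and a class appearing in several jars is only a hint of a conflict, not proof.

```python
import zipfile
from pathlib import Path


def jars_containing(jar_dir, class_name):
    """Return the names of the jars in `jar_dir` that contain `class_name`."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
    return hits
```

For example, `jars_containing("/path/to/flink/lib", "org.apache.hadoop.hdfs.DFSOutputStream")` lists the cluster-side jars that ship this class. Since the job runs in IDEA but fails on the cluster, running the same check on the directory holding the packaged job jar may show whether it bundles a second copy of the Hadoop classes.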
More answers to this question are available at:
https://developer.aliyun.com/ask/371647?spm=a2c6h.13066369.question.83.6ad26382ZFlwVi