问题一:Flink这个错,如何定位到底是那个文件的问题呢?
Flink这个错,如何定位到底是那个文件的问题呢?
参考答案:
根据报错信息建议检查如下:
- 所有的开始标签(start tags)都有对应的结束标签(end tags)。
- [row,col{unknow-source}]:[1,3681],这表示问题发生在文档的第1行,第3681个字符。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/623579
问题二:flink云顶一段时间之后就挂了,找不到原因,请问一下谁能帮我看一下?
flink云顶一段时间之后就挂了,找不到原因,请问一下谁能帮我看一下?
参考答案:
异常直接原因是 TaskManager 心跳超时,进一步原因可能有:
- 进程已退出,可能自身发生错误,或者受到 YARN RM 或 NM 上抢占机制影响,需要进一步追查 TaskManager 日志或 YARN RM/NM 日志;
- 进程仍在运行,集群网络问题造成失联,连接超时会自行退出,JobManager 在该异常后会 Failover 自行恢复(重新申请资源并启动新的 TaskManager);
- 进程 GC 时间过长,可能是内存泄露或内存资源配置不合理造成,可以借助Grafana上的监控来看对应的JM/TM的GC时长。
——参考链接。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/623567
问题三:Flink有知道这是啥问题的吗?
Flink有知道这是啥问题的吗?sumStream.sinkTo(new Elasticsearch6SinkBuilder()
.setBulkFlushMaxActions(1)
.setHosts(new HttpHost("10.237.226.35", 9600, "http"))
.setEmitter((element, context, indexer) -> indexer.add(Requests.indexRequest().index(ES_INDEX_NAME).type("_doc").source(element))).build());
参考答案:
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators()方法这个方法是出错的关键,恢复状态的时候中类型不匹配。查代码看看有没有数据类型与定义的操作符和转换器冲突的地方
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/659036
问题四:Flink需求: 两个设备日志流通过设备id和相近时间进行关联 请问我可能哪一步出问题了?
Flink需求:
两个设备日志流通过设备id和相近时间进行关联
设备日志流在kafka的aTopic和bTopic里面,由于有灰度设置,灰度设置分别对应aTopicGray,bTopicGray,他们互为灰度
所有flink的job消费的时候需要同时消费全量和灰度的topic。
会发生的情况:topic有多个分区, 切换过程中灰度的topic和非灰度的topic都有新数据,全量之后原Topic不会有新数据
并且topic里面数据只保留一段时间,过了几天之后数据会过期被清除
实现:
1,通过kafka消费两个日志流
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(brokers)
.setTopics("aTopic,aTopicGray")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
2,将stream转成table,并且设置时间戳和水印,创建临时表。
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
tableEnv.createTemporaryView("MyView", dataStream);
3,使用sqlQuery来进行interval join,并且设置时间戳和水印,创建临时表
Table joinedTable =
tableEnv.sqlQuery(
"SELECT U.name, O.amount " +
"FROM UserTable U left join OrderTable O " +
"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");
DataStream join_table_stream = tableEnv.toDataStream(joinedTable);
Table join_table_stream_watermark =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("joinrowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("joinrowtime", "joinrowtime - INTERVAL '10' SECOND")
.build());
tEnv.createTemporaryView("join_table_stream_watermark", join_table_stream_watermark);
4,使用Window Top-N 来对关联进行去重,只保留关联到的第一条记录
SELECT *
FROM (
SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,objId,aTime ORDER BY bTime DESC) as rownum
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 1;
5,进行后续操作
遇到的问题:
在测试环境没有灰度和单分区的环境是正常运行的,到了生产环境就在Window Top-N最后这一步遇到的时间戳推进缓慢,并且输出的关联到的数据少
1,请问我可能哪一步出问题了?是什么问题?可以怎么解决?
2,当灰度和全量topic有一个topic数据都过期了的情况下是不是会影响?
3,Window Top-N对通过sqlQuery产生的joinTable这种临时表是否能够正常作用?
4,flink的水印时间和水印在我这里面的datastream和table之间转换和sqlQuery之后流的转换过程中是怎样流转和变化的?flink 1.17.2
参考答案:
如果只是仅仅保证最新一条数据的话,我觉得还不如直接一个topic一个分区一个并行度,更新写,当然如果数据量大,可以并发写多个字段,每个并行度写一个,然后取多个字段的最新的数据再合并,这个只是我的思路,因为我不知道你具体想要做啥,但是有主键就可以,不过还是要看你们底层的数据库是啥,我们是doris 可以这么干。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/621326
问题五:flink突然报错;无法启动了怎么解决?
flink突然报错;无法启动了怎么解决?
参考答案:
flink中有这样一个参数:akka.framesize,
是JobManager和TaskManager之间通信时发送的消息大小的最大值
可以在启动flink的时候加上 或者 在conf/flink-conf.yaml 配置上
启动应用的时候指定该参数格式如下:
flink run ... -yD akka.framesize="xxx" \ ...
——参考链接。
关于本问题的更多回答可点击进行查看: