Flink需求:
两个设备日志流通过设备id和相近时间进行关联
设备日志流在kafka的aTopic和bTopic里面,由于有灰度设置,灰度设置分别对应aTopicGray,bTopicGray,他们互为灰度
所有flink的job消费的时候需要同时消费全量和灰度的topic。
会发生的情况:topic有多个分区, 切换过程中灰度的topic和非灰度的topic都有新数据,全量之后原Topic不会有新数据
并且topic里面数据只保留一段时间,过了几天之后数据会过期被清除
实现:
1,通过kafka消费两个日志流
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(brokers)
.setTopics("aTopic,aTopicGray")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
2,将stream转成table,并且设置时间戳和水印,创建临时表。
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
tableEnv.createTemporaryView("MyView", dataStream);
3,使用sqlQuery来进行interval join,并且设置时间戳和水印,创建临时表
Table joinedTable =
tableEnv.sqlQuery(
"SELECT U.name, O.amount " +
"FROM UserTable U left join OrderTable O " +
"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");
DataStream join_table_stream = tableEnv.toDataStream(joinedTable);
Table join_table_stream_watermark =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("joinrowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("joinrowtime", "joinrowtime - INTERVAL '10' SECOND")
.build());
tEnv.createTemporaryView("join_table_stream_watermark", join_table_stream_watermark);
4,使用Window Top-N 来对关联进行去重,只保留关联到的第一条记录
SELECT *
FROM (
SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,objId,aTime ORDER BY bTime DESC) as rownum
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 1;
5,进行后续操作
遇到的问题:
在测试环境没有灰度和单分区的环境是正常运行的,到了生产环境就在Window Top-N最后这一步遇到的时间戳推进缓慢,并且输出的关联到的数据少
1,请问我可能哪一步出问题了?是什么问题?可以怎么解决?
2,当灰度和全量topic有一个topic数据都过期了的情况下是不是会影响?
3,Window Top-N对通过sqlQuery产生的joinTable这种临时表是否能够正常作用?
4,flink的水印时间和水印在我这里面的datastream和table之间转换和sqlQuery之后流的转换过程中是怎样流转和变化的?flink 1.17.2
如果只是仅仅保证最新一条数据的话,我觉得还不如直接一个topic一个分区一个并行度,更新写,当然如果数据量大,可以并发写多个字段,每个并行度写一个,然后取多个字段的最新的数据再合并,这个只是我的思路,因为我不知道你具体想要做啥,但是有主键就可以,不过还是要看你们底层的数据库是啥,我们是doris 可以这么干。此回答来自钉群Flink CDC 社区。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。