Flink sql 里面非binlog Source表,可以设置轮询时间吗?想着从hologre里面的A表,间隔两分钟,使用flinksql加工下,再存入hologres的B表,定时调度时间间隔最小5分钟,想把时间间隔缩小到1-2分钟。比如想做出一个召回集,给用户做推荐用;最初想着,间隔1分钟差不多;如果无限流读binlog相当于实时,这样的话,holo的B表的读写性能,我不确定会不会影响。
在Flink SQL中,可以通过设置source.interval
参数来控制非binlog Source表的轮询时间。例如,如果你想让从Hologres中的A表每隔2分钟读取一次数据,可以这样设置:
CREATE TABLE A_table (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'hologres',
'url' = 'jdbc:hive2://localhost:10000/default',
'username' = 'hive',
'password' = 'hive',
'table-name' = 'A',
'scan.startup.timestamp-millis' = '1629450000000', -- 这里设置你的起始时间戳
'scan.interval' = '2m' -- 这里设置轮询间隔为2分钟
);
然后,你可以使用Flink SQL对A表进行加工处理,并将结果存入Hologres的B表。关于读写性能的影响,由于Hologres是基于Hadoop的数据仓库,因此读写性能会受到Hadoop集群和硬件资源的限制。如果你发现性能不佳,可以考虑优化查询语句、调整并行度或者升级硬件资源。
支持不了。我在想你自己在sourcefunction中实现一个根据某一个Hologres A表的时间列定时polling,是不是也行,hologres的driver我理解pg就行。此回答整理自钉群“实时计算Flink产品交流群”
Apache Flink SQL本身并不直接支持非binlog源表设置轮询时间间隔,即不支持以分钟级别的定时任务方式从Hologres或其他数据库中定期拉取数据。Flink SQL更适合处理流式数据或者持续变更的数据源,例如通过CDC(Change Data Capture)读取binlog来实现实时数据同步。
针对你的需求,可以考虑以下变通方案:
使用外部调度系统:
可以利用阿里云DataWorks、Airflow、Zeppelin Notebook等外部调度工具,每隔1-2分钟触发一次Flink SQL作业,该作业从Hologres的A表查询数据并加工后写入B表。
内部循环+checkpoint机制:
如果一定要在Flink程序内部实现类似定时任务的功能,可以通过自定义SourceFunction或Table API结合事件时间窗口,并结合checkpoint和状态管理,模拟出轮询效果。但这需要编写额外的DataStream程序而非纯SQL作业。
Flink CDC + 定制化插件:
虽然标准的Flink CDC插件通常用于捕获binlog,但理论上如果能开发一个针对Hologres的定制化CDC插件,使其能够按照一定的时间间隔(如每两分钟)获取增量数据,再进行后续处理,这可能是一个可行的解决方案,但这通常超出了开箱即用功能的范畴。
考虑到Hologres的读写性能,即使采用实时流处理的方式,只要设计合理的并发度和数据分区策略,同时确保下游处理逻辑高效,通常是可以应对适度的实时数据流的。不过,在决定是否采用实时流处理之前,建议评估实际业务场景对推荐系统的实时性要求以及Hologres的实际负载能力,以确定最佳的数据更新频率和存储方案。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。