有了这个Flink CDC,已经到增量阶段了,为啥数据库的processlist还是没减少,并行度是4, 到增量阶段,理论上数据库的process list应该是1, 对吧?// 节省资源
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("scan.incremental.close-idle-reader.enabled", "true");return new MySqlSourceBuilder()
.hostname(hostPortPair.getLeft())
.port(Integer.parseInt(hostPortPair.getRight()))
.databaseList(databaseTablePair.getLeft())
.tableList(databaseTablePair.getRight())
.username(fromDatabaseSetting.getUserName())
.serverTimeZone(DEFAULT_TIME_ZONE)
.jdbcProperties(jdbcProperties)
.debeziumProperties(debeziumProperties)
.serverId(drsSetting.getServerIdRange())
.password(fromDatabaseSetting.getPassword())
.deserializer(new ChangeRecordDebeziumDeserializationSchema()
.setDrsSetting(drsSetting)
.setOptions(options)
.setDingTalkManager(dingTalkManager)
.setDrsJobManager(drsJobManager))
.startupOptions(startupOptions)
.scanNewlyAddedTableEnabled(true)
// 忽略DDL事件
.includeSchemaChanges(false)
.build();
根据你提供的代码片段,我注意到你使用的是 MySQL CDC(MySqlSourceBuilder),而不是 MongoDB CDC。因此,关于 MongoDB 的 processlist 问题可能不适用于你的场景。
针对 MySQL CDC 的情况,在增量阶段数据库的 processlist 数量通常会保持在连接数的范围内,而不一定会减少到 1。这是因为 Flink CDC 在增量模式下会维护与 MySQL 数据库的持久连接,并通过 binlog 解析获取增量数据。因此,在并行度为 4 且启用多线程读取 binlog 的情况下,会创建多个连接来处理并发的 binlog 事件。
同时,Flink CDC 中的 scan.incremental.close-idle-reader.enabled
配置项是用来控制是否在空闲状态下关闭 binlog 读取器(reader)的。默认情况下,该选项值为 true
,意味着当没有新的 binlog 事件时,Flink CDC 会关闭空闲的 reader,以节省资源。但是,即使关闭了空闲的 reader,仍然会保留其他 reader 进行持续的 binlog 监听和数据读取操作。
因此,根据你的代码和描述,数据库的 processlist 没有减少到 1 是正常现象,因为 Flink CDC 在增量阶段会保持多个连接以处理并发的 binlog 事件,以提高处理效率。
请确保你的 MySQL 数据库的连接配置和资源配额能够满足 Flink CDC 的需求,以确保正常运行。如果你仍然遇到问题,可以提供更多相关的配置和上下文信息,以便我们更好地帮助你解决问题。
如果已经使用Flink CDC进入增量阶段,那么在增量阶段,数据库的processlist应该会减少。但是,如果在增量阶段,数据库的processlist没有减少,可能是由于以下原因之一导致的:
Flink作业可能没有正确地配置为增量模式。请确保已经将scan.incremental.enabled属性设置为true,并且已经将scan.incremental.max.records属性设置为一个足够小的值,以便只处理增量数据。
Flink作业可能正在并行处理多个任务。请确保Flink作业只有一个任务正在运行,并且该任务的并行度设置为4。
Flink作业可能正在处理大量的数据。请确保Flink作业具有足够的资源来处理这些数据。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。