开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有了这个Flink CDC,已经到增量阶段了,为啥数据库的processlist还是没减少,并行度?

有了这个Flink CDC,已经到增量阶段了,为啥数据库的processlist还是没减少,并行度是4, 到增量阶段,理论上数据库的process list应该是1, 对吧?// 节省资源
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("scan.incremental.close-idle-reader.enabled", "true");return new MySqlSourceBuilder()
.hostname(hostPortPair.getLeft())
.port(Integer.parseInt(hostPortPair.getRight()))
.databaseList(databaseTablePair.getLeft())
.tableList(databaseTablePair.getRight())
.username(fromDatabaseSetting.getUserName())
.serverTimeZone(DEFAULT_TIME_ZONE)
.jdbcProperties(jdbcProperties)
.debeziumProperties(debeziumProperties)
.serverId(drsSetting.getServerIdRange())
.password(fromDatabaseSetting.getPassword())
.deserializer(new ChangeRecordDebeziumDeserializationSchema()
.setDrsSetting(drsSetting)
.setOptions(options)
.setDingTalkManager(dingTalkManager)
.setDrsJobManager(drsJobManager))
.startupOptions(startupOptions)
.scanNewlyAddedTableEnabled(true)
// 忽略DDL事件
.includeSchemaChanges(false)
.build();

展开
收起
真的很搞笑 2023-07-31 14:35:16 100 0
2 条回答
写回答
取消 提交回答
  • 根据你提供的代码片段,我注意到你使用的是 MySQL CDC(MySqlSourceBuilder),而不是 MongoDB CDC。因此,关于 MongoDB 的 processlist 问题可能不适用于你的场景。

    针对 MySQL CDC 的情况,在增量阶段数据库的 processlist 数量通常会保持在连接数的范围内,而不一定会减少到 1。这是因为 Flink CDC 在增量模式下会维护与 MySQL 数据库的持久连接,并通过 binlog 解析获取增量数据。因此,在并行度为 4 且启用多线程读取 binlog 的情况下,会创建多个连接来处理并发的 binlog 事件。

    同时,Flink CDC 中的 scan.incremental.close-idle-reader.enabled 配置项是用来控制是否在空闲状态下关闭 binlog 读取器(reader)的。默认情况下,该选项值为 true,意味着当没有新的 binlog 事件时,Flink CDC 会关闭空闲的 reader,以节省资源。但是,即使关闭了空闲的 reader,仍然会保留其他 reader 进行持续的 binlog 监听和数据读取操作。

    因此,根据你的代码和描述,数据库的 processlist 没有减少到 1 是正常现象,因为 Flink CDC 在增量阶段会保持多个连接以处理并发的 binlog 事件,以提高处理效率。

    请确保你的 MySQL 数据库的连接配置和资源配额能够满足 Flink CDC 的需求,以确保正常运行。如果你仍然遇到问题,可以提供更多相关的配置和上下文信息,以便我们更好地帮助你解决问题。

    2023-07-31 21:27:15
    赞同 展开评论 打赏
  • 存在即是合理

    如果已经使用Flink CDC进入增量阶段,那么在增量阶段,数据库的processlist应该会减少。但是,如果在增量阶段,数据库的processlist没有减少,可能是由于以下原因之一导致的:

    • Flink作业可能没有正确地配置为增量模式。请确保已经将scan.incremental.enabled属性设置为true,并且已经将scan.incremental.max.records属性设置为一个足够小的值,以便只处理增量数据。

    • Flink作业可能正在并行处理多个任务。请确保Flink作业只有一个任务正在运行,并且该任务的并行度设置为4。

    • Flink作业可能正在处理大量的数据。请确保Flink作业具有足够的资源来处理这些数据。

    2023-07-31 14:47:18
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    DTCC 2022大会集锦《云原生一站式数据库技术与实践》 立即下载
    阿里云瑶池数据库精要2022版 立即下载
    2022 DTCC-阿里云一站式数据库上云最佳实践 立即下载