对于Flink CDC连接MongoDB,确实需要正确的版本和权限配置。从MongoDB 4.0版本开始,仅支持pv1模式,这是使用MongoDB 3.2或更高版本创建的所有新副本集的默认值。同时,为了使用MongoDB Kafka连接器,必须具有changeStream和读取权限。此外,如果启用了MongoDB的鉴权功能,还需要赋予用户一些额外的权限,如splitVector权限、listDatabases权限、listCollections权限、collStats权限、find权限等。
另外,需要注意的是,当使用MongoDB CDC Connector时,要适当设置Oplog的容量和过期时间。MongoDB的oplog是一个特殊的有容量集合,当其容量达到最大值后,会丢弃历史数据。Change Streams通过resume token来进行恢复,如果oplog的容量设置得过小,可能会导致数据丢失。因此,正确地配置和使用Flink CDC以及MongoDB是确保数据同步和变更捕获功能正常运作的关键。
在使用 Flink CDC 和 MongoDB 时,确实需要注意 MongoDB 的版本和权限设置。根据引用的内容,MongoDB CDC Connector 要求 MongoDB 的最小可用版本是 3.6,比较推荐 4.0.8 及以上版本。此外,必须使用集群部署模式。
在权限方面,需要确保有足够的权限来访问 MongoDB 数据库和集合。具体来说,需要创建一个名为 "changeStream" 的角色,并为该角色授予对所有数据库和集合的 "changeStream" 权限。然后,需要将这个角色分配给 Flink CDC 用于读取数据的用户。
如果已经检查了这些方面,但仍然读取不到数据,可以通过以下方式来排查问题:启用 Flink CDC 的 debug 日志,并查看日志中的信息;使用 Debezium 的 CLI 工具来查看 MongoDB 的数据库变更;使用 Flink CDC 的测试用例来验证配置是否正确。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。