开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC实时采集mongodb,是不是只支持mongo集群?

2023-11-24 06:56:17,588 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ftp_logs[1] (1/1) (00a811d7e67b1745da551cb75b99613e_bc764cd8ddf7a0cff126f51c16239658_0_37) switched from RUNNING to FAILED on 10.44.0.16:40629-417690 @ 10.44.0.16 (dataPort=37924).
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server 10.191.20.100:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}
at com.mongodb.kafka.connect.source.MongoSourceTask.setCachedResultAndResumeToken(MongoSourceTask.java:566) ~[flink-sql-connector-mongodb-cdc-2.3.0.jar:2.3.0]
at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:196) ~[flink-sql-connector-mongodb-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.start(MongoDBConnectorSourceTask.java:101) ~[flink-sql-connector-mongodb-cdc-2.3.0.jar:2.3.0]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:759) ~[flink-sql-connector-mongodb-cdc-2.3.0.jar:2.3.0]

Flink CDC实时采集mongodb,是不是只支持mongo集群?

展开
收起
真的很搞笑 2023-12-01 08:33:52 177 0
3 条回答
写回答
取消 提交回答
  • Flink CDC实时采集MongoDB时,确实只支持MongoDB的副本集(replica sets)。这是因为$changeStream阶段是MongoDB的副本集特性,用于捕获数据变更。如果使用非副本集的MongoDB实例,将无法使用Flink CDC进行实时数据采集。

    2023-12-02 17:14:38
    赞同 展开评论 打赏
  • MongoDB CDC 连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。MongoDB CDC连接器的支持信息如下:https://help.aliyun.com/zh/flink/developer-reference/mongodb-cdc-connector?spm=a2c4g.11186623.0.i220

    image.png

    2023-12-02 11:24:07
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    是的,Flink CDC 实时采集 MongoDB 数据时,只支持在 MongoDB 集群上使用。这是因为 MongoDB 的 Change Streams 功能(用于捕获数据变更)仅在 replica set 或者 sharded cluster 中可用。

    如果你正在尝试从单节点 MongoDB 实例中收集数据,你需要将其转换为一个 replica set 或者 sharded cluster。以下是一些基本步骤:

    1. 创建 replica set

      • 在 MongoDB shell 中运行 rs.initiate() 命令以初始化一个新的 replica set。
      • 将其他 mongod 实例添加到该 replica set 中,以便提高容错性和可用性。
    2. 配置 Flink CDC 连接器

      • 更新 Flink CDC 连接器的配置,使其指向新的 replica set。
    3. 重新启动作业

      • 重新启动你的 Flink CDC 作业,它现在应该能够成功地从 MongoDB replica set 中读取数据了。
    2023-12-01 15:06:20
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载