开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink 读取mongo的时候,发过来的数据 source.ts_ms 时间戳不够精确(需求,一张

image.png

展开
收起
雪哥哥 2022-10-30 00:09:38 989 0
16 条回答
写回答
取消 提交回答
  • 阿里云Flink 读取MongoDB的时候,会将MongoDB中的时间戳转换为Unix时间戳(毫秒级),并将其作为Flink中的EventTime。但是由于MongoDB中的时间戳只有秒级精度,因此在转换为Unix时间戳后,精度会降低到毫秒级。

    如果您需要更高精度的时间戳,可以考虑在MongoDB中存储更精确的时间戳,例如使用ISODate类型来存储时间戳。这样在读取MongoDB数据时,Flink就可以直接将ISODate类型的时间戳转换为EventTime,避免了精度损失的问题。

    另外,如果您需要使用Flink中的ProcessingTime来处理数据,可以在读取MongoDB数据时,不使用EventTime,而是使用系统时间作为ProcessingTime。这样可以避免时间戳精度的问题,但是需要注意数据处理的顺序可能会受到系统时间的影响。

    2023-05-07 22:44:57
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    可能是因为默认情况下,MongoDB的时间戳精度只能达到毫秒级,而Flink在实时计算过程中需要更高的时间戳精度。

    您可以尝试以下方法来解决这个问题:

    1. 使用MongoDB的_bsonType: Timestamp进行日期的存储,这样可以提高精度至微秒级别。

    2. 自定义Flink的时间戳提取器。例如,使用MongoDB记录时间戳的字段作为flink timestamp字段,再通过Flink的时间戳提取器进行精细化处理,从而得到更高的时间精确度。

    3. 修改MongoDB驱动程序的配置。针对MongoDB的时间戳精度问题,MongoDB驱动程序提供了一个配置选项,可将MongoDB的时间戳精度提高到纳秒级别。可以尝试修改MongoDB驱动程序的配置,以提高数据精度。

    2023-05-05 20:09:53
    赞同 展开评论 打赏
  • 如果您在 Flink 中使用官方提供的 MongoDB 连接器(flink-connector-mongodb)读取 MongoDB 数据库中的数据,发现数据的时间戳 source.ts_ms 不够精确,可能是因为 MongoDB 驱动程序默认将 BSON 的日期类型精度限制到毫秒级别,而 Flink 默认使用的是精确到纳秒级别的时间戳。因此,为了保证 MongoDB 中的时间戳精度,您需要对 MongoDB 连接器进行如下配置:

    MongoDBSource<Person> source = MongoDBSource.<Person>builder()
        .hostname("localhost")
        .port(27017)
        .database("mydb")
        .collection("mycollection")
        .parseKafkaRecord(true)
        .withOptions(new MongoClientOptions.Builder()
            .cursorFinalizerEnabled(false)
            .build())
        .timestampFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
        .build();
    
    

    在上面的示例代码中,我们设置了 timestampFormat 参数用于指定时间戳的格式,通过这个参数可以实现更精确的时间戳。在 timestampFormat 参数中,我们使用了 yyyy-MM-dd'T'HH:mm:ss.SSS 这个格式,表示时间戳精确到毫秒级别,并且使用了 ISO-8601 格式的日期和时间表示方法。同时,我们还设置了 MongoDB 驱动程序的配置选项 cursorFinalizerEnabled 为 false,以确保不使用 MongoDB 驱动程序的默认值。

    需要注意的是,使用更加精确的时间戳格式会增加数据的大小和网络传输的负载,因此需要根据实际情况进行权衡。

    除了在配置 MongoDB 连接器时指定时间戳格式之外,您也可以在 Flink 中使用另外一种方式来实现更高精度的时间戳,即使用自定义 Source 从 MongoDB 中读取数据。这种方式需要您自己编写 Flink 应用程序代码,但是可以更加灵活地处理时间戳和数据。

    2023-05-02 08:08:41
    赞同 1 展开评论 打赏
  • 可以考虑使用MongoDB提供的ObjectId来代替时间戳作为排序字段。ObjectId是MongoDB中每个文档的唯一标识符,它包括一个12字节的序列号,其中前4个字节是以秒为单位的时间戳,后面3个字节是机器标识符,2个字节是进程标识符,最后3个字节是一个随机数。

    通过使用ObjectId作为排序字段,可以保证排序的唯一性和精度,并且不需要对数据进行额外的转换和处理。

    在查询时,可以使用MongoDB的ObjectId和$lt操作符来实现分页。例如,查询时间戳大于t的n条记录,可以使用以下语句:

    db.collection.find({ '_id': { '$lt': ObjectId(Math.floor(t/1000).toString(16) + '0000000000000000') } }).sort({ '_id': -1 }).limit(n)

    其中,Math.floor(t/1000).toString(16) + '0000000000000000'表示将时间戳转换为ObjectId的十六进制形式,然后在末尾添加'0000000000000000'来表示后面的三个字节为随机数。 -1表示按倒序排序。

    2023-04-27 22:48:23
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    在Flink读取MongoDB数据时,MongoDB的时间戳字段默认会被解析为Unix时间戳。如果您需要更精确的时间戳,可以使用Flink的自定义解析器来解析MongoDB数据,并将时间戳转换为更精确的格式。 还有一些方法是使用Flink的MapFunction接口来转换数据。在MapFunction中,您可以访问MongoDB数据中的时间戳字段,并将其转换为更精确的格式。

    2023-04-27 12:11:47
    赞同 展开评论 打赏
  • 从事java行业9年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    Flink 默认的时间戳精度是到毫秒级的,如果需要更高精度的话,mongodb支持保存纳秒级时间戳,可以使用Node.js中的Date对象来保存

    let timestamp = new Date(goTimestamp);
    db.collection.insert({
        timestamp: timestamp
    });
    
    2023-04-26 16:21:30
    赞同 展开评论 打赏
  • 目前社区并没有开源的MongoDBSource

    但是Debezium 支持 MongoDB CDC[1],可以了解下: https://debezium.io/documentation/reference/connectors/mongodb.html

     https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-streaming-changes

    所以可以借助debezium的MongoDB CDC来实现

    2023-04-25 14:21:44
    赞同 展开评论 打赏
  • 根据你提供的这段文本,可以看到MongoDB中存储的时间戳字段为"ts_ms":1666769243000,该值的时间精度是毫秒级别的。如果需要更高的精确度,可以将该字段扩展为包含纳秒级别的时间戳值。在Flink应用程序中,我们可以使用自定义的反序列化器来解析MongoDB中的数据,并手动对时间戳进行转换和处理,以达到应用程序所需的时间精度要求。

    以下是一种实现方式:

    首先,我们可以定义一个类来表示MongoDB中存储的相关数据结构:

    public class MongoData {
        private long tsNs; // 纳秒级别精度的时间戳
        ...
    
        // getters & setters
    }
    

    接下来,我们编写一个自定义的反序列化器,在反序列化时对时间戳进行转换和处理:

    public class MongoDataDeserializer implements DeserializationSchema<MongoData> {
        @Override
        public MongoData deserialize(byte[] message) throws IOException {
            // 解析MongoDB中的数据
            BSONObject bson = BSON.decode(message);
            String id = (String) bson.get("_id");
            long tsMs = ((BSONTimestamp) bson.get("ts_ms")).getTime();
            
            // 将毫秒转换为纳秒
            long tsNs = TimeUnit.MILLISECONDS.toNanos(tsMs);
    
            MongoData data = new MongoData();
            data.setId(id);
            data.setTsNs(tsNs);
            ...
            return data;
        }
    
        @Override
        public boolean isEndOfStream(MongoData nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<MongoData> getProducedType() {
            return TypeInformation.of(MongoData.class);
        }
    }
    

    这样,我们就可以在Flink应用程序中使用以上自定义的反序列化器来读取MongoDB存储的数据,并得到纳秒精度的时间戳。虽然这种方式增加了代码的复杂性,但是可以提供更高的数据处理精度和灵活性。

    2023-04-24 18:12:49
    赞同 展开评论 打赏
  • 这个问题通常是因为MongoDB中的时间戳精度导致的。MongoDB默认使用64位整数存储时间戳,精度为毫秒级别。这可能对某些应用来说不够精确。 解决这个问题的方法有以下几种: 1. 在MongoDB中使用时间戳的字符串格式存储,例如ISO 8601格式。读取后,在Flink中解析该字符串获取更高精度的时间戳。 例如在MongoDB中存储:

    {
      "ts" : "2020-12-12T12:12:12.123Z" 
    }
    
    

    然后在Flink中可以使用TimestampParser将其解析为Timestamp:

    TimestampParser timestampParser = new TimestampParser("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); 
    Long timestamp = timestampParser.parse("2020-12-12T12:12:12.123Z").getMillisecond();
    
    
    1. 在MongoDB中存储时间戳的整数部分,同时另外存储nanoseconds的值。读取后,在Flink中组合两个值生成高精度时间戳。 例如在MongoDB中:
    {
      "ts_sec" : 1607774332,
      "ts_nano" : 123456000  
    } 
    
    

    然后在Flink中:

    long tsSec = record.getField("ts_sec");
    int tsNano = record.getField("ts_nano");
    Timestamp timestamp = Timestamp.fromEpochMillis(tsSec * 1000).plusNanos(tsNano);
    
    1. 在MongoDB中使用时间戳的64位浮点数存储,精度可以达到纳秒级别。读取后,在Flink中直接使用。 例如在MongoDB中:
    {
      "ts" : 1607774332.123456  
    }
    
    

    然后在Flink中:

    double ts = record.getField("ts"); 
    Timestamp timestamp = Timestamp.fromEpochMilli((long) ts, (int) ((ts % 1) * 1000_000));
    
    

    以上就是几种在MongoDB中存储更高精度时间戳,并在Flink中正确解析获取的方法。具体选择哪种方式,需要根据场景需要来权衡。

    2023-04-24 16:06:05
    赞同 展开评论 打赏
  • 发表文章、提出问题、分享经验、结交志同道合的朋友

    ts_ms 时间戳值 1666769243000 表示自纪元以来的毫秒数,对应的日期时间为 2022-03-04T11:00:43.000Z,这是一个精确到毫秒级的时间戳。

    如果 Flink 程序在读取 MongoDB 数据库中的数据时得到了这样的时间戳,那么在 Flink 数据流中,时间戳精度也是毫秒级的。

    可以通过在 Flink 程序中使用 org.apache.flink.api.common.functions.MapFunction 或 org.apache.flink.streaming.api.functions.ProcessFunction 等转换函数来处理时间戳,例如:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class MyFunction implements MapFunction<MyObject, MyObjectWithTimestamp> {
        @Override
        public MyObjectWithTimestamp map(MyObject value) throws Exception {
            // 从 MyObject 对象中提取时间戳,注意除以 1000 转换为秒级精度
            long timestamp = value.getTsMs() / 1000;
            return new MyObjectWithTimestamp(value, timestamp);
        }
    }
    
    public class MyProcessFunction extends ProcessFunction<MyObject, MyObjectWithTimestamp> {
        @Override
        public void processElement(MyObject value, Context ctx, Collector<MyObjectWithTimestamp> out) throws Exception {
            // 从 MyObject 对象中提取时间戳,注意除以 1000 转换为秒级精度
            long timestamp = value.getTsMs() / 1000;
            out.collect(new MyObjectWithTimestamp(value, timestamp));
        }
    }
    
    
    2023-04-24 11:40:04
    赞同 展开评论 打赏
  • 如果您使用 Flink 读取 MongoDB 数据库中的数据,并发现时间戳(ts_ms)的精度不够,可以尝试使用 Flink 的时间戳分配器(Timestamp Assigner)来重新分配时间戳。时间戳分配器是 Flink 中用于为数据流中的元素分配事件时间(Event Time)和处理时间(Processing Time)的机制。通过重新分配时间戳,您可以使用更精确的时间戳进行事件时间处理和窗口计算。

    在 Flink 中,可以通过实现 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks 接口来自定义时间戳分配器。其中,AssignerWithPeriodicWatermarks 接口用于定期生成水位线(Watermark),而 AssignerWithPunctuatedWatermarks 接口则是在每个事件上动态生成水位线。具体来说,您可以根据数据流中的元素来计算事件时间,例如从 MongoDB 数据中提取更精确的时间戳,并使用 TimestampsAndWatermarks 工具类来生成水位线。示例代码如下:

    DataStreamSource<Document> mongoSource = ...; // 从 MongoDB 中读取数据流
    SingleOutputStreamOperator<Document> streamWithTimestampsAndWatermarks = mongoSource
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Document element) {
                // 从 MongoDB 数据中提取更精确的时间戳
                long tsMs = element.getLong("ts_ms");
                return tsMs;
            }
        });
    

    上述示例中,我们通过实现 BoundedOutOfOrdernessTimestampExtractor 抽象类来实现了 AssignerWithPeriodicWatermarks 接口,并重写了其中的 extractTimestamp 方法来从 MongoDB 数据中提取更精确的时间戳。然后,我们使用 assignTimestampsAndWatermarks 方法将时间戳分配器应用于数据流,并生成水位线。在这个示例中,我们使用了 BoundedOutOfOrdernessTimestampExtractor 类来生成水位线,其中 Time.seconds(5) 表示水位线延迟 5 秒。如果您使用的是 AssignerWithPunctuatedWatermarks 接口,可以使用 emitWatermark 方法在每个事件上动态生成水位线。 另外,为了使用时间戳分配器和水位线,您需要将 MongoDB 数据流转换为 DataStream 类型。在转换数据流时,可以使用 Flink 的 MongoDB 连接器(flink-connector-mongodb)或者自定义的 MongoDB 输入格式(InputFormat)来实现。

    2023-04-24 11:33:17
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    关于 Flink 读取 MongoDB 时时间戳不够精确的问题,可能是由于 MongoDB 默认情况下只提供到秒级精度的时间戳(即 ISODate)导致的。如果您需要更高精度的时间戳,可以在 MongoDB 中使用 Date 类型来存储时间戳。

    同时,在使用 Flink 连接 MongoDB 时,建议使用 BSON 格式的数据源,以便更好地支持 MongoDB 特定的数据类型和操作。具体而言,可以使用 Flink 的 flink-connector-mongodb-bson 扩展来实现。

    在使用 flink-connector-mongodb-bson 连接器时,可以在 Flink SQL 中指定以下选项来配置 MongoDB 数据源:

    sql

    CREATE TABLE myTable ( myField1 INT, myField2 STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 使用水位线 ) WITH ( 'connector' = 'mongodb', 'hosts' = 'mongodb://localhost:27017', 'database' = '', 'collection' = '', 'bsonDecodingEnabled' = 'true', -- 启用 BSON 解码 'connectionOptions'.'authMechanism' = 'SCRAM-SHA-256', -- 配置连接参数

    ...
    

    )

    其中,ts 列指定时间戳属性,WATERMARK 语句用于为时间戳列指定水位线,指定方式与之前回答过的基于事件时间窗口类似。同时,配置选项中的 'bsonDecodingEnabled' 参数用于启用 BSON 解码,'connectionOptions'.'authMechanism' 参数可以指定连接认证方式。

    2023-04-24 09:45:03
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    当使用Flink连接MongoDB时,您可能会遇到时间戳(ts_ms)不够精确的问题。这是因为MongoDB中的时间戳精度只有毫秒级别,而Flink默认使用的事件时间精度是纳秒级别。

    为了解决这个问题,您可以使用Flink提供的时间戳提取器(Timestamp Extractor),将MongoDB中的时间戳转换为Flink的事件时间。Flink提供了多个内置的时间戳提取器,例如:

    BsonTimestampExtractor:用于提取MongoDB中的BSON时间戳; JsonTimestampExtractor:用于提取MongoDB中的JSON时间戳; DateTimeFormatTimestampExtractor:用于从字符串中提取事件时间,支持自定义日期格式。 您可以根据您的需求选择合适的时间戳提取器,并使用以下代码来将MongoDB中的时间戳转换为Flink的事件时间:

    sql Copy code DataStream stream = env.addSource( new FlinkMongoDBSource<>( mongoClientURI, new MongoDBInputFormat( new BsonTimestampExtractor("timestamp"), // or new JsonTimestampExtractor("timestamp") // or new DateTimeFormatTimestampExtractor("yyyy-MM-dd HH:mm:ss.SSS", ZoneOffset.UTC) ... ) ) ); 在上述代码中,我们使用FlinkMongoDBSource作为数据源,并指定了BsonTimestampExtractor作为时间戳提取器。您可以根据您的需求选择其他提取器,并相应地调整代码。

    使用时间戳提取器可以将MongoDB中的时间戳转换为Flink的事件时间,并使Flink能够正确地执行事件时间计算。希望这些信息能够帮助您解决问题。

    2023-04-23 20:14:25
    赞同 展开评论 打赏
  • 技术架构师 阿里云开发者社区技术专家博主 CSDN签约专栏技术博主 掘金签约技术博主 云安全联盟专家 众多开源代码库Commiter

    在 Flink 读取 MongoDB 数据时,如果您使用的是 Flink 的官方 MongoDB 连接器 flink-connector-mongodb,则默认情况下,该连接器会将 MongoDB 中的时间戳字段转换为 Java 的 java.util.Date 类型。由于 Java 的 Date 类型只支持毫秒级别的时间戳,因此在 Flink 中处理时,可能会出现时间戳精度不够的情况。

    为了解决这个问题,您可以考虑以下两种方法:

    使用 BSON 时间戳类型:在插入数据到 MongoDB 中时,可以将时间戳字段的值设置为 BSON 时间戳类型(BSON Timestamp),该类型支持更高精度的时间戳,其单位为微秒。然后,在 Flink 中读取数据时,可以使用 MongoDBInputFormat.setDeserializationConverter() 方法来自定义时间戳的转换方式,将 BSON 时间戳类型转换为 Flink 支持的时间戳类型(如 java.time.Instant 类型)。

    自定义时间戳转换器:在 Flink 中,您可以通过实现 TimestampExtractor 接口来自定义时间戳的提取和转换方式。例如,如果您想要使用 MongoDB 中的 _id 字段作为时间戳,可以实现一个自定义的时间戳提取器,同时在 Flink 中使用 MongoDBInputFormat.setTimestampExtractor() 方法来指定该时间戳提取器。

    2023-04-23 16:55:45
    赞同 展开评论 打赏
  • 热爱开发

    表需要使用MongoDB中的时间戳作为事件时间,但是flink读取MongoDB时,发现传过来的数据中的时间戳source.ts_ms不够精确),该怎么处理?

    可以尝试以下两种方法:

    使用MongoDB自带的ObjectId字段作为事件时间,它包含了更加精确的时间戳信息。在Flink中使用BSONDeserializationSchema来解析数据流,获取ObjectId并将其转换为时间戳。 对于已有的数据,可以进行批量的处理,在Flink中进行额外的计算,通过拓展source.ts_ms以获得更加精确的时间戳。比如将原先的时间戳乘以1000,并再加上一个0到999的随机数,以实现毫秒级别的精度。

    2023-04-23 16:55:45
    赞同 展开评论 打赏
  • 在 deserialize 方法中,将 MongoDB 中的时间戳转换为 Flink 中的时间戳,并设置到 Row 中的相应字段上。例如:

    public class MongoDBTimeDeserializationSchema implements DeserializationSchema { @Override public Row deserialize(byte[] message) throws IOException { BSONObject bsonObject = bson.decode(message); // 将 BSONObject 转换为 Flink 中的 Row Row row = ... // 将 BSONObject 中的时间戳取出并转换为 Flink 中的时间戳 long mongoTimestamp = ((BSONTimestamp)bsonObject.get("timestamp")).getTime(); Timestamp flinkTimestamp = new Timestamp(mongoTimestamp); // 将 Flink 中的时间戳设置到 Row 中的某个字段上 row.setField(fieldIndex, flinkTimestamp); return row; } } 这样就可以将 MongoDB 中的时间戳作为 Flink 数据流中的时间戳来使用了。 MongoDB 中的时间戳可能会存在一定的精度误差,因此需要根据具体业务需求来选择合适的时间戳精度。如果 MongoDB 中的时间戳精度不够,可以考虑将 MongoDB 中的时间戳与其他字段结合起来作为 Flink 数据流中的时间戳来使用。

    2023-04-23 16:33:57
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载