
Custom serializer: I got date and datetime working, but I can't get the timestamp type to work. Is there a trick for it?

圆葱猪肉包 2023-03-29 16:35:34
1 answer
  • package com.darcytech.debezium.converter;

    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
    import io.debezium.spi.converter.CustomConverter;
    import io.debezium.spi.converter.RelationalColumn;
    import lombok.extern.slf4j.Slf4j;

    import java.time.*;
    import java.time.format.DateTimeFormatter;
    import java.util.Properties;
    import java.util.function.Consumer;

    /**
     * Works around Debezium's handling of MySQL date and time columns.
     * By default Debezium converts a MySQL datetime column to a UTC timestamp
     * ({@link io.debezium.time.Timestamp}). The time zone is hard-coded and cannot be changed,
     * so a value stored in a database configured for UTC+8 arrives in Kafka as a long
     * timestamp that is eight hours ahead.
     * By default Debezium converts a MySQL timestamp column to a UTC string.
     *
     * | mysql                           | mysql-binlog-connector               | debezium                      |
     * | ------------------------------- | ------------------------------------ | ----------------------------- |
     * | date (2021-01-28)               | LocalDate (2021-01-28)               | Integer (18655)               |
     * | time (17:29:04)                 | Duration (PT17H29M4S)                | Long (62944000000)            |
     * | timestamp (2021-01-28 17:29:04) | ZonedDateTime (2021-01-28T09:29:04Z) | String (2021-01-28T09:29:04Z) |
     * | datetime (2021-01-28 17:29:04)  | LocalDateTime (2021-01-28T17:29:04)  | Long (1611854944000)          |
     *
     * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
     */
    @Slf4j
    public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    
    private ZoneId timestampZoneId = ZoneId.systemDefault();
    
    @Override
    public void configure(Properties props) {
        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
    }
    
    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String settingValue = properties.getProperty(settingKey);
        // Trim before the emptiness check so that a whitespace-only value counts as unset.
        if (settingValue == null || settingValue.trim().isEmpty()) {
            return;
        }
        try {
            callback.accept(settingValue.trim());
        } catch (IllegalArgumentException | DateTimeException e) {
            // Log the cause as well, then rethrow so a bad setting fails fast.
            log.error("The \"{}\" setting is illegal: {}", settingKey, settingValue, e);
            throw e;
        }
    }
    
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
            log.info("register com.darcytech.debezium.converter for sqlType {} to schema {}", sqlType, schemaBuilder.name());
        }
    }
    
    private String convertDate(Object input) {
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return null;
    }
    
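    private String convertTime(Object input) {
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            // Note: LocalTime.ofSecondOfDay only accepts 0..86399 seconds, so a negative
            // TIME value or one of 24 hours or more throws a DateTimeException here.
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return null;
    }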
    
    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        }
        return null;
    }
    
    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP values in UTC, so every ZonedDateTime arriving here is in UTC
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        }
        return null;
    }
    

    }
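The timestamp branch is the part the question asks about, so here is a minimal, JDK-only sketch of the same conversion that can be run without Debezium on the classpath. The class name `TimestampZoneDemo`, the pattern `yyyy-MM-dd HH:mm:ss`, and the zone `Asia/Shanghai` are assumptions chosen for illustration. Because the converter above returns null for any input that is not a `ZonedDateTime`, the sketch also reports the runtime class of unexpected input, which may help when TIMESTAMP columns come out as null.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampZoneDemo {

    // Hypothetical stand-alone version of convertTimestamp: shifts the UTC value
    // to the target zone, drops the zone, and formats the local date-time.
    static String convertTimestamp(Object input, ZoneId targetZone, DateTimeFormatter formatter) {
        if (input instanceof ZonedDateTime) {
            ZonedDateTime utcValue = (ZonedDateTime) input;
            LocalDateTime local = utcValue.withZoneSameInstant(targetZone).toLocalDateTime();
            return formatter.format(local);
        }
        if (input != null) {
            // Not part of the original answer: report what actually arrived.
            System.err.println("Unexpected TIMESTAMP input type: " + input.getClass().getName());
        }
        return null;
    }

    public static void main(String[] args) {
        // Same sample value as the table in the Javadoc above.
        ZonedDateTime fromBinlog = ZonedDateTime.parse("2021-01-28T09:29:04Z");
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed target zone (UTC+8)

        System.out.println(convertTimestamp(fromBinlog, zone, formatter)); // prints 2021-01-28 17:29:04
    }
}
```
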

    This answer was compiled from the DingTalk group "Flink CDC 社区" (Flink CDC community).
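The answer does not show how the converter gets registered, so the following is a sketch of the properties it would need, following Debezium's documented convention: `converters` lists logical names, `<name>.type` gives the class, and the remaining `<name>.*` entries are handed to `configure(Properties)` with the prefix removed. The logical name `mysqlDateTime`, the patterns, and the zone are assumptions for illustration. The `format.*` keys are the ones read in `configure` above.

```java
import java.util.Properties;

public class ConverterConfigDemo {

    // Builds the Debezium properties that would register the converter.
    static Properties converterProperties() {
        String name = "mysqlDateTime"; // hypothetical logical name for this converter
        Properties props = new Properties();
        props.setProperty("converters", name);
        props.setProperty(name + ".type", "com.darcytech.debezium.converter.MySqlDateTimeConverter");
        // Keys below match the ones read by configure(); values are example choices.
        props.setProperty(name + ".format.date", "yyyy-MM-dd");
        props.setProperty(name + ".format.time", "HH:mm:ss");
        props.setProperty(name + ".format.datetime", "yyyy-MM-dd HH:mm:ss");
        props.setProperty(name + ".format.timestamp", "yyyy-MM-dd HH:mm:ss");
        props.setProperty(name + ".format.timestamp.zone", "Asia/Shanghai");
        return props;
    }

    public static void main(String[] args) {
        // Print each key=value pair so the result can be copied into a config.
        Properties props = converterProperties();
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

With Flink CDC these properties would typically be passed through the source's Debezium properties hook (for example `debeziumProperties(...)` on the source builder). Treat that wiring as an assumption to check against the connector version in use.
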

    2023-03-29 17:51:12
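The sample values in the answer's Javadoc table can be checked with plain `java.time`. The sketch below decodes the three numeric encodings from the "debezium" column, assuming the `time` value is in microseconds (which matches the table's sample) and the `datetime` value is epoch milliseconds read as UTC. The class and method names are hypothetical. The last line shows the eight-hour shift the answer describes when the same milliseconds are read in UTC+8.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class DebeziumDefaultsDemo {

    // date: days since 1970-01-01
    static LocalDate dateFromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    // time: assumed to be microseconds since midnight
    static Duration timeFromMicros(long micros) {
        return Duration.ofNanos(micros * 1_000L);
    }

    // datetime: epoch milliseconds, interpreted in the given zone
    static LocalDateTime datetimeFromEpochMillis(long millis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), zone);
    }

    public static void main(String[] args) {
        System.out.println(dateFromEpochDays(18655));                                   // 2021-01-28
        System.out.println(timeFromMicros(62944000000L));                               // PT17H29M4S
        System.out.println(datetimeFromEpochMillis(1611854944000L, ZoneOffset.UTC));    // 2021-01-28T17:29:04
        // Reading the same value in UTC+8 yields a time eight hours later than stored.
        System.out.println(datetimeFromEpochMillis(1611854944000L, ZoneId.of("Asia/Shanghai"))); // 2021-01-29T01:29:04
    }
}
```
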