自定义序列化器时,date 和 datetime 类型我都搞定了,但 timestamp 类型一直搞不定,有什么解决办法吗?
package com.darcytech.debezium.converter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
/** * 处理Debezium时间转换的问题 * Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是写死的没法儿改, * 导致数据库中设置的UTC+8,到kafka中变成了多八个小时的long型时间戳 * Debezium默认将MySQL中的timestamp类型转成UTC的字符串。 * | mysql | mysql-binlog-connector | debezium | * | ----------------------------------- | ---------------------------------------- | --------------------------------- | * | date
(2021-01-28) | LocalDate
(2021-01-28) | Integer
(18655) | * | time
(17:29:04) | Duration
(PT17H29M4S) | Long
(62944000000) | * | timestamp
(2021-01-28 17:29:04) | ZonedDateTime
(2021-01-28T09:29:04Z) | String
(2021-01-28T09:29:04Z) | * | Datetime
(2021-01-28 17:29:04) | LocalDateTime
(2021-01-28T17:29:04) | Long
(1611854944000) | * * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter */ @Slf4j public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
@Override
public void configure(Properties props) {
readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
}
private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
String settingValue = (String) properties.get(settingKey);
if (settingValue == null || settingValue.length() == 0) {
return;
}
try {
callback.accept(settingValue.trim());
} catch (IllegalArgumentException | DateTimeException e) {
log.error("The \"{}\" setting is illegal:{}", settingKey, settingValue);
throw e;
}
}
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
String sqlType = column.typeName().toUpperCase();
SchemaBuilder schemaBuilder = null;
Converter converter = null;
if ("DATE".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
converter = this::convertDate;
}
if ("TIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
converter = this::convertTime;
}
if ("DATETIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
converter = this::convertDateTime;
}
if ("TIMESTAMP".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
converter = this::convertTimestamp;
}
if (schemaBuilder != null) {
registration.register(schemaBuilder, converter);
log.info("register com.darcytech.debezium.converter for sqlType {} to schema {}", sqlType, schemaBuilder.name());
}
}
private String convertDate(Object input) {
if (input instanceof LocalDate) {
return dateFormatter.format((LocalDate) input);
}
if (input instanceof Integer) {
LocalDate date = LocalDate.ofEpochDay((Integer) input);
return dateFormatter.format(date);
}
return null;
}
private String convertTime(Object input) {
if (input instanceof Duration) {
Duration duration = (Duration) input;
long seconds = duration.getSeconds();
int nano = duration.getNano();
LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
return timeFormatter.format(time);
}
return null;
}
private String convertDateTime(Object input) {
if (input instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) input);
}
return null;
}
private String convertTimestamp(Object input) {
if (input instanceof ZonedDateTime) {
// mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
ZonedDateTime zonedDateTime = (ZonedDateTime) input;
LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
return timestampFormatter.format(localDateTime);
}
return null;
}
}
此答案整理自钉群“Flink CDC 社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。