0x1 Summary
In a Hive offline data warehouse, almost every table is partitioned to make querying and analysis easier, most commonly by day. Flink writes data into HDFS with a configuration like the following:
BucketingSink<Object> sink = new BucketingSink<>(path);
//This date pattern is what splits the data into per-day partitions
sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
0x2 The Problem
To land every record in exactly the right partition, the bucketing must be driven by event time. Let's first look at the DateTimeBucketer implementation:
public class DateTimeBucketer<T> implements Bucketer<T> {
    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
    private final String formatString;
    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    /**
     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
     */
    public DateTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    /**
     * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     */
    public DateTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    /**
     * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     * @param zoneId       The timezone used to format {@code DateTimeFormatter} for bucket path.
     */
    public DateTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        //The key bucketing logic: the current timestamp obtained from clock is formatted into the path
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
The clock instance above is created in the BucketingSink#open method, as follows:
this.clock = new Clock() {
    @Override
    public long currentTimeMillis() {
        //Simply returns the current processing time
        return processingTimeService.getCurrentProcessingTime();
    }
};
Putting the source code together: DateTimeBucketer buckets by the current processing time, and processing time inevitably differs from event time, so records can cross partition boundaries when written to HDFS. For example, suppose a record has event time 2019-09-29 23:59:58. It should land in the 2019/09/29 partition, but if it arrives 3 seconds late, the processing time is already 2019-09-30 00:00:01, so it lands in the 2019/09/30 partition. For important data, this outcome is unacceptable.
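The mismatch above can be reproduced with a few lines of plain Java. This is a minimal sketch, not Flink code: it formats the same late record once with its event time and once with its (assumed) arrival time. The epoch values assume the article's timestamps are in China Standard Time (Asia/Shanghai), which the article does not state explicitly.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionSkewDemo {
    // Same pattern and formatting approach DateTimeBucketer uses
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.of("Asia/Shanghai"));

    public static String partitionFor(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Event occurred at 2019-09-29 23:59:58 (+08:00) ...
        long eventTime = 1569772798000L;
        // ... but was processed 3 seconds later, at 2019-09-30 00:00:01 (+08:00)
        long processingTime = eventTime + 3000L;

        System.out.println(partitionFor(eventTime));      // 2019/09/29
        System.out.println(partitionFor(processingTime)); // 2019/09/30
    }
}
```

The two formatted strings disagree, which is exactly the cross-partition write described above.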
0x3 Solution
As the source analysis in section 0x2 shows, the crux of the fix is how getBucketPath obtains its timestamp: it should use the event time instead. Conveniently, the method's third parameter, element, is the record itself, so as long as the record carries an event time we can read it directly. Since the built-in implementation is awkward to modify, we can write our own EventTimeBucketer against the Bucketer interface:
public class EventTimeBucketer implements Bucketer<BaseCountVO> {
    private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd";
    private final String formatString;
    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public EventTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public EventTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public EventTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    //This method is essential: without it, dateTimeFormatter would be null after deserialization.
    //"But didn't the constructor initialize it?" Constructors are not invoked during deserialization,
    //so readObject is where the transient formatter must be rebuilt.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) {
        //Bucket by the record's event time rather than the processing time from clock
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
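The readObject point deserves a stand-alone demonstration. The sketch below is not from the article; it shows the underlying Java serialization behavior: a transient field comes back null after deserialization unless a custom readObject rebuilds it, because constructors and field initializers are bypassed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ReadObjectDemo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String formatString = "yyyy/MM/dd";
    // transient: excluded from the serialized form, so it must be rebuilt on the other side
    private transient DateTimeFormatter formatter =
            DateTimeFormatter.ofPattern(formatString).withZone(ZoneId.systemDefault());

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Without this line, formatter would stay null after deserialization
        this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(ZoneId.systemDefault());
    }

    public boolean formatterReady() {
        return formatter != null;
    }

    // Serialize and immediately deserialize, mimicking what Flink does
    // when it ships the bucketer to task managers
    public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) ois.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ReadObjectDemo copy = roundTrip(new ReadObjectDemo());
        System.out.println(copy.formatterReady()); // true, thanks to readObject
    }
}
```

Delete the readObject method and the round-tripped copy's formatter stays null, which is exactly the NullPointerException trap the comment in EventTimeBucketer warns about.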
In a real project, replace BaseCountVO with your own entity class. To use the new bucketer, just change the value passed to setBucketer:
BucketingSink<BaseCountVO> sink = new BucketingSink<>(path);
//Bucket data into per-day partitions by event time
sink.setBucketer(new EventTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
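For completeness, here is one possible shape of the entity class. The article never shows BaseCountVO, so the fields below are hypothetical; the only thing EventTimeBucketer actually relies on is a getTimestamp() accessor returning the event time in epoch milliseconds.

```java
import java.io.Serializable;

// Hypothetical entity class; only getTimestamp() matters to EventTimeBucketer
public class BaseCountVO implements Serializable {
    private static final long serialVersionUID = 1L;

    private long timestamp; // event time in epoch milliseconds
    private String key;     // e.g. the dimension being counted
    private long count;

    public BaseCountVO() {
    }

    public BaseCountVO(long timestamp, String key, long count) {
        this.timestamp = timestamp;
        this.key = key;
        this.count = count;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public String getKey() {
        return key;
    }

    public long getCount() {
        return count;
    }
}
```

The class must be Serializable (or otherwise understood by Flink's type system) since records flow through the sink's operator chain.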