大家 flinkcdc 读取的date类型的数据是一串数值,这个怎么处理呢?
Flink CDC 读取到的 date 类型的数据,实际上是经过了从数据库传输协议中的二进制编码转换为了 Flink 中的 Long 类型的数值。
对于这种情况,处理的方法就是将 Long 类型的数值转换为对应的日期格式。可以使用 Flink 的时间 API 来完成这个转换,具体方法如下:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.TimeZone;
public class FlinkCDCDateTypeDemo {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
props.setProperty("auto.offset.reset", "earliest");
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
Types.LONG,
Types.STRING,
Types.STRING
};
String[] fieldNames = new String[] {
"id",
"name",
"datetime"
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
FlinkKafkaConsumer<Row> kafkaConsumer = new FlinkKafkaConsumer<>(
"test",
rowTypeInfo,
props
);
tEnv.createTemporaryFunction("toDate", new ToDateFunction());
TableResult table = tEnv.fromDataStream(
env.addSource(kafkaConsumer),
"id, name, toDate(datetime) as datetime"
);
table.execute().print();
env.execute("FlinkCDCDateTypeDemo");
}
public static class ToDateFunction extends ScalarFunction {
public String eval(long date) {
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
return DATE_FORMAT.format(date);
}
}
}
以上代码通过调用 Flink 自带的
SimpleDateFormat
类将 Long 类型的数值转换为对应的日期字符串。其中TimeZone
参数可以根据需要进行调整以适应不同时区的场景。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。