开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大家 flinkcdc 读取的date类型的数据是一串数值,这个怎么处理呢?

大家 flinkcdc 读取的date类型的数据是一串数值,这个怎么处理呢?

展开
收起
十一0204 2023-04-10 20:50:51 237 0
1 条回答
写回答
取消 提交回答
  • 坚持这件事孤独又漫长。
    • Flink CDC 读取到的 date 类型的数据,实际上是经过了从数据库传输协议中的二进制编码转换为了 Flink 中的 Long 类型的数值。

    • 对于这种情况,处理的方法就是将 Long 类型的数值转换为对应的日期格式。可以使用 Flink 的时间 API 来完成这个转换,具体方法如下:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;
    import java.text.SimpleDateFormat;
    import java.util.Properties;
    import java.util.TimeZone;
    
    public class FlinkCDCDateTypeDemo {
    
        private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "my-group");
            props.setProperty("auto.offset.reset", "earliest");
    
            TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                    Types.LONG,
                    Types.STRING,
                    Types.STRING
            };
    
            String[] fieldNames = new String[] {
                    "id",
                    "name",
                    "datetime"
            };
    
            RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
    
            FlinkKafkaConsumer<Row> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "test",
                    rowTypeInfo,
                    props
            );
    
            tEnv.createTemporaryFunction("toDate", new ToDateFunction());
    
            TableResult table = tEnv.fromDataStream(
                    env.addSource(kafkaConsumer),
                    "id, name, toDate(datetime) as datetime"
            );
    
            table.execute().print();
    
            env.execute("FlinkCDCDateTypeDemo");
        }
    
        public static class ToDateFunction extends ScalarFunction {
            public String eval(long date) {
                DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
                return DATE_FORMAT.format(date);
            }
        }
    }
    

    以上代码通过调用 Flink 自带的 SimpleDateFormat 类将 Long 类型的数值转换为对应的日期字符串。其中 TimeZone 参数可以根据需要进行调整以适应不同时区的场景。

    2023-04-10 22:01:04
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载