开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问flinkcdc下 mysql表里是yyyy-MM-dd,读出来的是数字,怎么把他还原回去?

问题1:请问flinkcdc下 mysql表里是yyyy-MM-dd,读出来的是数字,怎么把他还原回去?beb1c29d1aa800015c2485d6e99589a0.png
b3285dc8c7dfdc7b5eb099d17e16354f.png
问题2:
00cc222727fdb98a22881edc46a8d58a.png
SYSTEM

展开
收起
小易01 2023-07-19 18:38:27 132 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果 MySQL 表中的日期字段被读取为数字,可以使用 Flink 提供的日期时间转换函数将其还原为日期格式。具体而言,可以使用 Flink 的 from_unixtime() 函数将时间戳转换为日期字符串,然后再使用 cast() 函数将日期字符串转换为日期类型。
    以下是一个示例代码:
    java
    Copy
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class MySQLToDate {
    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建 Flink CDC 连接器
        MySQLSource<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test_table")
                .username("root")
                .password("password")
                .deserializer(new StringDeserializer())
                .build();
    
        // 读取 MySQL 表中的数据
        DataStream<String> mysqlDataStream = env.addSource(mysqlSource);
    
        // 将读取的数据转换为 Tuple5
        DataStream<Tuple5<Integer, String, Integer, String, Long>> tupleDataStream = mysqlDataStream.map(new MapFunction<String, Tuple5<Integer, String, Integer, String, Long>>() {
            @Override
            public Tuple5<Integer, String, Integer, String, Long> map(String value) throws Exception {
                String[] fields = value.split(",");
                int id = Integer.parseInt(fields[0]);
                String name = fields[1];
                int age = Integer.parseInt(fields[2]);
                String dateStr = fields[3];
                long timestamp = Long.parseLong(fields[4]);
    
                return new Tuple5<>(id, name, age, dateStr, timestamp);
            }
        });
    
        // 将时间戳转换为日期字符串
        DataStream<Tuple5<Integer, String, Integer, Date, Long>> dateDataStream = tupleDataStream.map(new MapFunction<Tuple5<Integer, String, Integer, String, Long>, Tuple5<Integer, String, Integer, Date, Long>>() {
            @Override
            public Tuple5<Integer, String, Integer, Date, Long> map(Tuple5<Integer, String, Integer, String, Long> value) throws Exception {
                String pattern = "yyyy-MM-dd";
                SimpleDateFormat sdf = new SimpleDateFormat(pattern);
                Date date = sdf.parse(value.f3);
    
                return new Tuple5<>(value.f0, value.f1, value.f2, date, value.f4);
            }
        });
    
        // 输出结果
        dateDataStream.print();
    
        env.execute();
    }
    

    }
    在上述示例中,首先将 MySQL 表中的数据读取为字符串格式,然后使用 MapFunction 将其转换为 Tuple5 格式。其中,日期字段 dateStr 被读取为字符串格式。接下来,使用 MapFunction 将时间戳转换为日期字符串,并将其与其他字段一起组成 Tupl

    2023-07-29 19:15:15
    赞同 展开评论 打赏
  • 意中人就是我呀!

    "回答1:看看你的mysql时区 是否是 time_zone = 'Asia/Shanghai';
    用1970-01-01加上这个天数就可以了。此回答整理至钉群“Flink CDC 社区”。"

    2023-07-19 19:04:55
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像