Flink CDC syncs MySQL data into Paimon, and the timestamp field gets zero-padded milliseconds on write: the MySQL value is 2023-05-19 08:59:12, but the value that lands in Paimon is 2023-05-19 08:59:12.000. How can this be fixed?
When Flink CDC (Change Data Capture) syncs MySQL data into Paimon, the timestamp field may come out padded to millisecond precision. This happens because the Flink pipeline carries the column as a millisecond-precision timestamp (TIMESTAMP(3) by default), while the MySQL column stores only second precision, so the missing fractional part is filled with zeros.
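To see where the padding comes from, here is a minimal sketch you can run in the Flink SQL client (the literal is the sample value from the question):

SELECT CAST('2023-05-19 08:59:12' AS TIMESTAMP(3)); -- 2023-05-19 08:59:12.000
SELECT CAST('2023-05-19 08:59:12' AS TIMESTAMP(0)); -- 2023-05-19 08:59:12

A TIMESTAMP(3) value always renders three fractional digits, so a second-precision source value is shown with a trailing .000.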
To solve this, you can take either of the following approaches; both are shown in the SQL below:
1. Control the declared precision of the timestamp column in the Flink DDL. TIMESTAMP(3) (the counterpart of MySQL's DATETIME(3)) keeps millisecond precision; declaring the sink column as TIMESTAMP(0) keeps plain second precision and avoids the padded .000.
2. Convert the value in the INSERT query, for example with CAST(ts AS TIMESTAMP(0)), so the sink receives a second-precision timestamp regardless of what the source delivers.

CREATE TABLE mysql_source (
id INT,
ts TIMESTAMP(3), -- millisecond precision, as delivered by the CDC connector
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'database-name' = 'testdb',
'table-name' = 'test_table',
'username' = 'root',
'password' = 'password',
'scan.startup.mode' = 'initial'
);
CREATE TABLE paimon_sink (
id INT,
ts TIMESTAMP(0), -- second precision: no zero-padded milliseconds
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://localhost:9000/paimon/test_table'
);
INSERT INTO paimon_sink
SELECT
id,
CAST(ts AS TIMESTAMP(0)) AS ts -- truncate to second precision
FROM mysql_source;
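Alternatively, if you prefer to keep millisecond precision in storage (ts TIMESTAMP(3) on the Paimon side as well) and only control how the value is displayed, you can format it at read time instead. A sketch using Flink's built-in DATE_FORMAT function (the output column name ts_str is illustrative):

SELECT
id,
DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss') AS ts_str -- render without the fractional part
FROM paimon_sink;

Note that DATE_FORMAT returns a STRING, so this suits presentation rather than further timestamp arithmetic.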
The same fix can be applied from Java with the Table API: register the two tables, then submit an INSERT that casts the timestamp down to second precision.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcToPaimon {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Define the MySQL CDC source. The connector delivers the column
        // with millisecond precision, hence TIMESTAMP(3).
        tEnv.executeSql(
            "CREATE TABLE mysql_source (" +
            " id INT," +
            " ts TIMESTAMP(3)," +
            " PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'localhost'," +
            " 'port' = '3306'," +
            " 'database-name' = 'testdb'," +
            " 'table-name' = 'test_table'," +
            " 'username' = 'root'," +
            " 'password' = 'password'," +
            " 'scan.startup.mode' = 'initial'" +
            ")"
        );

        // Define the Paimon sink with TIMESTAMP(0) so the stored value keeps
        // second precision and no .000 suffix is appended.
        tEnv.executeSql(
            "CREATE TABLE paimon_sink (" +
            " id INT," +
            " ts TIMESTAMP(0)," +
            " PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            " 'connector' = 'paimon'," +
            " 'path' = 'hdfs://localhost:9000/paimon/test_table'" +
            ")"
        );

        // Cast the timestamp down to second precision while copying the data.
        // executeSql submits this INSERT as a streaming job, so no separate
        // env.execute() call is needed for a pure Table API pipeline.
        tEnv.executeSql(
            "INSERT INTO paimon_sink " +
            "SELECT id, CAST(ts AS TIMESTAMP(0)) AS ts FROM mysql_source"
        );
    }
}
One more thing to check is time-zone handling in the CDC connector: a mismatch between Flink and the MySQL server shifts timestamp values (it does not cause the .000 padding, but the two symptoms are easy to confuse). The mysql-cdc connector controls this through the 'server-time-zone' table option in the WITH clause of mysql_source, e.g.:

'server-time-zone' = 'UTC'
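Once the job is running, a quick way to confirm the fix, assuming the sample row from the question has been captured, is to query the sink from the Flink SQL client:

SELECT id, ts FROM paimon_sink;
-- expected ts: 2023-05-19 08:59:12 (no trailing .000)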
Using one of the methods above, or a combination of them, should resolve the zero-padded milliseconds when syncing MySQL data to Paimon with Flink CDC. Pick and adapt whichever fits your setup.