数据集成通过JDBC将数据导入MySQL的几种模式

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 目前MySQL JDBC提供了多种将数据写入MySQL的方式,本文将介绍数据集成(DataX、同步中心、原CDP)支持的几种模式: * insert into xxx values (..), (..), (..) * replace into xxx values (..), (..), (..) * insert into xxx values (..), (..),

目前MySQL JDBC提供了多种将数据写入MySQL的方式,本文将介绍数据集成(DataX、同步中心、原CDP)支持的几种模式:

  • insert into xxx values (..), (..), (..)
  • replace into xxx values (..), (..), (..)
  • insert into xxx values (..), (..), (..), … on duplicate key update …

1、功能区别

1.1 insert into 方式

常规的SQL插入,如果提交的MySQL Server端的数据违反了数据库约束(主键冲突、数据类型不匹配)会直接报错;
对应在数据集成中会报脏数据。 常用于向一张空表里面插入数据

1.2 replace into 方式

与insert into类似,区别:假如将要插入表新记录中主键(PRIMARYKEY或UNIQUE索引)与表中旧记录冲突,replace into自身具有处理冲突的能力:

  • 1、当存在pk冲突的时候是先delete再insert
  • 2、当存在uk冲突的时候是直接update

使用replace into 注意事项

  • 1、能够使用replace,您必须同时拥有表的insert和delete权限;
  • 2、冲突记录:新记录与旧记录的主键值不同,所以其他表中所有与本表老数据主键id建立的关联全部会被破坏;
  • 3、冲突记录:所有列的值均取自在热replace语句中被指定的值。所有缺失的列被设置为各自的默认值,即如果您每次同步的不是表的所有列,会存在一些列在旧记录中有值,replace into后无值的情况;
  • 4、replace语句会返回一个数,来指示受影响的行的数目。该数是被删除和被插入的行数的和。

1.3 insert into… on duplicate key update 方式

将要插入表新记录中主键(PRIMARYKEY或UNIQUE索引)与表中旧记录冲突(具有相同的值),则update旧记录。

3、Replace into 存在的坑

  • 如果库存在主备,基于uk去做replace into时,会造成主备的auto_increment不一致(备库因auto_increment小于实际数据的最大值),在主备切换插入时造成replace into出错,失败一次后,会更新auto_increment为最大值+1;

3.1 实例

master:
use test;
CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `k` int(10) unsigned NOT NULL,
  `v` varchar(100) DEFAULT NULL,
  `extra` varchar(200) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_k` (`k`)
) ENGINE=InnoDB ;

insert into test(k,v,extra) values(1,1,'extra1'),(2,2,'extra2',3,3,'extra3');

插入完成后,主库和备库数据和schema完全一致;执行replace into:

replace into test(k,v) values(1,'1-1');

主备库数据一致,但是schema不一致。

主库表结构如下:
CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `k` int(10) unsigned NOT NULL,
  `v` varchar(100) DEFAULT NULL,
  `extra` varchar(200) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_k` (`k`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=gbk;
备库:
CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `k` int(10) unsigned NOT NULL,
  `v` varchar(100) DEFAULT NULL,
  `extra` varchar(200) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_k` (`k`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=gbk;

原因分析:

binlog中记录的SQL:
### UPDATE test.test
### WHERE
###   @1=1
###   @2=1
###   @3='1'
###   @4='extra1'
### SET
###   @1=4
###   @2=1
###   @3='1-1'
###   @4=NULL

如第一章节所述:
replace into 当存在uk冲突的时候是直接update,update操作不会涉及到auto_increment的修改。

基于此,一些replace操作会被建议使用insert into on duplicate key update。

2、数据集成最佳实践

目前数据集成对于上述三种模式均已经支持,对应DataX MySQLWriter插件配置项中writeMode字段;

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "DataX",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert/replace/update",
            "username": "root",
            "password": "root",
            "column": [
              "id",
              "name"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                "table": [
                  "test"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

4.1 数据集成如何保证同步到MySQL作业的幂等性

简单解释 幂等性 :多次运行同一个同步作业得到的结果是一致的;

  • 场景一:表中数据可以删除

在数据集成配置同步任务时,配置前置SQL(delete or truncate表的语句),同步任务在每次执行的时候,在真正同步执行前会执行前置SQL,去清空表,这样即可以实现多次运行同步任务的幂等性。

  • 场景二:表中数据不能删除,常见回流线上业务MySQL库
    配置writeMode为 replace 或者 update,同步的时候即会采用replace into 或者 insert into… on duplicate key update 方式插入MySQL数据库。

参考:

https://askdba.alibaba-inc.com/libary/control/getArticle.do?articleId=12735
https://blog.xupeng.me/2013/10/11/mysql-replace-into-trap/

目录
相关文章
|
2月前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
209 0
|
2月前
|
SQL 前端开发 关系型数据库
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
53 0
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
|
2月前
|
关系型数据库 MySQL 数据库
mysql 里创建表并插入数据
【10月更文挑战第5天】
136 1
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
51 3
|
27天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
131 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
27天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
58 14
|
29天前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
54 9
|
2月前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
1月前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
47 1
|
1月前
|
SQL 关系型数据库 MySQL
mysql数据误删后的数据回滚
【11月更文挑战第1天】本文介绍了四种恢复误删数据的方法:1. 使用事务回滚,通过 `pymysql` 库在 Python 中实现;2. 使用备份恢复,通过 `mysqldump` 命令备份和恢复数据;3. 使用二进制日志恢复,通过 `mysqlbinlog` 工具恢复特定位置的事件;4. 使用延迟复制从副本恢复,通过停止和重启从库复制来恢复数据。每种方法都有详细的步骤和示例代码。
209 2