假设有一操作表:
CREATE TABLE `test1` ( `id` int(11) NOT NULL, `k` int(11) NOT NULL DEFAULT '0', `c` char(120) NOT NULL DEFAULT '', `pad` char(60) NOT NULL DEFAULT '', PRIMARY KEY (`id`), KEY `k_1` (`k`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 dbpartition by hash(`id`);
从数据库中导出源数据
源数据可以用户自行生成,也可以从数据库中导出,在数据库中导出可通过mysql -e
命令的方式,PolarDB-X和MySQL都支持该方式,具体方法如下:
mysql -h ip -P port -u usr -pPassword db_name -N -e "SELECT id,k,c,pad FROM test1;" >/home/data_1000w.txt ## 原始数据以制表符分隔,数据格式:188092293 27267211 59775766593-64673028018-...-09474402685 01705051424-...-54211554755 mysql -h ip -P port -u usr -pPassword db_name -N -e "SELECT id,k,c,pad FROM test1;" | sed 's/\t/,/g' >/home/data_1000w.csv ## csv文件格式以逗号分隔,数据格式:188092293,27267211,59775766593-64673028018-...-09474402685,01705051424-...-54211554755
推荐对字符串进行处理,转变成csv文件格式,方便后续程序读取数据。
在PolarDB-X中创建目标表
源数据不包括建表语句,所以需要手动在PolarDB-X目标数据库上创建表,关于PolarDB-X建表语句的语法请参见CREATE TABLE语句,例如:
CREATE TABLE `test1` ( `id` int(11) NOT NULL, `k` int(11) NOT NULL DEFAULT '0', `c` char(120) NOT NULL DEFAULT '', `pad` char(60) NOT NULL DEFAULT '', PRIMARY KEY (`id`), KEY `k_1` (`k`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 dbpartition by hash(`id`);
使用程序导入数据到PolarDB-X
您可以自行编写程序,连接PolarDB-X,然后读取本地数据,通过Batch Insert语句导入PolarDB-X中。
下面是一个简单的JAVA程序示例:
// 需要mysql-connector-java.jar, 详情界面:https://mvnrepository.com/artifact/mysql/mysql-connector-java // 下载链接:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar // 注:不同版本的mysql-connector-java.jar,可能Class.forName("com.mysql.cj.jdbc.Driver")类路径不同 // 编译 javac LoadData.java // 运行 java -cp .:mysql-connector-java-8.0.20.jar LoadData import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class LoadData { public static void main(String[] args) throws IOException, ClassNotFoundException, SQLException { File dataFile = new File("/home/data_1000w.csv"); String sql = "insert into test1(id, k, c, pad) values(?, ?, ?, ?)"; int batchSize = 1000; try ( Connection connection = getConnection("ip", 3306, "db", "usr", "password"); BufferedReader br = new BufferedReader(new FileReader(dataFile))) { String line; PreparedStatement st = connection.prepareStatement(sql); long startTime = System.currentTimeMillis(); int batchCount = 0; while ((line = br.readLine()) != null) { String[] data = line.split(","); st.setInt(1, Integer.valueOf(data[0])); st.setInt(2, Integer.valueOf(data[1])); st.setObject(3, data[2]); st.setObject(4, data[3]); st.addBatch(); if (++batchCount % batchSize == 0) { st.executeBatch(); System.out.println(String.format("insert %d records", batchCount)); } } if (batchCount % batchSize != 0) { st.executeBatch(); } long cost = System.currentTimeMillis() - startTime; System.out.println(String.format("Take %d second,insert %d records, tps %d", cost/1000, batchCount, batchCount/(cost/1000))); } } /** * 获取数据库连接 * * @param host 数据库地址 * @param port 端口 * @param database 数据库名称 * @param username 用户名 * @param password 密码 * @return * @throws ClassNotFoundException * @throws SQLException */ private static Connection getConnection(String host, int port, String database, String username, String password) throws ClassNotFoundException, SQLException { Class.forName("com.mysql.cj.jdbc.Driver"); String url = String.format( "jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port, database); Connection con = DriverManager.getConnection(url, username, password); return con; } }
您可以根据实际应用场景编写程序,设置合适的batch size和多线程导入,能够加快性能。