基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

简介: 前言 前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如

前言 

前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如 DTS。下面我们将对各种同步工具进行介绍,并以 DTS 为例展示同步操作。

同步工具介绍

DataX

DataX 是异构数据源离线同步的工具,支持多种异构数据源之间高效的数据同步。它使用 SQL 从数据库拉取数据,全内存操作。它具有一下特点:

  • 适合进行离线全量同步,不适合支持增量同步。
  • 可以通过编程来进行增量同步,但有一定延时,源表需要通过字段区分哪些记录为待同步字段,且无法捕获删除操作。
  • 单点执行。

因此,它适合中小用户,同步对实时性无太高要求的数据。其具体使用见:数据同步-从MySQL到Tablestore

Canal

Canal是阿里开源 CDC 工具,他可以获取 MySQL binlog 并解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库向其他数据库的同步工作。Tablestore 团队已经在 Canal 中实现了 Tablestore 适配器,可以支持将 MySQL 数据同步进入 Tablestore,具体细节请参考后续文章。

DTS

数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅及实时同步功能于一体,能够解决公共云、混合云场景下,远距离、毫秒级异步数据传输难题。其特点为:

  • 基于云部署,只需要简单配置就可以运行
  • 基于 binlog,可以实时同步数据
  • 费用相对于 DataX 高

因此,目前,中小型用户,对实时性要求没有很高的用户,可以使用 DataX 进行 MySQL 到 Tablestore 的同步。而企业级用户,或者对于延迟要求比较高的客户,推荐使用 DTS 进行数据同步。 本文会展示如何完成基于 DTS 从 MySQL 到 Tablestore 的同步系统的搭建。而这套同步系统正是订单数据上Tablestore 的第一步工作。

服务开通

创建 MySQL 并建表

在 RDS 上申请源的 MySQL 数据库,可以参考创建RDS MySQL实例。已经在 RDS 上或者 ECS 上拥有 MySQL 实例的同学可以忽略这一步。

在数据库中创建订单表 order_contract,建表语句如下:

CREATE TABLE `order_contract` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;

开通 Tablestore 服务

提供了三种方式创建 Tablestore 实例,读者请自由选择一种创建。

阿里云 CLI (POP Client) 创建按量模式实例

通过阿里云官网下载安装并配置阿里云 CLI。具体参考 阿里云 CLI。完毕后,打开命令行,输入

aliyun ots InsertInstance --endpoint ots.cn-hangzhou.aliyuncs.com  --InstanceName test-20210609

用于创建实例。其中 endpoint 填入实例所在地域域名。然后在 Tabelstore 控制台即可看到创建出的实例如图。

在 Cloud Shell 中可以同样使用此指令建立 Tablestore 实例,Cloud Shell 地址见 Cloud Shell 地址

控制台创建按量模式实例

进入Tablestore首页。点击进入管理控制台

点击创建实例

选择按量模式。填入实例名称

点击确定,完成创建。然后可以在控制台首页看到对应实例。

控制台创建预留模式实例

此创建过程与控制台创建按量模式实例基本相同,只是在选择购买方式时选择预留模式。

开通配置 DTS

进入DTS首页。点击立即购买

商品类型选择数据传输服务 Tablestore(后付费),功能选择数据同步,源实例选择 MySQL ,目标实例选择 Tablestore ,同步拓扑选择单向同步

若有压测等需要大量同步记录的需求,同步链路规格参数可以适当选大,每种链路的传输数据能力可以在页面上查看。

点击立即购买。弹出如下页面,勾选服务协议,然后点击立即开通,即完成购买。

购买成功后,进入管理控制台,点击同步实例下面的数字“1”,可以看见尚未配置的实例信息。

点击配置同步链路

配置源库 MySQL 和目标库 Tablestore 的信息。AccessKey 配置参考:获取AccessKey。配置完成后点击授权白名单并进入下一步

选择同步对象页,同步初始化这里,结构初始化全量数据初始化增量数据初始化这三个选项都要进行勾选。源库对象,选择源表 order_contract,点击中间红框中的按钮,将表传送到右侧“已选中对象”。

点击编辑

将 pay_time 的数据类型映射为 Integer。这样在 Tablestore 中的 pay_time字段是以微秒为单位的时间戳。点击确定。

 

配置完成后,向下滑动窗口,点击“下一步”进入高级配置。继续点击预检查并启动,进行启动。在控制台的数据同步页,可以看到刚刚配置的同步任务。

数据同步测试

字段说明

通过前面,我们申请好了充当订单库的 MySQL 数据库,Tablestore 实例,并且搭建了 DTS 任务从 MySQL 向Tablestore 同步数据。下面,我们简要写一个 Java 程序,持续向 MySQL 中写入订单数据,以验证 DTS 的持续同步能力。生成的订单记录,各字段加入一些随机性,以构造更加真实的测试数据。各字段生成逻辑见表。

字段

字段含义

取值说明

oId

订单号

使用当前时间戳 + c_id,例如1623228187366_user2

create_time

下单时间

取当前时间

pay_time

支付时间

取当前时间,不做更细化仿真,假设每笔订单下单同时支付。

has_paid

是否已经支付

设定为true,这里不对此字段进行仿真。

c_id

消费者id

取一千万以内的随机整数,假设有一千万消费者。消费者id为“user” + id格式,比如“user1”

c_name

消费者姓名

使用“客户” + 消费者id的格式,比如1号消费者,姓名为“客户1”

p_brand

产品品牌

格式为“品牌id”,id为5000以内随机整数,仿真5000个品牌

p_count

产品数量

1到10取随机整数

p_id

产品id

格式为"store1_id", id为100以内随机数,假设每个店铺有100个产品,例如store1_1

p_name

产品名

格式为“产品” + p_id,比如“产品store4075_25”

p_price

产品价格

0到1000元随机浮点数

s_id

店铺id

5000以内随机整数,假设有5000家店铺。店铺id为“store” + id 格式,比如“store1”

s_name

店铺名称

使用“旗舰店” + id的格式,比如1号店铺,id为“store1”,店铺名称为“旗舰店1”

total_price

总价格

p_count * p_price

程序说明

搭建 Springboot 项目,其中创建订单代码如下,代码中包含随机生成参数的逻辑。

    // 创建订单
    private OrderContract createOrder() {
        long now = System.currentTimeMillis();
        LocalDateTime nowT = LocalDateTime.now();
        int cNumber = r.nextInt(1000 * 10000); // 一千万用户
        String userId = "user" + cNumber;
        String oId = now + "_" + userId;

        OrderContract item = new OrderContract();
        item.setoId(oId);
        item.setCreateTime(nowT);
        item.setPayTime(nowT);
        item.setHasPaid(true);
        item.setcId(userId);
        item.setcName("客户" + cNumber);

        int count = r.nextInt(10) + 1;
        item.setpCount(count);   // 商品数量

        double price = r.nextDouble() * 1000d;   // 单价1到1000
        item.setpPrice(price);

        int storeId = r.nextInt(5000); //5000个店铺
        item.setsId("store" + storeId);
        item.setsName("旗舰店" + storeId);
        item.setTotalPrice(item.getpPrice() * item.getpCount());

        int brandId = r.nextInt(5000);
        item.setpBrand("品牌" + brandId);

        int productId = r.nextInt(100);
        item.setpId(item.getsId() +"_" + productId);
        item.setpName("产品" + item.getpId());

        return item;
    }

批量获得订单并插入数据库代码如下,根据传入参数,插入数据库。

public void insertIntoOrders(int size) {
        System.out.println("start insert orders");
        List<OrderContract> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            OrderContract order = createOrder();
            list.add(order);
        }

        userMapper.batchInsert(list);
        System.out.println("finish insert orders.");
    }

插入数据库代码:

// Mybatis配置sql
<insert id="batchInsert" parameterType="List">
    insert into order_contract(oId,create_time,pay_time,has_paid,c_id,c_name,p_brand,
    p_count,p_id,p_name,p_price,s_id,s_name,total_price)
    values
    <foreach collection="list" index="index" item="item" separator=",">
        (#{item.oId},#{item.createTime},#{item.payTime},#{item.hasPaid},#{item.cId},#{item.cName},#{item.pBrand},
        #{item.pCount},#{item.pId},#{item.pName},#{item.pPrice},#{item.sId},#{item.sName},#{item.totalPrice})
    </foreach>
</insert>

循环执行批量插入数据库逻辑,以达到批量生成订单数据的目的。

   public void initOrders() {
        while (true) {
            try {
                int size = r.nextInt(1000);
                insertIntoOrders(size);

                Thread.sleep(4000L);
            } catch (InterruptedException e) {
               break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

同步结果

启动 Java 程序,可以在 Tablestore 中观察到新的订单记录持续的从 MySQL 库同步到 Tablestore 中。

手动导入 SQL

用户也可以手动执行 SQL 插入测试数据。使用下面的存储过程可以直接通过 SQL 写入测试数据。因为逐条插入,插入性能要比程序里的批量插入慢。

DROP PROCEDURE if EXISTS test;
DELIMITER //
CREATE procedure test()
BEGIN
DECLARE i INT;
DECLARE userId INT;
DECLARE c INT;
DECLARE price DOUBLE;
DECLARE storeId INT;
DECLARE brandId INT;
DECLARE productId INT;
DECLARE c_id VARCHAR(255);
SET i = 0;
WHILE i<1000 DO    // 这里的值决定写入记录数
SET userId=CEILING(RAND()*1000*10000);
SET c=CEILING(RAND()*10);
SET price=RAND()*1000;
SET storeId=CEILING(RAND()*5000);
SET brandId=CEILING(RAND()*5000);
SET productId=CEILING(RAND()*100);
SET c_id=CONCAT("user",userId);

INSERT INTO test(oId,create_time,pay_time,
has_paid,c_id,c_name,
p_brand,p_count,p_id,
p_name,p_price,s_id,
s_name,total_price) VALUES
(CONCAT(unix_timestamp(now()),"_",c_id), now(), now(),
true,c_id,CONCAT("客户",userId),
CONCAT("品牌",brandId),c,CONCAT("store",storeId,"_",productId),
CONCAT("产品store",storeId,"_",productId),price,CONCAT("store",storeId),
CONCAT("旗舰店",storeId),p_price * c
);
SET i = i+1;
END WHILE;
END
//
DELIMITER ;
CALL test();

总结

基于 DTS,我们可以实现 MySQL 数据向 Tablestore 的实时同步。数据进入 Tablestore 后,我们可以利用 Tablestore 的特性进行搜索、分析等操作。我们会在后续文章中进行说明。

附录

代码 git 地址:https://github.com/aliyun/tablestore-examples

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
存储 关系型数据库 MySQL
Linux 安装 mysql 及配置存储位置
Linux 安装 mysql 及配置存储位置
401 3
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
656 0
|
7月前
|
监控 安全 API
电商系统数据同步——电商API接口篇
电商系统中,数据同步是保障业务高效运转与优化用户体验的核心环节,而API接口作为关键技术起着至关重要的作用。本文从API基础解析、应用场景(如商品信息、订单、库存、支付与物流同步)、技术实现路径、挑战与应对策略及最佳实践等方面深入剖析电商API接口在数据同步中的作用,并通过亚马逊、阿里巴巴等实战案例展示其应用价值。
|
7月前
|
存储 关系型数据库 MySQL
【免费动手教程上线】阿里云RDS MySQL推出大容量高性能存储:高性能本地盘(最高16TB存储空间)、高性能云盘(最高64TB存储空间)
阿里云RDS MySQL提供高性能本地盘与高性能云盘等存储方案,满足用户大容量、低延迟需求。高性能本地盘单盘最大16TB,IO延时微秒级;高性能云盘兼容ESSD特性,支持IO性能突发、BPE及16K原子写等能力。此外,阿里云还提供免费动手体验教程,帮助用户直观感受云数据库 RDS 存储性能表现。
|
8月前
|
存储 监控 安全
无需云服务器、无需公网IP,轻松实现门禁系统远程接入与数据同步
智慧园区门禁管理中,贝锐花生壳提供高效解决方案。通过内网穿透技术,无需公网IP即可集中管理多区域门禁系统,保障数据安全传输。采用RSA与AES混合加密,支持权限精细化控制及多维度监控,简单三步实现远程访问,助力园区智慧安全管理升级。
373 7
|
存储 关系型数据库 MySQL
mysql 使用变量存储中间结果的写法
mysql 使用变量存储中间结果的写法
|
10月前
|
存储 关系型数据库 MySQL
MySQL进阶突击系列(09)数据磁盘存储模型 | 一行数据怎么存?
文中详细介绍了MySQL数据库中一行数据在磁盘上的存储机制,包括表空间、段、区、页和行的具体结构,以及如何设计和优化行数据存储以提高性能。
|
存储 关系型数据库 MySQL
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
400 2
|
存储 SQL 关系型数据库
MySQL 存储函数及调用
MySQL 存储函数及调用
710 3

热门文章

最新文章

推荐镜像

更多