基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言 在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 D

前言

在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。

Canal 简介

Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:

Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。


下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。

Canal 部署

环境准备

准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述

源表准备

使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。

CREATE TABLE `order_contract_canal` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`),
  KEY `idx_sid` (`s_id`),
  KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
  KEY `idx_cid` (`c_id`),
  KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
  KEY `idx_sid_paytime` (`s_id`,`pay_time`),
  KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
  KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
  KEY `idx_paytime` (`pay_time`),
  KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

目标表准

在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:

Deployer部署

在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart


本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:

参数

说明

canal.instance.master.address

rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p07134rkvf7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

test_ots\\.[test|order_contract_canal].*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码


conf 路径下 application.yml中需要额外关注配置见表。

参数

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters:

-name:

tablestore

定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint


ablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-20210609

tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整配置 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-20210609

在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time:
    pay_time:
    has_paid:
    c_id:
    c_name:
    p_brand:
    p_count:
    p_id:
    p_name:
    p_price:
    s_id:
    s_name:
    total_price:
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库

destination

canal实例名,与application.yml下的instance相同

groupId

分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可

outerAdapterKey

使用的Adapter标识,应与application.yml中outerAdapters下的key值相同

threads

筒数量,默认为1,对应tablestorewriter中的bucket数量

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置

id: target_id 源表主键:目标表主键。多主键可以配置多个

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。

id: target_id,表示id字段同步后为target_id字段

id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。

id: $string 功能等同于id: id$string

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。

全量同步数据

使用程序向原始表 order_contract_canal 中插入 1 千万行记录。

同步数据

调用 client-adapter 服务的方法触发同步任务。指令格式为

curl "localhost:8081/etl/{type}/{key}/{task}" -X POST

type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:

curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST

程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。

性能测试

在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。

点击表 canal_target_order。

点击监控指标进入监控页面。

同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。

1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。

从程序开始到完成导入

使用时间

完成 300w 行导入

3m

完成 400w 行导入

5m

完成 500w 行导入

8m

完成 600w 行导入

11m

完成 700w 行导入

14m

完成 800w 行导入

18m

完成 900w 行导入

22.5m

完成 1000w 行导入

28m


增量同步数据

使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。

性能测试

输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。

curl "localhost:8082/canal/press?rps=4000&threads=3"  -X POST

在控制台可以看到数据持续写入,速率约在 1.2w 行/s。

在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。

异常处理

ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:

curl "localhost:8081/syncSwitch/{destination}"

在本文中命令为

curl "localhost:8081/syncSwitch/test_ots"

处理异常后,可以调用如下接口重新启动增量同步任务。

curl "localhost:8081/syncSwitch/test_ots/on" -X PUT

总结

本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。

附录

Canal 测试程序git地址:

https://github.com/aliyun/tablestore-examples

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
安全 关系型数据库 MySQL
PHP与MySQL交互:从入门到实践
【9月更文挑战第20天】在数字时代的浪潮中,掌握PHP与MySQL的互动成为了开发动态网站和应用程序的关键。本文将通过简明的语言和实例,引导你理解PHP如何与MySQL数据库进行对话,开启你的编程之旅。我们将从连接数据库开始,逐步深入到执行查询、处理结果,以及应对常见的挑战。无论你是初学者还是希望提升技能的开发者,这篇文章都将为你提供实用的知识和技巧。让我们一起探索PHP与MySQL交互的世界,解锁数据的力量!
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
99 1
|
2月前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
18小时前
|
存储 SQL 关系型数据库
Mysql高可用架构方案
本文阐述了Mysql高可用架构方案,介绍了 主从模式,MHA模式,MMM模式,MGR模式 方案的实现方式,没有哪个方案是完美的,开发人员在选择何种方案应用到项目中也没有标准答案,合适的才是最好的。
21 3
Mysql高可用架构方案
|
10天前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:百万级数据统计优化实践
【10月更文挑战第21天】 在处理大规模数据集时,传统的单体数据库解决方案往往力不从心。MySQL和Redis的组合提供了一种高效的解决方案,通过将数据库操作与高速缓存相结合,可以显著提升数据处理的性能。本文将分享一次实际的优化案例,探讨如何利用MySQL和Redis共同实现百万级数据统计的优化。
36 9
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
67 0
|
3月前
|
API C# 开发框架
WPF与Web服务集成大揭秘:手把手教你调用RESTful API,客户端与服务器端优劣对比全解析!
【8月更文挑战第31天】在现代软件开发中,WPF 和 Web 服务各具特色。WPF 以其出色的界面展示能力受到欢迎,而 Web 服务则凭借跨平台和易维护性在互联网应用中占有一席之地。本文探讨了 WPF 如何通过 HttpClient 类调用 RESTful API,并展示了基于 ASP.NET Core 的 Web 服务如何实现同样的功能。通过对比分析,揭示了两者各自的优缺点:WPF 客户端直接处理数据,减轻服务器负担,但需处理网络异常;Web 服务则能利用服务器端功能如缓存和权限验证,但可能增加服务器负载。希望本文能帮助开发者根据具体需求选择合适的技术方案。
129 0
|
3月前
|
存储 关系型数据库 MySQL
深入MySQL:事务日志redo log详解与实践
【8月更文挑战第24天】在MySQL的InnoDB存储引擎中,为确保事务的持久性和数据一致性,采用了redo log(重做日志)机制。redo log记录了所有数据修改,在系统崩溃后可通过它恢复未完成的事务。它由内存中的redo log buffer和磁盘上的redo log file组成。事务修改先写入buffer,再异步刷新至磁盘,最后提交事务。若系统崩溃,InnoDB通过redo log重放已提交事务并利用undo log回滚未提交事务,确保数据完整。理解redo log工作流程有助于优化数据库性能和确保数据安全。
485 0
|
21天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
55 3
Mysql(4)—数据库索引
|
6天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
40 2

热门文章

最新文章