Canal Admin 架构
Canal 1.1.4 版本引入了 Canal Admin,提供了统一管理 Canal Server 的 WebUI 界面。Canal Admin 的核心概念主要有:
- Instance:对应 Canal Server 里的 Instance,一个最小的订阅 MySQL 的队列。
- Server:对应 Canal Server,一个 Server 里可以包含多个 Instance,Canal Server 负责订阅 MySQL 的 binlog 数据,可以将数据输出到消息队列或者为 Canal Adapter 提供消息消费。原本 Canal Server 运行所需要的 canal.properties 和 instance.properties 配置文件可以在 Canal Admin WebUI 上进行统一运维,每个 Canal Server 只需要以最基本的配置启动。 (例如 Canal Admin 的地址,以及访问配置的账号、密码即可)
- 集群:对应一组 Canal Server,通过 Zookeeper 协调主备实现 Canal Server HA 的高可用。
Canal 最初只支持将数据从 MySQL 同步到 Kafka,RabbitMQ 等消息队列中,从 1.1.1 版本开始,Canal 实现了一个配套落地的模块 Canal Adapter,实现对 Canal Server 订阅的 binlog 消息进行消费,支持将数据输出至 HBase,MySQL,Elasticsearch,Kudu 中。
机器规划
IP地址 | 角色 |
11.8.36.104:3306 | Canal 元数据库 |
11.17.6.185:4679 | 源库 |
11.8.36.21:3306 | 目标库 |
11.8.36.104:11111 | Canal Server 主 |
11.8.36.105:11111 | Canal Server 备 |
11.8.36.104 | Canal Adapter 主 |
11.8.36.105 | Canal Adapter 备 |
11.8.36.104:8089 | Canal Admin |
11.8.36.125:2181 | Zookeeper 集群 |
11.8.36.116:2181 | Zookeeper 集群 |
11.8.38.120:2181 | Zookeeper 集群 |
准备工作
源库开启 binlog,创建同步用户
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
开启 binlog,设置 binlog 模式为 row。
[mysqld] ... log_bin = binlog 目录 binlog_format = row ...
安装 Java
yum install -y java
部署 Zookeeper 集群
Canal Server 和 Canal Adapter 依赖 Zookeeper 实现 HA 高可用,Zookeeper 安装部署可以参考 Zookeeper 集群搭建。
部署 Canal Admin
下载并解压安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz mkdir -p /software/canal-admin tar -xzvf canal.admin-1.1.5.tar.gz -C /software/canal-admin
初始化 Canal Admin 元数据库
初始化 SQL 脚本里会默认创建名为 canal_manager 的数据库,canal_manager.sql 脚本存放在解压后的 conf 目录下。
#登录 MySQL mysql -h11.8.36.104:3306 -uroot -p123456 #初始化元数据库 source /software/canal-admin/conf/canal_manager.sql
Canal Admin 配置文件
修改配置文件 vim /software/canal-admin/conf/application.yml:
#Canal Admin Web 界面端口 server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 #元数据库连接信息 spring.datasource: address: 11.8.36.104:3306 database: canal_manager username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 #Canal Server 加入 Canal Admin 使用的密码 canal: adminUser: admin adminPasswd: admin
启动 Canal Admin
sh /software/canal-admin/bin/startup.sh
浏览器输入 http://11.8.36.104:8089 访问 Canal Admin 管理界面,默认用户名 admin,密码 123456。
下载并解压压缩包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz mkdir -p /software/canal-server tar -xzvf canal.deployer-1.1.5.tar.gz -C /software/canal-server
Canal Server 配置文件
因为这里使用 Canal Admin 部署集群,所以 Canal Server 节点只需要配置 Canal Admin 的连接信息即可,真正的配置文件统一通过 Canal Admin 界面来管理。编辑 vim /software/canal-server/conf/canal_local.properties 文件:
#Canal Server 地址 canal.register.ip = 11.8.36.104 #Canal Admin 连接信息 canal.admin.manager = 11.8.36.104:8089 canal.admin.port = 11110 canal.admin.user = admin #mysql5 类型 MD5 加密结果 -- admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 #自动注册 canal.admin.register.auto = true #集群名 canal.admin.register.cluster = canal-cluster-1 #Canal Server 名字 canal.admin.register.name = canal-server-1
另一台 Canal Server 也是一样的方式配置,修改 canal.register.ip = 11.8.36.105
和 canal.admin.register.name = canal-server-2
即可。
Canal Admin 创建集群
进入 Canal Admin 集群管理页面,点击新建集群,添加集群名称和 Zookeeper 地址。MySQL 同步数据到 MySQL 比较麻烦,需要先将源 MySQL 的数据同步到 Canal Server 中内置的消息队列中(或者外部 Kafka,RabbitMQ 等消息队列),然后通过 Canal Adapter 去消费消息队列中的消息再写入目标 MySQL。
集群模式下,虽然有多个 Canal Server,但是只有一个是处于 running 状态,客户端连接的时候会查询 Zookeeper 节点获取并连接处于 running 状态的 Canal Server。
#Zookeeper 地址 canal.zkServers = 127.0.0.1:2181,127.0.0.1:3181,127.0.0.1:4181 #tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = tcp # 此配置需要修改成 default-instance canal.instance.global.spring.xml = classpath:spring/default-instance.xml #TSDB 设置 canal.instance.tsdb.enable = true canal.instance.tsdb.url = jdbc:mysql://11.8.36.104:3306/canal_tsdb canal.instance.tsdb.dbUsername = root canal.instance.tsdb.dbPassword = 123456 #使用外部 MySQL 存储表结构变更信息 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
Canal Server 默认情况下把源库对表结构修改的记录存储在本地 H2 数据库中,当 Canal Server 主备切换后会导致新的 Canal Server 无法正常同步数据,因此修改 TSDB 的设置将外部 MySQL 数据库作为 Canal Server 存储表结构变更信息的库。
create database canal_tsdb;
以下截图为 canal.properties 配置文件中修改的内容。
Canal Instance 配置
查看源库 gtid 信息。
mysql> show master status; +------------------+-----------+--------------+------------------+------------------------------------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +------------------+-----------+--------------+------------------+------------------------------------------------+ | mysql-bin.000008 | 257890708 | | | 92572381-0b9c-11ec-9544-8cdcd4b157e0:1-1066141 | +------------------+-----------+--------------+------------------+------------------------------------------------+ 1 row in set (0.01 sec) mysql>
在 Instance 管理界面,点击新建 Instance,保存为 sync_table-instance。Instance 表示一个 MySQL 的订阅队列,需要指定源库的连接信息,需要同步的表,以及同步点位。(binlog+postion 或者 gtid)
# MySQL 集群配置中的 serverId 概念,需要保证和当前 MySQL 集群中 id 唯一 (v1.0.26+版本之后 canal 会自动生成,不需要手工指定) # canal.instance.mysql.slaveId=0 # 源库地址 canal.instance.master.address=11.17.6.185:4679 # 方式一:binlog + postion # mysql源库起始的binlog文件 # canal.instance.master.journal.name=mysql-bin.000008 # mysql主库链接时起始的binlog偏移量 # canal.instance.master.position=257890708 # mysql主库链接时起始的binlog的时间戳 # canal.instance.master.timestamp= # 方式二:gtid(推荐) # 启用 gtid 方式同步 canal.instance.gtidon=true # gtid canal.instance.master.gtid=92572381-0b9c-11ec-9544-8cdcd4b157e0:1-1066141 # 源库账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # MySQL 数据解析编码 canal.instance.connectionCharset = UTF-8 # MySQL 数据解析关注的表,Perl 正则表达式 canal.instance.filter.regex=acpcanaldb.* # MySQL 数据解析表的黑名单,过滤掉不同步的表 # canal.instance.filter.black.regex=mysql\\.slave_.*
以下截图为 instance.propertios 配置文件中修改的内容。如果启动 Instance 的时候有以下报错:
CanalParseException: use gtid and TableMeta TSDB should be config timestamp > 0
就设置下 canal.instance.master.timestamp=
,参数值为时间对应的纪元时间以来的毫秒数。
查看 Instance 下提供服务的 Canal Server。
ls /otter/canal/destinations/sync_table-instance/cluster [11.8.36.104:11111, 11.8.36.105:11111] #处于 running 状态的 Canal Server get /otter/canal/destinations/sync_table-instance/running {"active":true,"address":"11.8.36.104:11111"}
部署 Canal Adapter
下载并解压安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz mkdir -p /software/canal-adapter tar -xzvf canal.adapter-1.1.5.tar.gz -C /software/canal-adapter
Canal Adapter 配置文件
由于 Canal Server 只支持将数据输出到消息队列中,因此需要额外部署 Canal Adapter 订阅 Canal Server 中的消息,然后写入目标 MySQL。编辑 vim /software/canal-adapter/conf/application.yml 文件:
#Canal Adapter Rest API 接口 server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: #Canal Server 单机时配置 canal.tcp.server.host: #Canal Server 高可用时 Canal Adapter 通过 Zookeeper 识别哪台 Canal Server 对外提供服务 canal.tcp.zookeeper.hosts: 11.8.36.125:2181,11.8.38.116:2181,11.8.38.120:2181 canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: #源库连接信息,这里感觉有点多余,根据 Instance 配置也可以获取到源库信息的 srcDataSources: mySyncDS: #和 rdb 中配置文件对应 url: jdbc:mysql://11.17.6.185:4679/acpcanaldb?useUnicode=true username: canal password: canal canalAdapters: #Instance 名字 - instance: sync_table-instance # canal instance Name or mq topic name groups: #组名,Canal Adapter 根据 groupId 实现高可用,两个相同 groupId 的 Canal Adapter 中只有一个会工作,另一个作为备份 #Canal Adapter 也是通过 Zookeeper 来协调主备的 - groupId: group1 outerAdapters: - name: rdb #目录名,后面要在 rdb 目录中配置源库和目标库的映射信息 key: mysql1 #目标库连接信息 properties: jdbc.driverClassName: com.mysql.jdbc.Driver jdbc.url: jdbc:mysql://11.8.36.21:3306/acpcanaldb?useUnicode=true jdbc.username: root jdbc.password: 123456
在 /software/canal-adapter/conf/rdb 目录下创建一个结尾为 yml 的文件:
#和 application.yml 文件中对应 dataSourceKey: mySyncDS #Instance 名字 destination: sync_table-instance #Canal Adapter GroupId groupId: group1 outerAdapterKey: mysql1 concurrent: true #源库和目标库结构全部对应 dbMapping: mirrorDb: true database: acpcanaldb
备 Canal Adapter 的配置和主 Canal Adapter 所有配置一致。
启动 Canal Adapter
sh /software/canal-adapter/bin/startup.sh
同步测试
准备数据
在源库中创建表,要支持 DELETE 和 UPDATE 的同步操作,源库中的表必须有主键或者唯一约束。
create database acpcanaldb; create table acpcanaldb.sync_table(id int primary key,name varchar(20),age int); insert into acpcanaldb.sync_table values (1,'tom',18), (2,'jack',18), (3,'mike',20), (4,'cris',18), (5,'mary',19);
查看当前源库数据:
mysql> select * from acpcanaldb.sync_table; +------+------+------+ | id | name | age | +------+------+------+ | 1 | tom | 18 | | 2 | jack | 18 | | 3 | mike | 20 | | 4 | cris | 18 | | 5 | mary | 19 | +------+------+------+ 5 rows in set (0.01 sec)
新增数据
在源库插入数据:
insert into acpcanaldb.sync_table values (6,'peter',18);
目标库可以正常同步数据。
mysql> select * from canal_tsdb.meta_history; +----+---------------------+---------------------+---------------------+------------------+---------------+------------------+------------------+------------+------------+------------+---------------------------------------------------------------------------------+----------+-------+ | id | gmt_create | gmt_modified | destination | binlog_file | binlog_offest | binlog_master_id | binlog_timestamp | use_schema | sql_schema | sql_table | sql_text | sql_type | extra | +----+---------------------+---------------------+---------------------+------------------+---------------+------------------+------------------+------------+------------+------------+---------------------------------------------------------------------------------+----------+-------+ | 1 | 2021-09-15 02:27:53 | 2021-09-15 02:27:53 | sync_table-instance | mysql-bin.000009 | 59658313 | 3778449602 | 1631672873000 | acpcanaldb | acpcanaldb | sync_table | create table acpcanaldb.sync_table(id int primary key,name varchar(20),age int) | CREATE | NULL | | 2 | 2021-09-15 02:36:34 | 2021-09-15 02:36:34 | sync_table-instance | mysql-bin.000009 | 60124865 | 3778449602 | 1631673394000 | acpcanaldb | acpcanaldb | sync_table | alter table acpcanaldb.sync_table add loc varchar(20) default 'china' | ALTER | NULL | | 3 | 2021-09-15 02:38:06 | 2021-09-15 02:38:06 | sync_table-instance | mysql-bin.000009 | 60207149 | 3778449602 | 1631673486000 | acpcanaldb | acpcanaldb | sync_table | alter table acpcanaldb.sync_table drop age | ALTER | NULL | +----+---------------------+---------------------+---------------------+------------------+---------------+------------------+------------------+------------+------------+------------+-------------------------------------------------------------------------------
事务
源数据库的事务隔离级别是 MySQL 默认的读已提交。
mysql> select @@tx_isolation; +----------------+ | @@tx_isolation | +----------------+ | READ-COMMITTED | +----------------+ 1 row in set, 1 warning (0.01 sec)
在源库上开启事务插入 2 条数据,根据下图可以看到当源库未提交时,目标库不会同步数据,只有当源库提交事务后,目标库才会同步数据。
如果源库上的事务回滚了,对目标库没有影响,因为原本只有提交的事务才会往目标库同步。
高可用测试
高可用测试分为 3 种场景:
- 1.Canal Server 故障:通过 Zookeeper 协调,备 Canal Server 转为 running 状态提供服务。
- 2.Canal Admin 故障:管理节点故障并影响正在运行的任务。
- 3.Canal Adapter 故障:通过 Zookeeper 协调,备 Canal Adapter 转为 running 状态提供服务。
查看当前源库数据:
mysql> select * from acpcanaldb.sync_table; +----+---------+---------+ | id | name | loc | +----+---------+---------+ | 1 | tom2 | china | | 3 | mike | china | | 4 | cris | china | | 5 | mary | china | | 6 | peter | china | +----+---------+---------+ 6 rows in set (0.01 sec)
Canal Server 故障
登录 Zookeeper 查看当前 Canal Server 运行情况,当前以 HA 的方式运行两个 Canal Server,只有一个 Canal Server 会提供服务,另一个作为热备。当前提供服务的 Canal Server 是 11.8.36.105:11111。
root@ydt-net-kafka1:/usr/local/kafka/bin #./zookeeper-shell.sh 11.8.36.125:2181 Connecting to 11.8.36.125:2181 Welcome to ZooKeeper! JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null #当前提供服务的 Canal Serve get /otter/canal/destinations/sync_table-instance/running {"active":true,"address":"11.8.36.105:11111"} #当前运行的两台 Canal Server ls /otter/canal/destinations/sync_table-instance/cluster [11.8.36.104:11111, 11.8.36.105:11111]
当然在 Canal Admin 界面也可以看到当前处于 running 状态的 Canal Server。
MySQL 主从切换
通常在生产环境中,我们通常会部署双主模式的 MySQL,通过 keepalived 对外提供一个虚拟 IP,当主库发生故障时,虚拟 IP 漂移到备库上,由备库接管服务。关于MySQL 双主高可用部署可以参考 MySQL + Keepalived 双主热备搭建。
查看源库当前数据:
mysql> select * from acpcanaldb.sync_table; +----+---------+---------+ | id | name | loc | +----+---------+---------+ | 1 | tom2 | china | | 3 | mike | china | | 4 | cris | china | | 5 | mary | china | | 6 | peter | china | | 7 | lisa | england | | 8 | handson | spain | +----+---------+---------+ 7 rows in set (0.02 sec)
MySQL 源库发送主从切换后,往源库插入一条新数据。
insert into acpcanaldb.sync_table values(9,'james','italy');
目标库可以正常同步数据。MySQL 数据同步到 Kafka 比 MySQL 同步到 MySQL 简单些,不需要 Canal Adapter,Canal Server 可以直接将 MySQL 的数据输出到 Kafka。
数据准备
在源库中创建一张表。
create table acpcanaldb.table1(id int primary key,name varchar(20),age int);
Canal Instance 配置
和前面同步 MySQL 的 Instance 相比,这里有 2 个地方需要修改:
- 1.生成 Kafka Topic 的规则。
- 2.发送到 Kafka Partition 的规则。
# 静态匹配 #canal.mq.topic=acpcanaldb_table1 # 动态匹配 # 匹配所有表,则每个表都会发送到各自表名的topic # 例如数据库名为 acpcanaldb,表名为 table1,则 topic 名字为 acpcanaldb_table1 # 这里的动态匹配有个 bug,如果源库是的表中有下划线,例如前面的 sync_table,将无法同步成功 canal.mq.dynamicTopic=.*\\..* # 数据同步到消息队列Kafka版Topic的指定分区。 # canal.mq.partition=0 # 以下两个参数配置与canal.mq.partition 互斥。配置以下两个参数可以使数据发送至 Kafka Topic 的不同分区。 # 发往 Kafka 的 3 个分区 canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔。 #canal.mq.partitionHash=mytest.person:id,mytest.role:id
关于消息队列中 Topic 和源库表的映射关系可以参考 [Canal Kafka RocketMQ QuickStart] (https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)。 在 Instance 管理界面点击新增 Instance,保存为 sync_table-kafka。
Kafka 创建 Topic
bin/kafka-topics.sh --bootstrap-server 11.8.36.125:9092 \ --create --topic acpcanaldb_table1 \ --partitions 3 --replication-factor 3
查看创建的 Topic:
bin/kafka-topics.sh --list --bootstrap-server 11.8.36.125:9092 #返回结果 acpcanaldb_table1
数据同步
先在 Kafka 上开启消费消息:
bin/kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 \ --topic acpcanaldb_table1
往源库中插入一条数据:
insert into acpcanaldb.table1 values(2,'rose',20);
Kafka 上可以看到消费消息:
{"data":[{"id":"2","name":"rose","age":"20"}],"database":"acpcanaldb","es":1631806308000,"id":4,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(20)","age":"int"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"table1","ts":1631806353089,"type":"INSERT"}
监控
Canal Server 性能指标监控基于 Prometheus 的实现,可以把监控的指标信息收集到 Prometheus 中,然后通过 Grafana 展示。
部署 Prometheus
下载并解压安装包
wget https://github.com/prometheus/prometheus/releases/download/v2.30.0/prometheus-2.30.0.linux-amd64.tar.gz tar -xzvf prometheus-2.30.0.linux-amd64.tar.gz cd cd prometheus-2.30.0.linux-amd64
Prometheus 配置文件
编辑 Prometheus 配置文件 vim prometheus.yml,添加 canal 的 job。
- job_name: "canal" static_configs: - targets: ["11.8.36.104:11112","11.8.36.105:11112"]
启动 Prometheus:
./prometheus
查看监控项
浏览器输入 http://11.8.37.41:9090 访问 Prometheus 界面。
部署 Grafana
下载并解压安装包
wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-5.2.2.linux-amd64.tar.gz tar -xzvf grafana-5.2.2.linux-amd64.tar.gz cd grafana-5.2.2
启动 Grafana Server
bin/grafana-server
配置监控图表
浏览器输入 http://11.8.37.41:3000 访问 Grafana 界面。用户名 admin,密码 admin。具体的监控指标可以参考 [Prometheus监控] (https://github.com/alibaba/canal/wiki/Prometheus-QuickStart)。
参考资料
- [Canal Admin 安装与配置] (https://www.cnblogs.com/victorbu/p/13080168.html)
- [Canal Admin ServerGuide] (https://github.com/alibaba/canal/wiki/Canal-Admin-ServerGuide)
- [Canal Admin QuickStart] (https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart)
- [Canal Admin Guide] (https://github.com/alibaba/canal/wiki/Canal-Admin-Guide)
- [配置canal_admin报错] (https://github.com/alibaba/canal/issues/2142)
- [Prometheus监控] (https://github.com/alibaba/canal/wiki/Prometheus-QuickStart)
- [基于canal的client-adapter数据同步必读指南] (https://juejin.cn/post/6868822398020354056)
- [Canal Kafka RocketMQ QuickStart] (https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)
- [ClientAdapter] (https://github.com/alibaba/canal/wiki/ClientAdapter)