1. canal原理介绍
canal简介
canal [kǝ'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务cache刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
canal原理
主从复制原理:
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷⻉到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
所以canal模拟mysql slave也实现了自己的binlog同步,原理如下:
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
2. canal实战
首先去下载安装相关canal组件
这里可以参考官网:https://github.com/alibaba/canal
安装完canal服务,然后按照文档介绍去配置相应的参数,就可以启动canal的服务了,由于官网介绍的比较细,大家自行去安装和配置,这里着重介绍canal同步binlog代码实践。
canal-go订阅和消费canal binlog
下载canal-go客户端,运行里面samples下的main.go,当然可能报错,将路径替换正确就OK了。(有问题欢迎私信) 部分订阅消费代码如下:
func main() { // 192.168.199.17 替换成你的canal server的地址 // example 替换成-e canal.destinations=example 你自己定义的名字 connector := client.NewSimpleCanalConnector("127.0.0.1", 11113, "", "", "example", 60000, 60*60*1000) err := connector.Connect() if err != nil { log.Println(err) os.Exit(1) } err = connector.Subscribe(".*\\..*") if err != nil { log.Println(err) os.Exit(1) } for { message, err := connector.Get(100, nil, nil) if err != nil { log.Println(err) os.Exit(1) } batchId := message.Id if batchId == -1 || len(message.Entries) <= 0 { time.Sleep(300 * time.Millisecond) fmt.Println("===没有数据了===") continue } printEntry(message.Entries) } }
我们可以对canal配置文件中已经配置的数据库表进行增删改,看看我们程序是否能显示相关binlog日志
- 增加
mysql> insert into canal(`id`, `name`) values(1,"hah"); Query OK, 1 row affected (0.04 sec) mysql>
打印日志:
===没有数据了=== ================> binlog[mysql-bin.000002 : 2069],name[test,canal], eventType: INSERT id : 1 update= true name : hah update= true sex : update= true age : update= true amount : update= true email : update= true occur_time : 2021-09-15 09:47:09 update= true
- 更新
mysql> update canal set name="hello" where id=1; Query OK, 1 row affected (0.04 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql>
打印日志如下:
===没有数据了=== ================> binlog[mysql-bin.000002 : 2348],name[test,canal], eventType: UPDATE -------> before id : 1 update= false name : hah update= false sex : update= false age : update= false amount : update= false email : update= false occur_time : 2021-09-15 09:47:09 update= false -------> after id : 1 update= false name : hello update= true sex : update= false age : update= false amount : update= false email : update= false occur_time : 2021-09-15 09:47:09 update= false ===没有数据了===
- 删除
mysql> delete from canal where id=1; Query OK, 1 row affected (0.03 sec) mysql>
打印日志如下:
================> binlog[mysql-bin.000002 : 2643],name[test,canal], eventType: DELETE id : 1 update= false name : hello update= false sex : update= false age : update= false amount : update= false email : update= false occur_time : 2021-09-15 09:47:09 update= false ===没有数据了===
3. 总结
通过canal-go客户端可以订阅和消费canal server,那么同样我们可以配置kafka,mysql,es,mq等也可以去订阅和消费数据,项目中比较常见的就是将mysql数据同步到es中提供搜索查询能力,而这最好的方式就是通过canal中间件来做,减少人工的维护成本,提供工作效率。