微服务轮子项目(36) -Canal数据库日志解析消费

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 微服务轮子项目(36) -Canal数据库日志解析消费

1. Canal概述

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

目前内部版本已经支持mysql和oracle部分版本的日志解析

当前的canal开源版本支持5.7及以下的版本(阿里内部mysql

5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

1.1 工作原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

1.2 架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

1.3 HA机制设计

canal的HA分为两部分,canal servercanal client分别有对应的HA实现

  • canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。

Canal Server:

大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制。

1.4 相关资料

2. 安装部署

2.1 创建数据库用户canal

目标数据库先要创建好canal用的用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.2 远程拉取

  1. 访问docker hub获取最新的版本 访问:https://hub.docker.com/r/canal/canal-server/tags/
  2. 下载对应的版本,比如最新版为1.1.3
docker pull canal/canal-server:v1.1.3

2.3 启动Docker

docker目录下自带了一个run.sh脚本

下载脚本:

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

创建启动脚本:

vim start.sh
docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
              -e canal.destinations=zlt-test \
              -e canal.instance.master.address=192.168.28.130:3306  \
              -e canal.instance.dbUsername=canal  \
              -e canal.instance.dbPassword=canal  \
              -e canal.instance.connectionCharset=UTF-8 \
              -e canal.instance.tsdb.enable=true \
              -e canal.instance.gtidon=false  \

destinations:目标数据库名

instance.master.address:目标数据库地址

instance.dbUsername:目标数据库用户名

instance.dbPassword:目标数据库密码

具体其他配置可参考AdminGuide

docker模式下,单docker实例只能运行一个instance,主要为配置问题。如果需要运行多instance时,可以自行制作一份docker镜像即可

2.4 运行

sh start.sh
• 1

运行效果:看到successful之后,就代表canal-server启动成功,可以启动canal-client连接上来进行binlog订阅了

2.5 MQ消息投递

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:RocketMQ和Kafka

将上面的启动脚本改为以下,增加MQ相关参数:

docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
                  -e canal.destinations=zlt-test \
                  -e canal.instance.master.address=192.168.28.130:3306  \
                  -e canal.instance.dbUsername=canal  \
                  -e canal.instance.dbPassword=canal  \
                  -e canal.instance.connectionCharset=UTF-8 \
                  -e canal.instance.tsdb.enable=true \
                  -e canal.instance.gtidon=false  \
                  -e canal.mq.topic=canal-test \
                  -e canal.serverMode=RocketMQ \
                  -e canal.mq.servers=192.168.28.130:9876 \

canal.mq.topic:配置mq的topic

canal.serverMode:tcp(默认), kafka, RocketMQ

canal.mq.servers:mq地址

投递MQ后的消息如下图:

2.6 如果要订阅的是mysql的从库该怎么做?

生产环境中的主库是不能随便重启的,所以订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操作写进中继日志,并写到自己的二进制日志的,所以需要让其成为canal的主库,必须让其将日志也写到自己的二进制日志里面。处理方法:修改my.cnf,增加一行log_slave_updates=1,重启数据库后就可以了。

3. 实时同步数据到ElasticSearch

如今大型的IT系统中,都会使用分布式的方式,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?下面介绍如何实现数据修改后,需要及时的将mysql中的数据更新到elasticsearch中。

3.1 数据同步方案选择

  • 代码实现(双写):针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。
  • mybatis实现:通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。
  • Aop实现:不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。
  • logstash:logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每分钟进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。
实现方式 优缺点
代码实现 技术难度低,侵入性强,实时性高
基于mybatis 有一定的技术难度,但是无法覆盖所有的场景
Aop实现 技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash 技术难度低,无侵入性,无需开发,但会造成资源浪费,实时性也不高

那么是否有什么更好的方式进行处理吗?

  • mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费。Canal安装部署通过数据库binlog实时抓取数据更新信息推送到消息队列MQ里,然后就可以通过消费MQ消息把数据实时同步到不同的异构数据源里了

3.2 增量同步ES

1.创建索引:在同步之前需要先创建号索引,下面是创建sys_user索引的例子

curl -X PUT "http://192.168.28.130:9200/sys_user/" -H 'Content-Type: application/json' -d'
{
    "settings" : {
      "number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings":{
      "properties":{
                "id": {
                    "type": "long"
                },
                "username": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "nickname": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "mobile": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "sex": {
                    "type": "keyword"
                },
                "type": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "createTime": {
                    "type": "date"
                },
                "updateTime": {
                    "type": "date"
                },
                "company": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "openId": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    }
                },
                "isDel": {
                    "type": "keyword"
                }
      }
    }
}

地址192.168.28.130:9200为es的ip地址

sys_user为索引名

2.安装配置Adapter,下载adapter:https://github.com/alibaba/canal/releases

详细的配置说明请参考官方wiki:https://github.com/alibaba/canal/wiki/Sync-ES

3.同步表sys_user的配置样例:

canal.adapter-xxx\conf\application.yml:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: rocketMQ # kafka rocketMQ
  mqServers: 192.168.28.130:9876 #or rocketmq
  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.28.130:3306/user-center?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: canal-sys-user # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 192.168.28.130:9300
        properties:
          cluster.name: my-es

mode:消费的类型有3种选择tcp、kafka和rocketMQ

mqServers: mq的地址

defaultDS:配置源数据库的地址

instance:配置mq的topic名称

es:配置es的地址和集群名

canal.adapter-xxx\conf\es\sys_user.yml:

dataSourceKey: defaultDS
destination: canal-sys-user
groupId: g1
esMapping:
  _index: sys_user
  _type: search_data
  _id: id
  upsert: true
  sql: "select id, username, nickname, mobile
          , case when sex = 0 then '男' when sex = 1 then '女' end sex
          , case when type = 'app' then '移动用户' when type = 'BACKEND' then '后台用户' end type
          , create_time createTime, update_time updateTime, company, open_id openId
          , case when is_del = 0 then '否' when is_del = 1 then '是' end isDel
        from sys_user"
  etlCondition: "where update_time >= '{0}'"
  commitBatch: 3000

dataSourceKey:配置application.yml中源数据库的key

destination:配置mq的topic名称

_index:插入es中的索引名

_type:插入es中mappings的type属性

_id:配置id字段

upsert:配置插入数据正常时写入,主键冲突时更新

sql:配置具体要同步es的数据

etlCondition:条件判断,通过更新日期实现增量同步

3.3 历史数据全量同步ES

如果在搭建增量同步之前mysql数据库已经存在历史数据,就需要做初始化同步,全量同步可以使用Canal-Adapterrest-api来实现

全量同步初始化,例子如下:

curl -X POST http://192.168.28.130:8081/etl/es/sys_user.yml

ip为Canal-Adapter所在服务器ip

路径/es/sys_user.yml为conf目录下配置文件的路径,会自动忽略where条件进行全量同步

目录
相关文章
|
18天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
47 3
|
3月前
|
SQL JavaScript 关系型数据库
node博客小项目:接口开发、连接mysql数据库
【10月更文挑战第14天】node博客小项目:接口开发、连接mysql数据库
|
3月前
|
SQL 关系型数据库 MySQL
Go语言项目高效对接SQL数据库:实践技巧与方法
在Go语言项目中,与SQL数据库进行对接是一项基础且重要的任务
110 11
|
3月前
|
关系型数据库 MySQL 数据库
DZ社区 mysql日志清理 Discuz! X3.5数据库可以做定期常规清理的表
很多站长在网站日常维护中忽略了比较重要的一个环节,就是对于数据库的清理工作,造成数据库使用量增加必须多的原因一般有2个:后台站点功能开启了家园,此功能现在很少有论坛会用到,但是灌水机会灌入大量垃圾信息致使站长长时间未能发觉;再有就是程序默认的一些通知类表单会存放大量的、对于网站日常运行并无意义的通知信息。
123 2
|
3月前
|
存储 关系型数据库 数据库
数据库启用慢速查询日志如何增强 Postgres 可观测性
数据库启用慢速查询日志如何增强 Postgres 可观测性
50 1
|
18天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
44 3
|
18天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE 'log_%';`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
62 2
|
1月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
210 15
|
25天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
1月前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。