Canal使用

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 使用canal监听MySQL中binlog,搭配RabbitMQ,做到记录数据库变化

搭建环境

数据库

MySQL5.7:官网自行下载安装

  1. 确认是否开启binlog:show variables like'log_bin',当显示off则未开启
  2. 修改MySQL配置文件my.cnf,windows下为my.ini文件,添加内容如下
log-bin=log-bin
binlog-format=ROW
server_id=1
  1. 重启mysql服务,systemctl restart mysqld
  2. 查看是否开启成功,显示on则为已开启

RabbitMQ

  1. 使用docker安装部署RabbitMQ(或者官网下载安装RabbitMQ)
#docker安装部署rabbitmqdocker search rabbitmq-management
docker pull rabbitmq-management #默认拉取latest,不需要管理界面的可以不携带management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq-management
docker ps #查看是否正常启动
#本地访问管理界面:localhost:15672   用户名和密码:guest

canal

  1. 下载地址:https://github.com/alibaba/canal/releases  ,本次为下载:canal-deployer-1.1.5.tar.gz,低版本有可能不支持RabbitMQ
  2. 解压缩文件
  3. 修改文件下conf\example中instance.properties文件
#修改为设置的数据ip,端口,用户名密码canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = root
canal.instance.dbPassword = root
  1. bin目录下启动
  2. 简单使用:https://github.com/alibaba/canal/wiki/ClientExample

使用

简单实现

可以参考github上实例,上文给出的地址

privateStringcanalMonitorTableName="study";
privateStringcanalMonitorHost="127.0.0.1";
privateIntegercanalMonitorPort=11111;
@AsyncpublicvoidstartMonitorSQL() {
while (true) {
CanalConnectorconnector=CanalConnectors.newSingleConnector(newInetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");
try {
//打开连接connector.connect();
log.info("数据库检测连接成功!"+canalMonitorTableName);
//订阅数据库表connector.subscribe(canalMonitorTableName+"\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();
while (true) {
// 获取指定数量的数据Messagemessage=connector.getWithoutAck(1000);
longbatchId=message.getId();
intsize=message.getEntries().size();
if (batchId==-1||size==0) {
                } else {
//处理数据handleDATAChange(message.getEntries());
                }
// 提交确认connector.ack(batchId);
            }
        } catch (Exceptione) {
e.printStackTrace();
log.error("成功断开监测连接!尝试重连");
        } finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒try {
Thread.sleep(10*1000);
            } catch (InterruptedExceptione) {
e.printStackTrace();
            }
        }
    }
}
/*** 解析binlog获得的实体类信息*/privatevoidhandleDATAChange(List<CanalEntry.Entry>entrys) {
for (CanalEntry.Entryentry : entrys) {
if (entry.getEntryType() ==CanalEntry.EntryType.TRANSACTIONBEGIN||entry.getEntryType() ==CanalEntry.EntryType.TRANSACTIONEND) {
continue;
        }
//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChangerowChage;
try {
rowChage=CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exceptione) {
thrownewRuntimeException("ERROR ## parser of eromanga-event has an error , com.canal.data:"+entry.toString(), e);
        }
CanalEntry.EventTypeeventType=rowChage.getEventType();
log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());
switch (eventType) {
caseDELETE:
//todo :执行删除的业务System.out.println("DELETE");
break;
caseINSERT:
//todo :执行插入的业务System.out.println("INSERT");
break;
caseUPDATE:
//todo :执行更新的业务System.out.println("UPDATE");
break;
default:
break;
        }
    }
}

RabbitMQ实现

canal配置修改

注意:一个service可以配置多个实例监听,默认自带exampl实例,增加实例可以复制exampl,修改里面的配置文件instance.properties

canal.properties

# rabbitMQcanal.serverMode = rabbitMQ
##################################################rabbitmq.host =127.0.0.1
rabbitmq.virtual.host =/
rabbitmq.exchange =canal.exchange
rabbitmq.username =guest
rabbitmq.password =guest
rabbitmq.deliveryMode =2

instance.properties

canal.mq.topic=canal.routing.key

RabbitMQ配置

  1. 定义队列,队列名称:canal.queue
  2. 定义交换器:canal.exchange
  3. 设置路由键:canal.routing.key,进行绑定

  1. 启动canal,修改数据库数据,查看RabbitMQ是否接收message

代码实现

基础搭建:使用springboot+mybatis-plus搭建环境,jdk1.8,下列基础的配置和代码不做展示

pom文件:canal,rabbitmq依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>

yaml文件:rabbitmq配置

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest
    password: guest
    virtual-host: /

常量类

publicclassRabbitMqConstant {
/*** 交换器*/publicstaticfinalStringEXCHANGE_TOPIC="canal.exchange";
/*** 队列*/publicstaticfinalStringQUEUE_INFO="canal.queue";
/*** 路由键*/publicstaticfinalStringROUTING_KEY="canal.routing.key";
}

配置类:设置队列,交换器,进行绑定

@ConfigurationpublicclassExchangeConfig {
@BeanpublicTopicExchangeexchange() {
returnnewTopicExchange(RabbitMqConstant.EXCHANGE_TOPIC, true, false, null);
  }
@BeanpublicQueueinfoQueue() {
returnnewQueue(RabbitMqConstant.QUEUE_INFO, true, false, false);
  }
@BeanpublicBindinginfoBinding() {
//链式写法,绑定交换机和队列,并设置匹配键returnBindingBuilder//绑定队列        .bind(infoQueue())
//到交换机        .to(exchange()).with(RabbitMqConstant.ROUTING_KEY);
  }
}

配置类:配置rabbitmq

@ConfigurationpublicclassRabbitMqConfig {
@AutowiredprivateRabbitTemplaterabbitTemplate;
/*** 自定义JSON消息序列化器, 默认就是json*/@BeanpublicMessageConvertermessageConverter() {
returnnewJackson2JsonMessageConverter();
  }
@BeanpublicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());
returnfactory;
  }
@PostConstructpublicvoidconfigureRabbitTemplate() {
// 比如在这里设置接收消息后的回调方法//    rabbitTemplate.setConfirmCallback(new ConfirmCallbackImpl());//    rabbitTemplate.setReturnsCallback(new ReturnsCallbackImpl());  }
/*** 成功后的回调方法*/publicstaticclassConfirmCallbackImplimplementsConfirmCallback {
/*** 实现confirm方法*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
    }
  }
/*** 失败后的回调方法*/publicstaticclassReturnsCallbackImplimplementsReturnsCallback {
@OverridepublicvoidreturnedMessage(ReturnedMessagereturned) {
    }
  }
}

接收类:接收rabbitmq收到的message,且通过在rabbitmq中可以看到是json格式

@NoArgsConstructor@DatapublicclassCanalMessage<T> {
@JsonProperty("type")
privateStringtype;
@JsonProperty("table")
privateStringtable;
@JsonProperty("data")
privateList<T>data;
@JsonProperty("database")
privateStringdatabase;
@JsonProperty("es")
privateLonges;
@JsonProperty("id")
privateIntegerid;
@JsonProperty("isDdl")
privateBooleanisDdl;
@JsonProperty("old")
privateList<T>old;
@JsonProperty("pkNames")
privateList<String>pkNames;
@JsonProperty("sql")
privateStringsql;
@JsonProperty("ts")
privateLongts;
@JsonProperty("sqlType")
privateTsqlType;
@JsonProperty("mysqlType")
privateTmysqlType;
}

监听类:接收队列消息

@ComponentpublicclassInfoListener {
@RabbitHandler@RabbitListener(queues=RabbitMqConstant.QUEUE_INFO)
publicvoiddata(CanalMessagemessage){
System.out.println(message);
    }
}

参考文章:

https://blog.csdn.net/qq_36079912/article/details/104885048

https://www.cnblogs.com/haoxianrui/p/15522538.html

https://www.cnblogs.com/charkey/p/15503888.html

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
canal SQL 关系型数据库
Canal报错总结(三)
Canal报错总结(三)
|
canal SQL 关系型数据库
|
canal 关系型数据库 MySQL
Canal服务搭建
Canal服务搭建
1161 1
Canal服务搭建
|
canal 关系型数据库 MySQL
canal 组件介绍(1)
前言     首先,这个文章系列主要是讲canal的,毫无疑问,对吧。那么在开始阅读这个系列之前,我希望真正有兴趣的同学一定要先去阅读canal的官方文档,没有什么比这个更权威了。
2401 0
|
6月前
|
canal 监控 关系型数据库
Canal使用和安装总结
Canal使用和安装总结
303 2
|
8月前
|
canal SQL 关系型数据库
Canal入门
Canal入门
220 1
|
消息中间件 NoSQL 关系型数据库
Canal+Kafka实现MySQL与Redis数据同步(二)
Canal+Kafka实现MySQL与Redis数据同步
252 0
|
消息中间件 canal NoSQL
Canal+Kafka实现MySQL与Redis数据同步(一)
Canal+Kafka实现MySQL与Redis数据同步
678 0
|
canal druid 关系型数据库
Canal报错总结(二)
Canal报错总结(二)
|
canal 关系型数据库 MySQL
Canal
Canal是一个用于MySQL数据增量订阅和消费的开源组件,支持多种数据订阅方式,包括基于GTID位点的订阅。
1145 0