搭建环境
数据库
MySQL5.7:官网自行下载安装
- 确认是否开启binlog:show variables like'log_bin',当显示off则未开启
- 修改MySQL配置文件my.cnf,windows下为my.ini文件,添加内容如下
log-bin=log-bin binlog-format=ROW server_id=1
- 重启mysql服务,systemctl restart mysqld
- 查看是否开启成功,显示on则为已开启
RabbitMQ
- 使用docker安装部署RabbitMQ(或者官网下载安装RabbitMQ)
#docker安装部署rabbitmqdocker search rabbitmq-management docker pull rabbitmq-management #默认拉取latest,不需要管理界面的可以不携带management docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq-management docker ps #查看是否正常启动 #本地访问管理界面:localhost:15672 用户名和密码:guest
canal
- 下载地址:https://github.com/alibaba/canal/releases ,本次为下载:canal-deployer-1.1.5.tar.gz,低版本有可能不支持RabbitMQ
- 解压缩文件
- 修改文件下conf\example中instance.properties文件
#修改为设置的数据ip,端口,用户名密码canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = root canal.instance.dbPassword = root
- bin目录下启动
- 简单使用:https://github.com/alibaba/canal/wiki/ClientExample
使用
简单实现
可以参考github上实例,上文给出的地址
privateStringcanalMonitorTableName="study"; privateStringcanalMonitorHost="127.0.0.1"; privateIntegercanalMonitorPort=11111; publicvoidstartMonitorSQL() { while (true) { CanalConnectorconnector=CanalConnectors.newSingleConnector(newInetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", ""); try { //打开连接connector.connect(); log.info("数据库检测连接成功!"+canalMonitorTableName); //订阅数据库表connector.subscribe(canalMonitorTableName+"\\..*"); //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback(); while (true) { // 获取指定数量的数据Messagemessage=connector.getWithoutAck(1000); longbatchId=message.getId(); intsize=message.getEntries().size(); if (batchId==-1||size==0) { } else { //处理数据handleDATAChange(message.getEntries()); } // 提交确认connector.ack(batchId); } } catch (Exceptione) { e.printStackTrace(); log.error("成功断开监测连接!尝试重连"); } finally { connector.disconnect(); //防止频繁访问数据库链接: 线程睡眠 10秒try { Thread.sleep(10*1000); } catch (InterruptedExceptione) { e.printStackTrace(); } } } } /*** 解析binlog获得的实体类信息*/privatevoidhandleDATAChange(List<CanalEntry.Entry>entrys) { for (CanalEntry.Entryentry : entrys) { if (entry.getEntryType() ==CanalEntry.EntryType.TRANSACTIONBEGIN||entry.getEntryType() ==CanalEntry.EntryType.TRANSACTIONEND) { continue; } //RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChangerowChage; try { rowChage=CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exceptione) { thrownewRuntimeException("ERROR ## parser of eromanga-event has an error , com.canal.data:"+entry.toString(), e); } CanalEntry.EventTypeeventType=rowChage.getEventType(); log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName()); switch (eventType) { caseDELETE: //todo :执行删除的业务System.out.println("DELETE"); break; caseINSERT: //todo :执行插入的业务System.out.println("INSERT"); break; caseUPDATE: //todo :执行更新的业务System.out.println("UPDATE"); break; default: break; } } }
RabbitMQ实现
canal配置修改
注意:一个service可以配置多个实例监听,默认自带exampl实例,增加实例可以复制exampl,修改里面的配置文件instance.properties
canal.properties
# rabbitMQcanal.serverMode = rabbitMQ ##################################################rabbitmq.host =127.0.0.1 rabbitmq.virtual.host =/ rabbitmq.exchange =canal.exchange rabbitmq.username =guest rabbitmq.password =guest rabbitmq.deliveryMode =2
instance.properties
canal.mq.topic=canal.routing.key
RabbitMQ配置
- 定义队列,队列名称:canal.queue
- 定义交换器:canal.exchange
- 设置路由键:canal.routing.key,进行绑定
- 启动canal,修改数据库数据,查看RabbitMQ是否接收message
代码实现
基础搭建:使用springboot+mybatis-plus搭建环境,jdk1.8,下列基础的配置和代码不做展示
pom文件:canal,rabbitmq依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>
yaml文件:rabbitmq配置
spring rabbitmq host127.0.0.1 port5672 username guest password guest virtual-host /
常量类
publicclassRabbitMqConstant { /*** 交换器*/publicstaticfinalStringEXCHANGE_TOPIC="canal.exchange"; /*** 队列*/publicstaticfinalStringQUEUE_INFO="canal.queue"; /*** 路由键*/publicstaticfinalStringROUTING_KEY="canal.routing.key"; }
配置类:设置队列,交换器,进行绑定
publicclassExchangeConfig { publicTopicExchangeexchange() { returnnewTopicExchange(RabbitMqConstant.EXCHANGE_TOPIC, true, false, null); } publicQueueinfoQueue() { returnnewQueue(RabbitMqConstant.QUEUE_INFO, true, false, false); } publicBindinginfoBinding() { //链式写法,绑定交换机和队列,并设置匹配键returnBindingBuilder//绑定队列 .bind(infoQueue()) //到交换机 .to(exchange()).with(RabbitMqConstant.ROUTING_KEY); } }
配置类:配置rabbitmq
publicclassRabbitMqConfig { privateRabbitTemplaterabbitTemplate; /*** 自定义JSON消息序列化器, 默认就是json*/publicMessageConvertermessageConverter() { returnnewJackson2JsonMessageConverter(); } publicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){ SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(newJackson2JsonMessageConverter()); returnfactory; } publicvoidconfigureRabbitTemplate() { // 比如在这里设置接收消息后的回调方法// rabbitTemplate.setConfirmCallback(new ConfirmCallbackImpl());// rabbitTemplate.setReturnsCallback(new ReturnsCallbackImpl()); } /*** 成功后的回调方法*/publicstaticclassConfirmCallbackImplimplementsConfirmCallback { /*** 实现confirm方法*/publicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) { } } /*** 失败后的回调方法*/publicstaticclassReturnsCallbackImplimplementsReturnsCallback { publicvoidreturnedMessage(ReturnedMessagereturned) { } } }
接收类:接收rabbitmq收到的message,且通过在rabbitmq中可以看到是json格式
publicclassCanalMessage<T> { "type") (privateStringtype; "table") (privateStringtable; "data") (privateList<T>data; "database") (privateStringdatabase; "es") (privateLonges; "id") (privateIntegerid; "isDdl") (privateBooleanisDdl; "old") (privateList<T>old; "pkNames") (privateList<String>pkNames; "sql") (privateStringsql; "ts") (privateLongts; "sqlType") (privateTsqlType; "mysqlType") (privateTmysqlType; }
监听类:接收队列消息
publicclassInfoListener { queues=RabbitMqConstant.QUEUE_INFO) (publicvoiddata(CanalMessagemessage){ System.out.println(message); } }
参考文章:
https://blog.csdn.net/qq_36079912/article/details/104885048