首先说一个配置槽点 TCP配置模式没有清晰的说明,比如需要改哪些文件配置,都太笼统了遇到什么改什么,为什么不出一个快速配置的设置方式,https://github.com/alibaba/canal/wiki/QuickStart 这里的也是蜻蜓点水。
运行环境说明不清楚 zk(zookeeper)是非必须吧,这里面也要设置,不加说明,没配置canal也是可以启动成功
提供的示例测试不出想要的效果 canal容器运行 docker run -p 11111:11111 --name canal -v /usr/local/canal/canal.properties:/home/admin/canal-server/conf/canal.properties -v /usr/local/canal/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server
环境 host(宿主): 192.168.1.111 ---linux(centos7) docker: 172.17.0.4 3306 ---mysql docker: 172.17.0.5 11111 ---canal docker:172.17.0.6 2181 ---zookeeper
数据库操作 my.cnf配置 [mysqld] #解决Navicat打开很慢问题 skip-name-resolve ##同一局域网内注意要唯一 server-id=9999 ##开启二进制日志功能,可以随便取(关键) log-bin=mysql-bin #选择row模式(canal) binlog-format=ROW
创建一个数据库service_db 这个不写了。
创建sys_user表 DROP TABLE IF EXISTS sys_user
; CREATE TABLE sys_user
( id
int(11) NOT NULL AUTO_INCREMENT, name
varchar(50) NOT NULL, age
int(2) NOT NULL, PRIMARY KEY (id
) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4; 插入数据: insert into sys_user(name,age) VALUES('boonya',28);
Java代码 重点是想验证表操作事件,完整代码(主机的端口是可以访问的): ` package com.alibaba.otter.canal.example.event;
import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message;
public class SimpleCanalClinetEventTest {
public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.111"/AddressUtils.getHostIp()/, 11111), "example", "canal", "canal"); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".\.."); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); }
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
// 可以获取到数据库实例名称、日志文件、当前操作的表以及执行的增删改查的操作
String logFileName= entry.getHeader().getLogfileName();
long logFileOffset= entry.getHeader().getLogfileOffset();
String dbName=entry.getHeader().getSchemaName();
String tableName=entry.getHeader().getTableName();
System.out.println(String.format("=======> binlog[%s:%s] , name[%s,%s] , eventType : %s",
logFileName, logFileOffset,
dbName, tableName,
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 删除
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 新增
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }
原提问者GitHub用户boonyachengdu
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。