开发者社区> 问答> 正文

TCP模式配置之后 服务正常启动 怎么实现订阅消费 获取不到数据

首先说一个配置槽点 TCP配置模式没有清晰的说明,比如需要改哪些文件配置,都太笼统了遇到什么改什么,为什么不出一个快速配置的设置方式,https://github.com/alibaba/canal/wiki/QuickStart 这里的也是蜻蜓点水。

运行环境说明不清楚 zk(zookeeper)是非必须吧,这里面也要设置,不加说明,没配置canal也是可以启动成功

提供的示例测试不出想要的效果 canal容器运行 docker run -p 11111:11111 --name canal -v /usr/local/canal/canal.properties:/home/admin/canal-server/conf/canal.properties -v /usr/local/canal/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server

环境 host(宿主): 192.168.1.111 ---linux(centos7) docker: 172.17.0.4 3306 ---mysql docker: 172.17.0.5 11111 ---canal docker:172.17.0.6 2181 ---zookeeper

数据库操作 my.cnf配置 [mysqld] #解决Navicat打开很慢问题 skip-name-resolve ##同一局域网内注意要唯一 server-id=9999 ##开启二进制日志功能,可以随便取(关键) log-bin=mysql-bin #选择row模式(canal) binlog-format=ROW

创建一个数据库service_db 这个不写了。

创建sys_user表 DROP TABLE IF EXISTS sys_user; CREATE TABLE sys_user ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(50) NOT NULL, age int(2) NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4; 插入数据: insert into sys_user(name,age) VALUES('boonya',28);

Java代码 重点是想验证表操作事件,完整代码(主机的端口是可以访问的): ` package com.alibaba.otter.canal.example.event;

import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClinetEventTest {

public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.111"/AddressUtils.getHostIp()/, 11111), "example", "canal", "canal"); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".\.."); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); }

        connector.ack(batchId); // 提交确认
        // connector.rollback(batchId); // 处理失败, 回滚数据
    }

    System.out.println("empty too many times, exit");
} finally {
    connector.disconnect();
}

}

private static void printEntry(List entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }

    RowChange rowChage = null;
    try {
        rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                   e);
    }

    EventType eventType = rowChage.getEventType();
    // 可以获取到数据库实例名称、日志文件、当前操作的表以及执行的增删改查的操作
    String logFileName= entry.getHeader().getLogfileName();
    long logFileOffset= entry.getHeader().getLogfileOffset();
    String dbName=entry.getHeader().getSchemaName();
    String tableName=entry.getHeader().getTableName();
    System.out.println(String.format("=======&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                     logFileName, logFileOffset,
                                     dbName, tableName,
                                     eventType));

    for (RowData rowData : rowChage.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
        	// 删除
            printColumn(rowData.getBeforeColumnsList());
        } else if (eventType == EventType.INSERT) {
        	// 新增
            printColumn(rowData.getAfterColumnsList());
        } else {
            System.out.println("-------&gt; before");
            printColumn(rowData.getBeforeColumnsList());
            System.out.println("-------&gt; after");
            printColumn(rowData.getAfterColumnsList());
        }
    }
}

}

private static void printColumn(List columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }

原提问者GitHub用户boonyachengdu

展开
收起
数据大拿 2023-05-04 18:26:55 105 0
1 条回答
写回答
取消 提交回答
  • AdminGuide里有完整参数的介绍

    原回答者GitHub用户agapple

    2023-05-05 10:51:25
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载