Canal源码分析之启动时处理逻辑和主备切换机制

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
全局流量管理 GTM,标准版 1个月
简介: Canal源码分析之启动时处理逻辑和主备切换机制


canal主备切换机制架构图

 

源码分析

Canal的版本是1.0.3,先找到程序的入口点

/**
 * canal独立版本启动的入口类
 *
 */
public class CanalLauncher {
    ...
    public static void main(String[] args) {
        try {
            ...
            if (remoteConfigLoader != null) {
                remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {
                    @Override
                    public void onChange(Properties properties) {
                        try {
                            // 远程配置canal.properties修改重新加载整个应用
                            canalStater.destroy();
                            //从这里进入下面的类
                            canalStater.start(properties);
                        } catch (Throwable throwable) {
                            logger.error(throwable.getMessage(), throwable);
                        }
                    }
                });
            }
            ....
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }
}

可以看到启动时,通过配置文件判断是否需要将监听到的binlog数据写入mq

/**
 * Canal server 启动类
 *
 */
public class CanalStater {
    private CanalController     controller      = null;
    private CanalMQProducer     canalMQProducer = null;
    private Thread              shutdownThread  = null;
    private CanalMQStarter      canalMQStarter  = null;
    /**
     * 启动方法
     *
     * @param properties canal.properties 配置
     * @throws Throwable
     */
    synchronized void start(Properties properties) throws Throwable {
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        //根据配置初始化mq的生产者
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }
        if (canalMQProducer != null) {
             ...
            if ("true".equals(autoScan)) {
                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                ...
            } else {
                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
            }
        }
        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        //在这里进入下一面的类
        controller.start();
        ...
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
             //启动mq生产者
            canalMQStarter.start(mqProperties);
            controller.setCanalMQStarter(canalMQStarter);
        }
    }
     ...
}

下面看下这个类CanalController  

创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态

/**
 * canal调度控制器
 *
 */
public class CanalController {
    ....
    public void start() throws Throwable {
        ...
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            //创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状
            if (!embededCanalServer.isStart(destination)) {
                // HA机制启动
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    //从这里进入下一个类
                    runningMonitor.start();
                }
            }
            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }
        // 启动网络接口
        if (canalServer != null) {
            canalServer.start();
        }
    }
   ...
}

上面现实了canal的程序入口,下面重点来分析下ServerRunningMonitor 这个类看下cancal如何现实主备切换

一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行尝试创建新的同名节点,重新选出一个canal server启动instance.

/**
 * 针对server的running节点控制
 * 
 */
public class ServerRunningMonitor extends AbstractCanalLifeCycle {
    private BooleanMutex               mutex        = new BooleanMutex(false);
    // 当前服务节点状态信息
    private ServerRunningData          serverData;
    // 当前实际运行的节点状态信息
    private volatile ServerRunningData activeData;
    ...
    public ServerRunningMonitor(){
        // 创建父节点
        dataListener = new IZkDataListener() {
            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }
                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    release = true;
                    releaseRunning();// 彻底释放mainstem
                }
                activeData = (ServerRunningData) runningData;
            }
            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                //利用AQS自旋锁实现多线程下的无锁阻塞
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络闪断或者zk异常,导致出现频繁的切换操作
                    // 具体场景: canal server所在的网络出现闪断,导致zookeeper认为session失 
                    // 效, 释放了running节点,此时canal server对应的jvm并未退出,(一种假死状 
                    // 态,非常特殊的情况)
                    delayExector.schedule(new Runnable() {
                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }
        };
    }
     ...
    private void initRunning() {
        if (!isStart()) {
            return;
        }
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter();// 触发一下事件
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
            initRunning();
        }
    }
    /**
     * 判断处于active的节点是不是本机实例
     * @param address
     * @return
     */
    private boolean isMine(String address) {
        return address.equals(serverData.getAddress());
    }
}

 

下面来做个下HA机制的测试

操作步骤

一.配置

把canal配置成HA的架构,配置如下

1.canal-master

canal.properties

canal.id = 2
canal.ip = 127.0.0.1
canal.port = 32121
canal.metrics.pull.port = 11112
canal.zkServers = 127.0.0.1:2181
#基于ZK记录解析位点,HA机制下必须启用
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092

Instace.properties

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0

2.canal-slave

canal.properties

canal.id = 3
canal.ip = 127.0.0.1
canal.port = 32122
canal.metrics.pull.port = 11113
canal.zkServers = 127.0.0.1:2181
#基于ZK记录解析位点,HA机制下必须启用
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092

Instace.properties

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0

两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置

高可用测试

1.分别启动canal-master和canal-slave

在zk里可以看到canal-master先创建running节点处于active,此时canal-master处于standby

2.截断表t_test

Kafka tool里这是出现了一条数据type为TRUNCATE

 

{

    "data": null,

    "database": "trade",

    "es": 1555560679000,

    "id": 1,

    "isDdl": true,

    "mysqlType": null,

    "old": null,

    "pkNames": null,

    "sql": "TRUNCATE `t_test`",

    "sqlType": null,

    "table": "t_test",

    "ts": 1555560920238,

    "type": "TRUNCATE"

}

3.通过navicat往mysql里插入数据

直接执行t_test.sql文件,往t_test表里按照id编号插入8000条数据(注意不要用navicat的数据同步功能可能会造成数据写入数据不是按照id递增的顺序,对测试结果的观察不大友好)

 

{
    "data": null,
    "database": "trade",
    "es": 1555560679000,  
    "id": 1,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "TRUNCATE `t_test`",
    "sqlType": null,
    "table": "t_test",
    "ts": 1555560920238,
    "type": "TRUNCATE"
}

4.模拟 canal server对应的jvm异常crash的情况

把canal-master对应得java进程强制kill掉,过了段时间可以看到running节点cid变成了3,就是canal-slave的id,说明canal完成了主从切换。

5.检查kafka里数据的正确性

 

通过新running(ctime = 2019-04-18 12:33:07) 节点的创建时间查找时间为2019-04-18 12:33:07附近的数据可以发现id为1392-1394的数据插入重复发到kafka里了

原因分析:canal的HA机制是通过把从mysql binlg拉取的位点信息异步写入zk里的,发生主从切换的时候canal-slave节点会zk里把该位点信息读取出来,并且从这个地方开始解析mysql的binlog日志。canal-master可能有些数据已经发到kafka里但是还来不及更新zk就挂掉了,所以就会出现数据重复发到kafka的情况

写入zk的cursor位点信息如下:

 

建议解决方案:HA机制可以保证拉取binlog的数据不丢失,但是会出现重复发送到kafka的情况,DELETE和UPDATE事件的记录消息并没影响,而INSERT事件需要根据原始数据的id进行幂等性判断

 

三.功能测试

1.执行批量更新的操作

UPDATE `t_test`   a
SET `net_balance` = '123.0000000000',
 `sma_committed` = '123.0000000000',
 `high_water_mark` = '2359539.0000000000',
 `version` = '2018-11-01'
WHERE
 a.create_time >= '2018-10-11 14:42:22'
AND  a.create_time <= '2019-02-13 04:28:23'

2.查看kafka里的数


offset=0的数据详情

结论:批量操作的DML语句是多条数据(具体数量并不确定)合在一起发一条kafka数据的

四.数据格式

Kafka收到canal解析的DML binlog日志格式

{
    "data": [
        {
            //修改,删除,新增后的数据
            "external_id": "20180522032329",
            "complete_time": "2018-05-22 20:20:12",
            "return_time": "2018-05-24 11:34:12"
        }
    ],
    "database": "数据库名",
    "es": 1555489722000,
    "id": 主键值,  
    "isDdl": false,
"mysqlType": {
        //字段格式
        "external_id": "varchar(32)",
        "complete_time": "datetime",
        "return_time": "datetime"
    },
    "old": [
        {
            //老的数据
            "complete_time": "2018-05-22 20:20:22",
            "return_time": "2018-05-24 11:34:10"
        }
    ],
"pkNames": [
        //主键值
        "external_id"
    ],
    "sql": "",
    "sqlType": {
        "external_id": 12,
        "complete_time": 93,
        "return_time": 93
},
    //表名
    "table": "return_time",
"ts": 1555489722742,
    //操作类型
    "type": "UPDATE"
}

Kafka收到canal解析的DDL binlog日志格式

{
    "data": null,
    "database": "trade",
    "es": 1555575860000,
    "id": 2832,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `t_order`\r\nADD COLUMN `test`  varchar(255) NULL AFTER `is_bo_modified`",
    "sqlType": null,
    "table": "t_order",
    "ts": 1555575861188,
    "type": "ALTER"
}

 


相关文章
|
关系型数据库 流计算 PostgreSQL
关于PostgreSQL逻辑订阅中的复制状态
关于PostgreSQL逻辑订阅中的复制状态
2671 0
|
4月前
|
监控 NoSQL 算法
Redis问题之哨兵模式中的配置文件会在故障转移后发生什么变化如何解决
Redis问题之哨兵模式中的配置文件会在故障转移后发生什么变化如何解决
|
6月前
|
canal 缓存 关系型数据库
Canal实现0侵入同步缓存数据
Canal实现0侵入同步缓存数据
33 0
|
SQL 关系型数据库 MySQL
MySQL主从同步原理
MySQL主从同步原理
156 0
Java 最常见的面试题:zookeeper 怎么保证主从节点的状态同步?
Java 最常见的面试题:zookeeper 怎么保证主从节点的状态同步?
zookeeper实现分布式应用系统服务器上下线动态感知程序、监听机制与守护线程
zookeeper实现分布式应用系统服务器上下线动态感知程序、监听机制与守护线程
92 0
|
SQL 存储 关系型数据库
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
快速学习PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
|
SpringCloudAlibaba API Nacos
Sentinel持久化模式
Sentinel持久化模式
Sentinel持久化模式
|
SQL 缓存 网络协议
深入理解MySQL主从复制原理以及集群部署流程
主从复制是指将主数据库的 DDL 和 DML 操作通过二进制日志传到从库服务器中,然后在从库上对这些日志重新执行(也叫重做),从而使得从库和主库的数据保持同步。 MySQL支持一台主库同时向多台从库进行复制, 从库同时也可以作为其他从服务器的主库,实现链状复制。
522 0
|
canal 存储 SQL
Canal 初次启动时如何定位同步位点(文末附流程图)
Canal 初次启动时如何定位同步位点(文末附流程图)
Canal 初次启动时如何定位同步位点(文末附流程图)
下一篇
无影云桌面