「从零单排canal 04」 启动模块deployer源码解析(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
全局流量管理 GTM,标准版 1个月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 「从零单排canal 04」 启动模块deployer源码解析(二)

3.CanalController


前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。


这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。


3.1 从构造器开始了解


整体初始化的顺序如下:


  • 构建PlainCanalConfigClient,用于用户远程配置的获取
  • 初始化全局配置,顺便把instance相关的全局配置初始化一下
  • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作为instance 运行节点控制
  • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)


这里有几个机制要详细介绍一下。


3.1.1 CanalServer两种模式


canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始化代码部分如下:


// 3.准备canal server
//note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq
// 是不需要这个netty的)
ip = getProperty(properties, CanalConstants.CANAL_IP);
//省略一部分。。。
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
//省略一部分。。。
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
    canalServer = CanalServerWithNetty.instance();
    canalServer.setIp(ip);
    canalServer.setPort(port);
}


embededCanalServer:类型为CanalServerWithEmbedded


canalServer:类型为CanalServerWithNetty


二者有什么区别呢?


都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:

104.jpg


说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。


如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。


在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。


因此,在构造器中,我们看到,


用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,


而ip和port被设置到CanalServerWithNetty中。


关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

3.1.2 ServerRunningMonitor


在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。


ServerRunningMonitor是做什么的呢?


我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。


/**
* 针对server的running节点控制
*/
public class ServerRunningMonitor extends AbstractCanalLifeCycle {
    private static final Logger        logger       = LoggerFactory.getLogger(ServerRunningMonitor.class);
    private ZkClientx                  zkClient;
    private String                     destination;
    private IZkDataListener            dataListener;
    private BooleanMutex               mutex        = new BooleanMutex(false);
    private volatile boolean           release      = false;
    // 当前服务节点状态信息
    private ServerRunningData          serverData;
    // 当前实际运行的节点状态信息
    private volatile ServerRunningData activeData;
    private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
    private int                        delayTime    = 5;
    private ServerRunningListener      listener;
    public ServerRunningMonitor(ServerRunningData serverData){
        this();
        this.serverData = serverData;
    }
    //。。。。。
}


在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。


ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。


主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。


具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。


new Function<String, ServerRunningMonitor>() {
    public ServerRunningMonitor apply(final String destination) {
        ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
        runningMonitor.setDestination(destination);
        runningMonitor.setListener(new ServerRunningListener() {
            /**
             * note
             * 1.内部调用了embededCanalServer的start(destination)方法。
             * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
             * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。
             * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。
             *
             * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination
             */
            public void processActiveEnter() {
               //省略具体内容。。。
            }
            /**
             * note
             * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination
             * 2.停止embedeCanalServer的destination
             */
            public void processActiveExit() {
                //省略具体内容。。。
            }
            /**
             * note
             * 在Canalinstance启动之前,destination注册到ZK上,创建节点
             * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
             * 此方法会在processActiveEnter()之前被调用
             */
            public void processStart() {
                //省略具体内容。。。
            }
            /**
             * note
             * 在Canalinstance停止前,把ZK上节点删除掉
             * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
             * 此方法会在processActiveExit()之前被调用
             */
            public void processStop() {
                //省略具体内容。。。
            }
        });
        if (zkclientx != null) {
            runningMonitor.setZkClient(zkclientx);
        }
        // 触发创建一下cid节点
        runningMonitor.init();
        return runningMonitor;
    }
}

3.2 canalController的start方法


具体运行逻辑如下:


  • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 先启动embededCanalServer(会启动对应的监控)
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
  • 如果cannalServer不为空,启动canServer (canalServerWithNetty)


这里需要注意,canalServer什么时候为空?


如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。


所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。


public void start() throws Throwable {
    // 创建整个canal的工作节点
    final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
    initCid(path);
    if (zkclientx != null) {
        this.zkclientx.subscribeStateChanges(new IZkStateListener() {
            public void handleStateChanged(KeeperState state) throws Exception {
            }
            public void handleNewSession() throws Exception {
                initCid(path);
            }
            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception{
                logger.error("failed to connect to zookeeper", error);
            }
        });
    }
    // 先启动embeded服务
    embededCanalServer.start();
    // 尝试启动一下非lazy状态的通道
    for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
        final String destination = entry.getKey();
        InstanceConfig config = entry.getValue();
        // 创建destination的工作节点
        if (!embededCanalServer.isStart(destination)) {
            // HA机制启动
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
            if (!config.getLazy() && !runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }
        //note:为每个instance注册一个配置监视器
        if (autoScan) {
            instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
        }
    }
    if (autoScan) {
        //note:启动线程定时去扫描配置
        instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
        //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (!monitor.isStart()) {
                monitor.start();
            }
        }
    }
    // 启动网络接口
    if (canalServer != null) {
        canalServer.start();
    }
}


我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。


入口在runningMonitor.start()。


  • 如果zkClient != null,就用zk进行HA启动
  • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了

public synchronized void start() {
    super.start();
    try {
        /**
         * note
         * 内部会调用ServerRunningListener的processStart()方法
         */
        processStart();
        if (zkClient != null) {
            // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);
            initRunning();
        } else {
            /**
             * note
             * 内部直接调用ServerRunningListener的processActiveEnter()方法
             */
            processActiveEnter();// 没有zk,直接启动
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // 没有正常启动,重置一下状态,避免干扰下一次start
        stop();
    }
}


重点关注下HA启动方式,一般 我们都采用这种模式进行。


在集群模式下,可能会有多个canal server共同处理同一个destination,


在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。


同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!


启动的重点还是在initRuning()。


利用zk来保证集群中有且只有 一个instance任务在运行。


  • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
  • 尝试创建临时节点。
  • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
  • 如果创建成功,就说明没有其他server启动这个instance,可以创建


private void initRunning() {
    if (!isStart()) {
        return;
    }
    //note:还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running
    String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
    // 序列化
    byte[] bytes = JsonUtils.marshalToByte(serverData);
    try {
        mutex.set(false);
        /**
         * note:
         * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
         * 此时会抛出ZkNodeExistsException,进入catch代码块。
         */
        zkClient.create(path, bytes, CreateMode.EPHEMERAL);
        /**
         * note:
         * 如果创建成功,就开始触发启动事件
         */
        activeData = serverData;
        processActiveEnter();// 触发一下事件
        mutex.set(true);
        release = false;
    } catch (ZkNodeExistsException e) {
        /**
         * note:
         * 如果捕获异常,表示创建失败。
         * 就根据临时节点路径查一下是哪个canal-sever创建了。
         * 如果没有相关信息,马上重新尝试一下。
         * 如果确实存在,就把相关信息保存下来
         */
        bytes = zkClient.readData(path, true);
        if (bytes == null) {// 如果不存在节点,立即尝试一次
            initRunning();
        } else {
            activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
        }
    } catch (ZkNoNodeException e) {
        /**
         * note:
         * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。
         */
        zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
        initRunning();
    }
}


那运行中的HA是如何实现的呢,我们回头看一下


zkClient.subscribeDataChanges(path, dataListener);


对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。


dataListener是在ServerRunningMonitor的构造方法中初始化的,


包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :


public ServerRunningMonitor(){
    // 创建父节点
    dataListener = new IZkDataListener() {
        /**
         * note:
         * 当注册节点发生变化时,会自动回调这个方法。
         * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
         * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。
         * 可以 触发 HA。
         */
        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
            if (!isMine(runningData.getAddress())) {
                mutex.set(false);
            }
            if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                releaseRunning();// 彻底释放mainstem
            }
            activeData = (ServerRunningData) runningData;
        }
        /**
         * note:
         * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
         */
        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            if (!release && activeData != null && isMine(activeData.getAddress())) {
                // 如果上一次active的状态就是本机,则即时触发一下active抢占
                initRunning();
            } else {
                // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
                delayExector.schedule(new Runnable() {
                    public void run() {
                        initRunning();
                    }
                }, delayTime, TimeUnit.SECONDS);
            }
        }
    };
}


当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。


我们回想一下使用过程中,什么时候可能 改变节点当状态呢?


就是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive,可以 触发 HA。


如下图所示

105.jpg


4.admin的配置监控原理


我们现在采用admin做全局的配置控制。


那么每个canalServer是怎么监控配置的变化呢?


还记得上吗cananlController的start方法中对配置监视器的启动吗?


if (autoScan) {
        //note:启动线程定时去扫描配置
        instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
        //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (!monitor.isStart()) {
                monitor.start();
            }
        }
    }


这个就是关键的配置监控。


我们来看deployer模块中的monitor包了。

106.jpg


4.1 InstanceAction


是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。


/**
* config配置变化后的动作
*
* @author jianghang 2013-2-18 下午01:19:29
* @version 1.0.1
*/
public interface InstanceAction {
    /**
     * 启动destination
     */
    void start(String destination);
    /**
     * 主动释放destination运行
     */
    void release(String destination);
    /**
     * 停止destination
     */
    void stop(String destination);
    /**
     * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
     */
    void reload(String destination);
}


具体实现在canalController的构造器中实现了匿名类。


4.2 InstanceConfigMonitor


这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。


我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

原理很简单。


  • 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
  • 然后通过defaultAction去start
  • 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。


/**
* 基于manager配置的实现
*
* @author agapple 2019年8月26日 下午10:00:20
* @since 1.1.4
*/
public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
    private long                        scanIntervalInSecond = 5;
    private InstanceAction              defaultAction        = null;
    /**
     * note:
     * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction
     */
    private Map<String, InstanceAction> actions  
    /**
     * note:
     * 每个instance对应的远程配置
     */
    private Map<String, PlainCanal>     configs                                      
    /**
     * note:
     * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
     */
    private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
                                                                 new NamedThreadFactory("canal-instance-scan"));
    private volatile boolean            isFirst              = true;
    /**
     * note:
     * 拉取admin配置的client
     */
    private PlainCanalConfigClient      configClient;
//…
}


5.总结


deployer模块的主要作用:


1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求


这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

目录
相关文章
|
27天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
16 0
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
93 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
79 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
79 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
107 0
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
65 12

热门文章

最新文章

推荐镜像

更多