Zookeeper4.Watcher机制(一)

简介: 本文深入分析ZooKeeper的Watcher机制核心类与源码实现,涵盖Watcher接口、Event枚举(KeeperState、EventType)、WatchedEvent事件封装、ClientWatchManager及ZKWatchManager的管理逻辑,重点解析事件触发与Watcher通知机制,帮助理解ZooKeeper分布式协调中的状态监听与回调原理。

一、前言
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
二、总体框图
  对于Watcher机制而言,主要涉及的类主要如下。   

说明:
Watcher 接口类型,其定义了process方法,需子类实现
Event 接口类型,Watcher的内部类,无任何方法
KeeperState 枚举类型,Event的内部类,表示Zookeeper所处的状态
EventType 枚举类型,Event的内部类,表示Zookeeper中发生的事件类型
WatchedEvent 表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType
ClientWatchManager 接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现
ZKWatchManager Zookeeper的内部类,继承ClientWatchManager
MyWatcher ZooKeeperMain的内部类,继承Watcher
ServerCnxn 接口类型,继承Watcher,表示客户端与服务端的一个连接
WatchManager 管理Watcher
三、Watcher源码分析
3.1 内部类
  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下
public interface Watcher {

public interface Event {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    public enum KeeperState {

        @Deprecated
        Unknown (-1),

        Disconnected (0),

        @Deprecated
        NoSyncConnected (1),

        SyncConnected (3),

        AuthFailed (4),

        ConnectedReadOnly (5),

        SaslAuthenticated(6),

        Expired (-112);

        private final int intValue;     

        KeeperState(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static KeeperState fromInt(int intValue) {
            switch(intValue) {
                case   -1: return KeeperState.Unknown;
                case    0: return KeeperState.Disconnected;
                case    1: return KeeperState.NoSyncConnected;
                case    3: return KeeperState.SyncConnected;
                case    4: return KeeperState.AuthFailed;
                case    5: return KeeperState.ConnectedReadOnly;
                case    6: return KeeperState.SaslAuthenticated;
                case -112: return KeeperState.Expired;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to KeeperState");
            }
        }
    }

    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    public enum EventType {
        None (-1),
        NodeCreated (1),
        NodeDeleted (2),
        NodeDataChanged (3),
        NodeChildrenChanged (4);

        private final int intValue;     

        EventType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static EventType fromInt(int intValue) {
            switch(intValue) {
                case -1: return EventType.None;
                case  1: return EventType.NodeCreated;
                case  2: return EventType.NodeDeleted;
                case  3: return EventType.NodeDataChanged;
                case  4: return EventType.NodeChildrenChanged;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to EventType");
            }
        }           
    }
}

}
说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。
可以简化成:
public interface Event {}
3.2 接口方法  
abstract public void process(WatchedEvent event);
说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。
四、Event源码分析(即3.1内部类)
4.1 内部类

  1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态
/* Unused, this state is never generated by the server /
@Deprecated
// 未知状态,不再使用,服务器不会产生此状态
Unknown (-1),

/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
// 断开
Disconnected (0),

/** Unused, this state is never generated by the server */
@Deprecated
// 未同步连接,不再使用,服务器不会产生此状态
NoSyncConnected (1),

/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
// 同步连接状态
SyncConnected (3),

/**
* Auth failed state
*/
// 认证失败状态
AuthFailed (4),

/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
// 只读连接状态
ConnectedReadOnly (5),

/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
// SASL认证通过状态
SaslAuthenticated(6),

/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
// 过期状态
Expired (-112);

// 代表状态的整形值
private final int intValue;     // Integer representation of value
// for sending over wire


// 构造函数
KeeperState(int intValue) {
    this.intValue = intValue;
}

// 返回整形值
public int getIntValue() {
    return intValue;
}

// 从整形值构造相应的状态
public static KeeperState fromInt(int intValue) {
    switch(intValue) {
        case   -1: return KeeperState.Unknown;
        case    0: return KeeperState.Disconnected;
        case    1: return KeeperState.NoSyncConnected;
        case    3: return KeeperState.SyncConnected;
        case    4: return KeeperState.AuthFailed;
        case    5: return KeeperState.ConnectedReadOnly;
        case    6: return KeeperState.SaslAuthenticated;
        case -112: return KeeperState.Expired;

        default:
            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
    }
}

}
说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  1. EventType 
    public enum EventType { // 事件类型
    // 无
    None (-1),
    // 结点创建
    NodeCreated (1),
    // 结点删除
    NodeDeleted (2),
    // 结点数据变化
    NodeDataChanged (3),
    // 结点子节点变化
    NodeChildrenChanged (4);

    // 代表事件类型的整形
    private final int intValue; // Integer representation of value
    // for sending over wire

    // 构造函数
    EventType(int intValue) {

     this.intValue = intValue;
    

    }

    // 返回整形
    public int getIntValue() {

     return intValue;
    

    }

    // 从整形构造相应的事件
    public static EventType fromInt(int intValue) {

     switch(intValue) {
         case -1: return EventType.None;
         case  1: return EventType.NodeCreated;
         case  2: return EventType.NodeDeleted;
         case  3: return EventType.NodeDataChanged;
         case  4: return EventType.NodeChildrenChanged;
    
         default:
             throw new RuntimeException("Invalid integer value for conversion to EventType");
     }
    

    }
    }
    说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。
    五、WatchedEvent
    5.1 类的属性 
    public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
    }
    说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。
    5.2 构造函数
      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
    // 初始化属性
    this.keeperState = keeperState;
    this.eventType = eventType;
    this.path = path;
    }
      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。
      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  
    public WatchedEvent(WatcherEvent eventMessage) {
    // 从eventMessage中取出相应属性进行赋值
    keeperState = KeeperState.fromInt(eventMessage.getState());
    eventType = EventType.fromInt(eventMessage.getType());
    path = eventMessage.getPath();
    }
      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。
    五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。
    六、ClientWatchManager
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type, String path);
    

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
    七、ZKWatchManager(zookeeper内)
    7.1 类的属性
    private static class ZKWatchManager implements ClientWatchManager {

    // 数据变化的Watchers
    private final Map> dataWatches = new HashMap>();

    // 节点存在与否的Watchers
    private final Map> existWatches = new HashMap>();

    // 子节点变化的Watchers
    private final Map> childWatches = new HashMap>();
    }
     说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
    7.2 核心方法分析

  2. materialize方法
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type,
                             String clientPath)
    

    {
    // 新生成结果Watcher集合
    Set result = new HashSet();

    switch (type) { // 确定事件类型

     case None: // 无类型
         // 添加默认Watcher
         result.add(defaultWatcher);
         // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、
         // Zookeeper的状态是否为同步连接)
         boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
             state != Watcher.Event.KeeperState.SyncConnected;
         // 同步块
         synchronized(dataWatches) { 
             for(Set<Watcher> ws: dataWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 dataWatches.clear();
             }
         }
    
         // 同步块
         synchronized(existWatches) {  
             for(Set<Watcher> ws: existWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 existWatches.clear();
             }
         }
    
         // 同步块
         synchronized(childWatches) { 
             for(Set<Watcher> ws: childWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 childWatches.clear();
             }
         }
         // 返回结果
         return result;
     case NodeDataChanged: // 节点数据变化
     case NodeCreated: // 创建节点
         synchronized (dataWatches) { // 同步块
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         synchronized (existWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(existWatches.remove(clientPath), result);
         }
         break;
     case NodeChildrenChanged: // 节点子节点变化
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     case NodeDeleted: // 删除节点
         synchronized (dataWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         // XXX This shouldn't be needed, but just in case
         synchronized (existWatches) {
             // 移除clientPath对应的Watcher
             Set<Watcher> list = existWatches.remove(clientPath);
             if (list != null) {
                 // 移除clientPath对应的Watcher后全部添加至结果集合
                 addTo(existWatches.remove(clientPath), result);
                 LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
             }
         }
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     default: // 缺省处理
         String msg = "Unhandled watch event type " + type
             + " with state " + state + " on path " + clientPath;
         LOG.error(msg);
         throw new RuntimeException(msg);
    

    }

    // 返回结果集合
    return result;
    }
    说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:
    如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;
    针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
    八、总结
      针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:
    ● 事件的变化,状态的定义依赖于Event内部类的两组枚举值
    ● 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下

相关文章
|
3天前
|
存储 缓存 并行计算
LMCache:基于KV缓存复用的LLM推理优化方案
LMCache推出KV缓存持久化方案,显著优化大模型推理首Token延迟(TTFT)。通过将KV缓存存储至GPU、CPU或磁盘,实现跨请求复用,支持任意位置文本匹配,与vLLM深度集成,多轮对话、RAG场景提速3-10倍,降低硬件压力,提升吞吐。开源支持Linux/NVIDIA,正拓展AMD及更多生态支持。
73 6
LMCache:基于KV缓存复用的LLM推理优化方案
|
1月前
|
人工智能 并行计算 算法
为什么 OpenSearch 向量检索能提速 13 倍?
本文介绍在最新的 OpenSearch 实践中,引入 GPU 并行计算能力 与 NN-Descent 索引构建算法,成功将亿级数据规模下的向量索引构建速度提升至原来的 13 倍。
603 24
为什么 OpenSearch 向量检索能提速 13 倍?
|
10天前
|
人工智能 安全 开发者
解构AI时代的“深圳答案”:以硬实力构建“护城河”
2025年,深圳以“昇腾+光明实验室+华为”协同模式,打造国产AI算力生态。不同于追逐应用热点,深圳聚焦底层突破,构建从芯片到应用的全栈自主链条,通过政企联动、产学研协同,形成“技术攻关—场景验证—迭代优化”闭环,推动算力高效利用与产业深度融合,为全球AI发展提供安全可控的“中国方案”。
78 15
|
8天前
|
机器学习/深度学习 存储 SQL
当系统“情绪化”时:基于 OpenTelemetry 的异常检测与自适应采样,原来可以这么玩!
当系统“情绪化”时:基于 OpenTelemetry 的异常检测与自适应采样,原来可以这么玩!
72 12
|
13天前
|
人工智能 数据可视化 知识图谱
NanoBanana pro真的强嘛?我试了试结果...........【附带工具+Prompt双邪修玩法】
小阁带你体验Nano Banana Pro!1:1动漫转真人、老照片修复、文生图、漫画上色翻译……效果惊艳,中文理解超强。阁下AI全球首发集成该模型,打造AI创作新境界,一键生成爆款内容,重新定义你的生产力!
|
26天前
|
弹性计算 运维 监控
测评:阿里云轻量应用服务器深度评测,适合新手建站使用吗?
阿里云轻量应用服务器专为新手建站设计,预装WordPress等应用,5分钟快速上线。价格优惠,38元起/年,含200M高带宽,打包计费无隐性费用。集成防火墙、监控等简化运维功能,适合个人博客、小型电商等低并发场景,是入门上云的理想选择。(238字)
386 154
|
13天前
|
存储 机器学习/深度学习 人工智能
基于反馈循环的自我进化AI智能体:原理、架构与代码实现
自我进化智能体突破传统AI静态局限,通过“执行-反馈-调整”闭环,实现持续自主优化。它结合大模型与在线学习,利用多评分器反馈自动改进提示或参数,无需人工干预。适用于医疗、金融、编程等动态场景,推动AI迈向终身学习。
142 12
基于反馈循环的自我进化AI智能体:原理、架构与代码实现
|
8天前
|
数据采集 弹性计算 供应链
包年包月、按量付费和抢占式实例有什么区别?阿里云ECS付费类型如何选择?
阿里云ECS提供三种付费模式:包年包月适合长期稳定使用,价格优惠且支持备案;按量付费按小时计费,灵活但成本较高,适合短期或突发业务;抢占式实例价格低至1折,但可能被释放,仅推荐用于无状态应用。根据业务需求选择合适模式可优化成本与稳定性。
64 20
|
12天前
|
存储 监控 Cloud Native
云原生日志监控体系怎么做才不崩?一篇给你讲透采集、存储、分析、告警的最佳实践
云原生日志监控体系怎么做才不崩?一篇给你讲透采集、存储、分析、告警的最佳实践
101 16