5.Watcher机制(二)WatchManager

简介: 本文深入分析ZooKeeper中WatchManager类的源码,介绍其核心属性watchTable与watch2Paths的映射关系,并详解size、addWatch、removeWatcher、triggerWatch及dumpWatches等同步方法的实现逻辑,揭示Watcher事件的注册、触发与管理机制。

一、前言

  前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。

二、WatchManager源码分析

2.1 类的属性 

public class WatchManager {
    // Logger
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
    // watcher表
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();
    // watcher到节点路径的映射
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();
}

说明:WatcherManager类用于管理watchers和相应的触发器。watchTable表示从节点路径到watcher集合的映射,而watch2Paths则表示从watcher到所有节点路径集合的映射。

2.2 核心方法分析

1. size方法

public synchronized int size(){
    int result = 0;
    for(Set<Watcher> watches : watchTable.values()) { // 遍历watchTable所有的值集合(HashSet<Watcher>集合)
        // 每个集合大小累加
        result += watches.size();
    }
    // 返回结果
    return result;
}

说明:可以看到size方法是同步的,因此在多线程环境下是安全的,其主要作用是获取watchTable的大小,即遍历watchTable的值集合。

2. addWatch方法

public synchronized void addWatch(String path, Watcher watcher) {
    // 根据路径获取对应的所有watcher
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) { // 列表为空
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        // 新生成watcher集合
        list = new HashSet<Watcher>(4);
        // 存入watcher表
        watchTable.put(path, list);
    }
    // 将watcher直接添加至watcher集合
    list.add(watcher);
    // 通过watcher获取对应的所有路径
    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) { // 路径为空
        // cnxns typically have many watches, so use default cap here
        // 新生成hash集合
        paths = new HashSet<String>();
        // 将watcher和对应的paths添加至映射中
        watch2Paths.put(watcher, paths);
    }
    // 将路径添加至paths集合
    paths.add(path);
}

说明:addWatch方法同样是同步的,其大致流程如下

  ① 通过传入的path(节点路径)从watchTable获取相应的watcher集合,进入②

  ② 判断①中的watcher是否为空,若为空,则进入③,否则,进入④

  ③ 新生成watcher集合,并将路径path和此集合添加至watchTable中,进入④【类似缓存操作】

  ④ 将传入的watcher添加至watcher集合,即完成了path和watcher添加至watchTable的步骤,进入⑤

  ⑤ 通过传入的watcher从watch2Paths中获取相应的path集合,进入⑥

  ⑥ 判断path集合是否为空,若为空,则进入⑦,否则,进入⑧

  ⑦ 新生成path集合,并将watcher和paths添加至watch2Paths中,进入⑧

  ⑧ 将传入的path(节点路径)添加至path集合,即完成了path和watcher添加至watch2Paths的步骤。

综上:addWatche方法会将:

1.入参所对应的watcher添加到入参path所对应的全部Watcher集合中,如path下已有则添加,没有创建新的并添加进去;

2.入参所对应的path添加到入参watcher所对应给的所有路径集合中,如watcher对应路径为空则创建新的集合进行添加,非空将入参path直接添加进去。

3. removeWatcher方法  

public synchronized void removeWatcher(Watcher watcher) {
    // 从wach2Paths中移除watcher,并返回watcher对应的path集合
    HashSet<String> paths = watch2Paths.remove(watcher);
    if (paths == null) { // 集合为空,直接返回
        return;
    }
    for (String p : paths) { // 遍历路径集合
        // 从watcher表中根据路径取出相应的watcher集合
        HashSet<Watcher> list = watchTable.get(p);
        if (list != null) { // 若集合不为空
            // 从list中移除该watcher
            list.remove(watcher);
            if (list.size() == 0) { // 移除后list为空,则从watch表中移出
                watchTable.remove(p);
            }
        }
    }
}

说明:removeWatcher用作从watch2Paths和watchTable中中移除该watcher,其大致步骤如下

  ① 从watch2Paths中移除传入的watcher,并且返回该watcher对应的路径集合,进入②

  ② 判断返回的路径集合是否为空,若为空,直接返回,否则,进入③

  ③ 遍历②中的路径集合,对每个路径,都从watchTable中取出与该路径对应的watcher集合,进入④

  ④ 若③中的watcher集合不为空,则从该集合中移除watcher,并判断移除元素后的集合大小是否为0,若为0,进入⑤

  ⑤ 从watchTable中移除路径

4. triggerWatch方法

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // 根据事件类型、连接状态、节点路径创建WatchedEvent
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    // watcher集合
    HashSet<Watcher> watchers;
    synchronized (this) { // 同步块
        // 从watcher表中移除path,并返回其对应的watcher集合
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) { // watcher集合为空
            if (LOG.isTraceEnabled()) { 
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                         "No watchers for " + path);
            }
            // 返回
            return null;
        }
        for (Watcher w : watchers) { // 遍历watcher集合
            // 根据watcher从watcher表中取出路径集合
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) { // 路径集合不为空
                // 则移除路径
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) { // 遍历watcher集合
        if (supress != null && supress.contains(w)) { // supress不为空并且包含watcher,则跳过
            continue;
        }
        // 进行处理
        w.process(e);
    }
    return watchers;
}

 说明:该方法主要用于触发watch事件,并对事件进行处理。其大致步骤如下

  ① 根据事件类型、连接状态、节点路径创建WatchedEvent,进入②

  ② 从watchTable中移除传入的path对应的键值对,并且返回path对应的watcher集合,进入③

  ③ 判断watcher集合是否为空,若为空,则之后会返回null,否则,进入④

  ④ 遍历②中的watcher集合,对每个watcher,从watch2Paths中取出path集合,进入⑤

  ⑤ 判断④中的path集合是否为空,若不为空,则从集合中移除传入的path。进入⑥

  ⑥ 再次遍历watcher集合,对每个watcher,若supress不为空并且包含了该watcher,则跳过,否则,进入⑦

  ⑦ 调用watcher的process方法进行相应处理,之后返回watcher集合。【这里的process具体怎么执行的呢

5. dumpWatches方法

public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
    if (byPath) { // 控制写入watchTable或watch2Paths
        for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) { // 遍历每个键值对
            // 写入键
            pwriter.println(e.getKey());
            for (Watcher w : e.getValue()) { // 遍历值(HashSet<Watcher>)
                pwriter.print("\t0x");
                pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                pwriter.print("\n");
            }
        }
    } else {
        for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) { // 遍历每个键值对
            // 写入"0x"
            pwriter.print("0x");
            pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
            for (String path : e.getValue()) { // 遍历值(HashSet<String>)
                // 
                pwriter.print("\t");
                pwriter.println(path);
            }
        }
    }
}

  说明:dumpWatches用作将watchTable或watch2Paths写入磁盘。

三、总结

  WatchManager类用作管理watcher、其对应的路径以及触发器,其方法都是针对两个映射的操作。

相关文章
|
1天前
|
缓存 运维 监控
一场FullGC故障排查
本文记录了一次Java应用CPU使用率异常升高的排查过程。通过分析发现,问题根源为频繁Full GC导致CPU飙升,而Full GC是因用户上传的Excel数据被加载为大对象并长期驻留JVM内存所致。使用JProfiler分析堆内存,定位到List&lt;Map&lt;String, String&gt;&gt;结构造成内存膨胀,空间效率仅约13.4%。最终提出“治本”与“治标”两类解决方案:一是将大数据移出JVM内存,存入Redis;二是优化代码,及时清理无用字段以减小对象体积。文章总结了从监控识别、工具分析到根本解决的完整排查思路,对类似性能问题具有参考价值。(238字)
|
1天前
|
存储 缓存 负载均衡
Nacos注册中心
本文介绍Nacos的安装部署、服务注册中心整合、分级模型、负载均衡策略、权重控制、环境隔离及实例类型,详解其在微服务架构中的应用,帮助开发者掌握Nacos核心功能与最佳实践。
 Nacos注册中心
|
1天前
|
负载均衡 算法 架构师
Ribbon负载均衡
本文深入讲解Spring Cloud中Ribbon实现客户端负载均衡的原理,包括@LoadBalanced注解的作用、负载均衡策略分类与算法,以及如何自定义配置和优化首次调用延迟的饥饿加载机制,帮助读者全面理解微服务间的流量分发技术。
Ribbon负载均衡
|
1天前
|
Java Nacos Maven
Eureka服务注册与发现
本节介绍Eureka注册中心的搭建与使用,完成服务注册与发现功能,为后续Nacos替换做铺垫。
 Eureka服务注册与发现
|
2天前
|
安全 JavaScript
JeecgBoot介绍
JeecgBoot是一款基于代码生成器的低代码开发平台,支持零代码快速开发。采用SpringBoot2.x、Ant Design&Vue、Mybatis-plus等主流技术,前后端分离架构,集成Shiro、JWT安全控制,助力高效构建企业级应用。
JeecgBoot介绍
|
2天前
|
NoSQL 关系型数据库 Java
基础环境配置
项目开发环境要求JDK8+、Maven、Redis 3.2+、MySQL 5.7+,推荐使用Idea开发工具,需安装Lombok插件和JRebel热部署工具。技术栈基于SpringBoot、MybatisPlus、Shiro及SpringCloud Alibaba,适合构建微服务架构应用。
基础环境配置
|
1天前
|
数据库 前端开发 NoSQL
代码拉取与运行
本文档介绍JeecgBoot前后端项目部署流程,包含代码拉取(在线/离线)、数据库脚本导入、Idea工程配置、修改数据库与Redis连接、后端启动及前端Vue3项目运行步骤,附目录结构与关键配置说明,助您快速搭建开发环境。
代码拉取与运行
|
1天前
|
Dubbo IDE API
SpringCloud工程部署启动
本文介绍SpringCloud微服务工程搭建全过程,涵盖项目创建、模块配置、数据库部署及服务远程调用实现。通过两种方案快速搭建工程,使用RestTemplate完成服务间HTTP通信,并解析调用流程与设计思想,帮助开发者掌握微服务基础架构与协作机制。
|
1天前
|
数据采集 领域建模 数据库
领域模型图(数据架构/ER图)
本文介绍如何通过四色原型法进行领域建模,构建数据架构中的ER图。利用时标性(MI)、参与方-地点-物品(PPT)、角色(Role)和描述(DESC)四类原型,逐步从业务流程中提炼实体与关系,最终形成清晰的数据模型,助力系统设计。
|
1天前
|
uml C语言
系统时序图
时序图(Sequence Diagram)是UML中描述对象间消息传递时间顺序的交互图。横轴为对象,纵轴为时间,通过生命线、控制焦点和消息等元素展现动态协作过程,强调交互的时间先后关系,适用于建模并发与同步行为。