【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁

简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。本文我们来一起写一个ZooKeeper的实现的分布式锁。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。
本文我们来一起写一个ZooKeeper的实现的分布式锁。

设计

参考之前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。
我们通过重写接口中的方法实现一个可重入锁。

  • lock:请求锁,如果成功则直接返回,不成功则阻塞 直到获取锁。
  • lockInterruptibly:请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
  • tryLock:1、尝试获取锁,获取失败的话 直接返回false,不会再等待。2、尝试获取锁,获取成功返回true,否则一直请求,直到超时返回false
  • unlock:释放锁

我们使用ZooKeeper的EPHEMERAL临时节点机制,如果能创建成功的话,则获取锁成功,释放锁或客户端断开连接后,临时节点自动删除,这样可以避免误删除或漏删除的情况。

获取锁失败后,这里我们使用轮询的方式来不断尝试创建。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中我们会用到。

DistributedLock

public class DistributedLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //加锁节点
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
    private volatile int state;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedLock#init] error : " + e.toString(), e);
        }
    }
}

lock

public void lock() {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return;
    }
    //一直尝试获取锁,知道获取成功
    for (;;) {
        try {
            //创建临时节点
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            //第一次获取锁,state++,这里不需要使用加锁机制保证原子性,因为同一时间,最多只有一个线程能create节点成功。
            state++;
            break;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lock] error : " + ie.toString(), ie);
            Thread.currentThread().interrupt();
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lock] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return;
    }
    for (;;) {
        //如果当前线程为中断状态,则抛出中断异常
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        try {
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            state++;
            break;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie);
            Thread.currentThread().interrupt();
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

tryLock

public boolean tryLock() {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return true;
    }
    //如果获取成功则返回true,失败则返回false
    try {
        zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
        state++;
        return true;
    } catch (Exception e) {
        logger.error("[DistributedLock#tryLock] error : " + e.toString(), e);
    }

    return false;
}


public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return true;
    }

    //如果尝试获取超时,则返回false
    long nanosTimeout = unit.toNanos(time);
    if (nanosTimeout <= 0L) {
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout;
    for (;;) {
        //如果当前线程为中断状态,则抛出中断异常
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        //如果尝试获取超时,则返回false
        nanosTimeout = deadline - System.nanoTime();
        if (nanosTimeout <= 0L) {
            return false;
        }
        try {
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            state++;
            return true;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则返回false
            logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie);
            return false;
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则返回false
            logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                return false;
            }
        }
    }
}

unlock

public void unlock() {
    //通过state实现重入机制,如果已经获取锁,释放锁时,需要将state--。
    delLockCount();
    
    //如果state为0时,说明不再持有锁,需要将连接关闭,自动删除临时节点
    if (state == 0 && zooKeeper != null) {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            logger.error("[DistributedLock#unlock] error : " + e.toString(), e);
        }
    }
}

addLockCount

private boolean addLockCount() {
    //如果state大于0,即已持有锁,将state数量加一
    if (state > 0) {
        synchronized (this) {
            if (state > 0) {
                state++;
                return true;
            }
        }
    }
    return false;
}

delLockCount

private boolean delLockCount() {
    //如果state大于0,即还持有锁,将state数量减一
    if (state > 0) {
        synchronized (this) {
            if (state > 0) {
                state--;
                return true;
            }
        }
    }
    return false;
}

总结

上面就是一个通过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr
其中有几个可以优化的点。

  • 轮询的方式换成Watcher机制
  • 可重入锁实现方式的优化
  • 所有线程竞争一个节点的创建,容易出现羊群效应,且是一种不公平的锁竞争模式

下节我们使用新的方式实现分布式锁来解决上面的几个问题,如果大家好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
9月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
160 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
664 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
313 11
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1505 0
分布式爬虫框架Scrapy-Redis实战指南
|
6月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
7月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
335 2
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
365 1
分布式新闻数据采集系统的同步效率优化实战
|
10月前
|
数据采集 机器学习/深度学习 数据可视化
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
本文探讨了MSE与Cauchy损失函数在线性回归中的表现,特别是在含噪声数据环境下的差异。研究发现,MSE虽具良好数学性质,但对异常值敏感;而Cauchy通过其对数惩罚机制降低异常值影响,展现出更强稳定性。实验结果表明,Cauchy损失函数在处理含噪声数据时参数估计更接近真实值,为实际应用提供了更鲁棒的选择。
371 1
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2696 7