Zookeeper——分布式锁的概念理解 & 应用举例

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Zookeeper——分布式锁的概念理解 & 应用举例

文章目录:


1.前言

2.原生Zookeeper实现分布式锁 

3.Curator框架实现分布式锁案例

4.Zookeeper常见面试题

1.前言


什么叫做分布式锁呢?

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。


2.原生Zookeeper实现分布式锁


代码中的注释我已经写的很详细了。

这其中用到了JUC中的CountDownLatch,可以参考:https://blog.csdn.net/weixin_43823808/article/details/120799251

package com.szh.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
 *
 */
public class DistributedZkLock {
    private final String connectString = "192.168.40.130:2181";
    private final int sessionTimeout = 30000;
    private final ZooKeeper zk;
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String currentNode;
    private String waitPath;
    public DistributedZkLock() throws IOException, InterruptedException, KeeperException {
        //获取zk连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //如果连接上zk,则connectLatch可以释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                //如果监听的节点已被删除,同时当前监听的节点路径与即将被监听节点的前一个节点路径相同,则waitLatch可以释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });
        //等待zk连接成功之后,程序则继续往下走,其他线程进入等待连接的状态
        connectLatch.await();
        //判断根节点 /locks 是否存在
        Stat stat = zk.exists("/locks", false);
        //如果根节点 /locks 不存在,则立马创建
        if (Objects.isNull(stat)) {
            //此 /locks 节点默认所有人均可访问,而且是永久性节点
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    //加锁
    public void zkLock() {
        try {
            //所谓加锁,就是在根节点/locks下创建对应的临时、带序号的节点
            currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //睡一会,让结果更清晰
            Thread.sleep(100);
            //判断创建的节点是否是序号最小的节点,如果是,则获取到锁;如果不是,则监听它序号的前一个节点
            List<String> children = zk.getChildren("/locks", false);
            if (children.size() == 1) { //只有一个节点,则直接获取锁
                return;
            } else { //如果有多个节点,则需要判断谁的序号最小
                //先对获取的节点的list集合排序,确保从小到大的顺序
                Collections.sort(children);
                //获取节点名称  seq-00000000
                String thisNode = currentNode.substring("/locks/".length());
                //通过节点名称获取到它在list集合中的位置
                int index = children.indexOf(thisNode);
                if (index == -1) { //没数据,无意义
                    System.out.println("数据异常....");
                } else if (index == 0) { //说明此节点处于第一个位置,可以获取锁
                    return;
                } else { //非第一个位置,需要监听前一个节点的变化
                    //获取该节点序号的前一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    //监听,回调Watch的process方法
                    zk.getData(waitPath, true, new Stat());
                    //其他线程进入等待锁的状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //解锁
    public void zkUnLock() {
        //删除节点即解锁
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

下面是针对上面的一些方法的测试。 

package com.szh.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 *
 */
public class DistributedZkLockTest {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        //创建分布式锁1
        final DistributedZkLock lock1 = new DistributedZkLock();
        //创建分布式锁2
        final DistributedZkLock lock2 = new DistributedZkLock();
        //如下创建两个线程,模拟获取分布式锁的过程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
                    TimeUnit.MILLISECONDS.sleep(3000);
                    lock1.zkUnLock();
                    System.out.println(Thread.currentThread().getName() + "已释放锁....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
                    TimeUnit.MILLISECONDS.sleep(3000);
                    lock2.zkUnLock();
                    System.out.println(Thread.currentThread().getName() + "已释放锁....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}


那么上面是我们自己手写的加锁、解锁的一些方法,其中也存在着很多问题。

1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

2Watch 需要重复注册,不然就不能生效

3)开发的复杂性还是比较高的

4)不支持多节点删除和创建。需要自己去递归

所以就引出了下面的案例:👇👇👇

3.Curator框架实现分布式锁案例


Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

要使用它,就需要在pom文件中添加相关依赖。

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>
package com.szh.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
/**
 *
 */
public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getZkClient(), "/locks");
        //创建分布式锁2
        InterProcessMutex lock2= new InterProcessMutex(getZkClient(), "/locks");
        //下面创建两个线程
        new Thread(() -> {
            try {
                lock1.acquire(); //获取锁
                System.out.println(Thread.currentThread().getName() + " 首次获取到锁....");
                lock1.acquire();
                System.out.println(Thread.currentThread().getName() + " 再次获取到锁....");
                TimeUnit.MILLISECONDS.sleep(5000);
                lock1.release(); //释放锁
                System.out.println(Thread.currentThread().getName() + " 首次释放锁....");
                lock1.release();
                System.out.println(Thread.currentThread().getName() + " 再次释放锁....");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                lock2.acquire();
                System.out.println(Thread.currentThread().getName() + " 首次获取到锁....");
                lock2.acquire();
                System.out.println(Thread.currentThread().getName() + " 再次获取到锁....");
                TimeUnit.MILLISECONDS.sleep(5000);
                lock2.release();
                System.out.println(Thread.currentThread().getName() + " 首次释放锁....");
                lock2.release();
                System.out.println(Thread.currentThread().getName() + " 再次释放锁....");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    private static CuratorFramework getZkClient() {
        //客户端和服务器连接失败之后,多少秒之后再进行重试,以及重试的次数,5000ms之后重试,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(5000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.40.130:2181")
                .connectionTimeoutMs(20000)
                .sessionTimeoutMs(30000)
                .retryPolicy(policy).build();
        //启动zk客户端
        client.start();
        System.out.println("zookeeper 启动成功....");
        return client;
    }
}

4.Zookeeper常见面试题


·       选举机制

               半数机制,超过半数的投票通过,即通过。

                       1)第一次启动选举规则:投票过半数时,服务器 id 大的胜出

                       2)第二次启动选举规则:EPOCH 大的直接胜出

                                                                       EPOCH 相同,事务 id 大的胜出

                                                                       事务 id 相同,服务器 id 大的胜出

·       生产集群安装多少 zk 合适?

               安装奇数台。

               生产经验:10 台服务器:3 zk

                                  20台服务器:5 zk

                                  100台服务器:11 zk

                                  200台服务器:11 zk

               服务器台数多:好处,提高可靠性;坏处:提高通信延时

·       常用命令lsgetcreatedelete

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
20天前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
69 0
|
2月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
19天前
|
存储 运维 应用服务中间件
阿里云分布式存储应用示例
通过阿里云EDAS,您可以轻松部署与管理微服务应用。创建应用时,使用`CreateApplication`接口基于模板生成新应用,并获得包含应用ID在内的成功响应。随后,利用`DeployApplication`接口将应用部署至云端,返回&quot;Success&quot;确认部署成功。当业务调整需下线应用时,调用`ReleaseApplication`接口释放资源。阿里云EDAS简化了应用全生命周期管理,提升了运维效率与可靠性。[相关链接]提供了详细的操作与返回参数说明。
|
22天前
|
机器学习/深度学习 分布式计算 PyTorch
大规模数据集管理:DataLoader在分布式环境中的应用
【8月更文第29天】随着大数据时代的到来,如何高效地处理和利用大规模数据集成为了许多领域面临的关键挑战之一。本文将探讨如何在分布式环境中使用`DataLoader`来优化大规模数据集的管理与加载过程,并通过具体的代码示例展示其实现方法。
28 1
|
26天前
|
运维 安全 Cloud Native
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
|
1月前
|
运维 负载均衡 算法
“分布式基础概念”全面解析,让你秒懂分布式系统!【一】
该博客文章全面解析了分布式系统的基础概念,包括微服务架构、集群与分布式的区别、节点定义、远程调用、负载均衡、服务注册与发现、配置中心、服务熔断与降级以及API网关,帮助读者快速理解分布式系统的关键组成部分和工作原理。
“分布式基础概念”全面解析,让你秒懂分布式系统!【一】
|
1月前
|
存储 分布式计算 数据处理
解释弹性分布式数据集(RDD)的概念
【8月更文挑战第13天】
60 4
|
1月前
|
Kubernetes 安全 云计算
分布式应用的终极革命:Distributionless,告别分布式烦恼!
【8月更文挑战第8天】探讨分布式应用的进化形态——Distributionless,一种使开发者聚焦业务逻辑而非系统细节的理念。借助容器化、云计算与自动化工具的进步,分布式应用的开发与管理变得简易。透过示例展现了使用Bazel构建及Kubernetes部署的流程,预示着Distributionless模式下的应用将更加高效、可靠与安全,引领未来分布式应用的发展趋势。
49 7
|
19天前
|
开发者 云计算 数据库
从桌面跃升至云端的华丽转身:深入解析如何运用WinForms与Azure的强大组合,解锁传统应用向现代化分布式系统演变的秘密,实现性能与安全性的双重飞跃——你不可不知的开发新模式
【8月更文挑战第31天】在数字化转型浪潮中,传统桌面应用面临新挑战。本文探讨如何融合Windows Forms(WinForms)与Microsoft Azure,助力应用向云端转型。通过Azure的虚拟机、容器及无服务器计算,可轻松解决性能瓶颈,满足全球用户需求。文中还提供了连接Azure数据库的示例代码,并介绍了集成Azure Storage和Functions的方法。尽管存在安全性、网络延迟及成本等问题,但合理设计架构可有效应对,帮助开发者构建高效可靠的现代应用。
15 0
|
19天前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
33 0

热门文章

最新文章