基于ZooKeeper的一种简单分布式锁的实现

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:         ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。基于ZooKeeper,我们可以实现一种简单的分布式互斥锁,包括可重入与不可重入。代码如下: import java.

        ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。基于ZooKeeper,我们可以实现一种简单的分布式互斥锁,包括可重入与不可重入。代码如下:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class TestZooKeeperDistributeLock {

	// 是否为可重入锁
	private boolean reentrant = false;

	public boolean isReentrant() {
		return reentrant;
	}

	private ZooKeeper zk = null;

	public ZooKeeper getZk() {
		return zk;
	}

	public TestZooKeeperDistributeLock(boolean reentrant) {

		this.reentrant = reentrant;

		// 初始化环境:连接Zookeeper并创建根目录
		init();
	}

	private void init() {
		try {
			System.out.println("...");
			System.out.println("...");
			System.out.println("...");
			System.out.println("...");

			System.out.println("开始连接ZooKeeper...");

			// 创建与ZooKeeper服务器的连接zk
			String address = "192.168.1.226:2181";
			int sessionTimeout = 3000;
			zk = new ZooKeeper(address, sessionTimeout, new Watcher() {
				// 监控所有被触发的事件
				public void process(WatchedEvent event) {
					if (event.getType() == null || "".equals(event.getType())) {
						return;
					}
					System.out.println("已经触发了" + event.getType() + "事件!");
				}
			});

			System.out.println("ZooKeeper连接创建成功!");

			Thread.currentThread().sleep(1000l);

			System.out.println("...");
			System.out.println("...");
			System.out.println("...");
			System.out.println("...");

			// 创建根目录节点
			// 路径为/tmp_root_path
			// 节点内容为字符串"我是根目录/tmp_root_path"
			// 创建模式为CreateMode.PERSISTENT
			System.out.println("开始创建根目录节点/tmp_root_path...");
			zk.create("/tmp_root_path", "我是根目录/tmp_root_path".getBytes(),
					Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			System.out.println("根目录节点/tmp_root_path创建成功!");

			Thread.currentThread().sleep(1000l);

			System.out.println("...");
			System.out.println("...");
			System.out.println("...");
			System.out.println("...");
		} catch (Exception e) {
			zk = null;
		}
	}

	public void destroy() {

		// 删除根目录节点
		try {
			System.out.println("开始删除根目录节点/tmp_root_path...");
			zk.delete("/tmp_root_path", -1);
			System.out.println("根目录节点/tmp_root_path删除成功!");
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		} catch (KeeperException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}

		// 关闭连接
		if (zk != null) {
			try {
				zk.close();
				System.out.println("释放ZooKeeper连接成功!");

			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {

		final TestZooKeeperDistributeLock testZooKeeperDistributeLock = new TestZooKeeperDistributeLock(
				true);
		final Random radom = new Random();

		try {
			Thread.currentThread().sleep(1000l);
		} catch (InterruptedException e2) {
			// TODO Auto-generated catch block
			e2.printStackTrace();
		}
		
		ArrayList<Thread> threadList = new ArrayList<Thread>();
		
		for (int i = 0; i < 4; i++) {
			
			Thread thread = new Thread() {
				@Override
				public void run() {

					boolean locked = false;
					while (true) {
						try {
							// 创建需要获取锁的目录节点,创建成功则说明能够获取到锁,创建不成功,则说明锁已被其他线程(哪怕是不同进程的)获取
							// 路径为/tmp_root_path/lock
							// 节点内容为当前线程名
							// 创建模式为CreateMode.PERSISTENT
							System.out.println("线程"
									+ Thread.currentThread().getName()
									+ "尝试获取锁...");
							testZooKeeperDistributeLock.getZk()
									.create("/tmp_root_path/lock",
											Thread.currentThread().getName()
													.getBytes(),
											Ids.OPEN_ACL_UNSAFE,
											CreateMode.PERSISTENT);
							System.out.println("线程"
									+ Thread.currentThread().getName()
									+ "成功获取到锁!");

							locked = true;

							System.out.println("线程"
									+ Thread.currentThread().getName()
									+ "开始处理业务逻辑...");
							Thread.currentThread().sleep(
									3000 + radom.nextInt(3000));
							System.out.println("线程"
									+ Thread.currentThread().getName()
									+ "业务逻辑处理完毕!");

						} catch (Exception e) {

							if (testZooKeeperDistributeLock.isReentrant()) {
								try {
									String lockThread = new String(
											testZooKeeperDistributeLock
													.getZk()
													.getData(
															"/tmp_root_path/lock",
															false, null));
									if (lockThread != null) {

										// 当前线程与获取到的锁线程名一致,重入锁
										if (lockThread.equals(Thread
												.currentThread().getName())) {
											System.out.println("线程"
													+ Thread.currentThread()
															.getName()
													+ "成功重入锁!");

											locked = true;

											System.out.println("线程"
													+ Thread.currentThread()
															.getName()
													+ "开始处理业务逻辑...");
											Thread.currentThread().sleep(
													3000 + radom.nextInt(3000));
											System.out.println("线程"
													+ Thread.currentThread()
															.getName()
													+ "业务逻辑处理完毕!");
										} else {
											System.out.println("线程"
													+ Thread.currentThread()
															.getName()
													+ "尝试获取锁失败,锁被线程"
													+ lockThread + "占用!");
										}

									}
								} catch (KeeperException e1) {
									// TODO Auto-generated catch block
									e1.printStackTrace();
								} catch (InterruptedException e1) {
									// TODO Auto-generated catch block
									e1.printStackTrace();
								}

							} else {
								System.out.println("线程"
										+ Thread.currentThread().getName()
										+ "尝试获取锁失败!");
							}

							try {
								Thread.currentThread().sleep(
										3000 + radom.nextInt(3000));
							} catch (InterruptedException e1) {
								// TODO Auto-generated catch block
								e1.printStackTrace();
							}
						} finally {
							try {

								if (locked) {
									System.out.println("线程"
											+ Thread.currentThread().getName()
											+ "开始释放锁...");
									testZooKeeperDistributeLock.getZk().delete(
											"/tmp_root_path/lock", -1);
									System.out.println("线程"
											+ Thread.currentThread().getName()
											+ "成功释放锁!");

									Thread.currentThread().sleep(
											3000 + radom.nextInt(3000));
								}

							} catch (InterruptedException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							} catch (KeeperException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							} finally {
								locked = false;
							}
						}
					}

				}
			};
			
			threadList.add(thread);
			
			thread.start();
		}

		try {
			Thread.currentThread().sleep(1000 * 20);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		for(int i = 0; i < threadList.size(); i++){
			Thread thread = threadList.get(i);
			thread.stop();
		}

		// 释放资源
		testZooKeeperDistributeLock.destroy();

	}
}
        运行结果如下:

...
...
...
...
开始连接ZooKeeper...
ZooKeeper连接创建成功!
已经触发了None事件!
...
...
...
...
开始创建根目录节点/tmp_root_path...
根目录节点/tmp_root_path创建成功!
...
...
...
...
线程Thread-0尝试获取锁...
线程Thread-2尝试获取锁...
线程Thread-3尝试获取锁...
线程Thread-1尝试获取锁...
线程Thread-0成功获取到锁!
线程Thread-0开始处理业务逻辑...
线程Thread-1尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-2尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-3尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-1尝试获取锁...
线程Thread-1尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-2尝试获取锁...
线程Thread-2尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-3尝试获取锁...
线程Thread-3尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-0业务逻辑处理完毕!
线程Thread-0开始释放锁...
线程Thread-0成功释放锁!
线程Thread-2尝试获取锁...
线程Thread-2成功获取到锁!
线程Thread-2开始处理业务逻辑...
线程Thread-1尝试获取锁...
线程Thread-1尝试获取锁失败,锁被线程Thread-2占用!
线程Thread-3尝试获取锁...
线程Thread-3尝试获取锁失败,锁被线程Thread-2占用!
线程Thread-0尝试获取锁...
线程Thread-0尝试获取锁失败,锁被线程Thread-2占用!
线程Thread-2业务逻辑处理完毕!
线程Thread-2开始释放锁...
线程Thread-2成功释放锁!
线程Thread-3尝试获取锁...
线程Thread-1尝试获取锁...
线程Thread-3成功获取到锁!
线程Thread-3开始处理业务逻辑...
线程Thread-1尝试获取锁失败,锁被线程Thread-3占用!
线程Thread-0尝试获取锁...
线程Thread-0尝试获取锁失败,锁被线程Thread-3占用!
线程Thread-2尝试获取锁...
线程Thread-2尝试获取锁失败,锁被线程Thread-3占用!
线程Thread-3业务逻辑处理完毕!
线程Thread-3开始释放锁...
线程Thread-3成功释放锁!
线程Thread-0尝试获取锁...
线程Thread-0成功获取到锁!
线程Thread-0开始处理业务逻辑...
线程Thread-1尝试获取锁...
线程Thread-1尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-2尝试获取锁...
线程Thread-2尝试获取锁失败,锁被线程Thread-0占用!
线程Thread-0开始释放锁...
开始删除根目录节点/tmp_root_path...
线程Thread-0成功释放锁!
根目录节点/tmp_root_path删除成功!
释放ZooKeeper连接成功!
        示例可能略显粗糙,但是大体原理就是这样。我们可以利用ZooKeeper上一个固定位置的节点有无,来判断锁是否被获取到。当某一线程来临时,如果节点不存在,则说明没有其他线程占用对应锁,调用ZooKeeper的create()方法创建节点,标识分布式锁已被当前线程占用。待业务处理完毕后,再调用ZooKeeper的delete()方法删除节点,则完成锁的释放。

        同时,我们可以对应节点写入线程名等区分线程唯一性的字段来标识锁被哪个线程占用。而当再有线程要获取锁时,如果为不可重入锁,无论哪个线程,即便是持有锁的线程本身,也得等锁释放后再获取锁,如果为可重入锁,则判断当前线程唯一性字段与对应节点中的数据是否一致即可。

        当然,也可以利用ZooKeeper的CreateMode中PERSISTENT_SEQUENTIAL,一种顺序自动编号的目录节点,检测某节点的子目录,来实现可以排队的优先级锁!这个留待以后我们再研究!


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
30天前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
40 2
|
30天前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
42 1
|
2月前
分布式-Zookeeper-数据订阅
分布式-Zookeeper-数据订阅
|
2月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议
|
30天前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
43 0
|
2月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
2月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
22天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?

热门文章

最新文章