分布式锁作用
分布式锁,主要用于分布式环境下多线程环境下保证线程安全,达到保证数据最终一致性
ZK排他锁
排他锁也叫写锁或者独占锁,在加锁期间只有持锁线程能访问,其他线程均需等待锁的释放;ZK通过数据节点来表示一个锁,在获取锁的时候,通过创建临时节点(例如/lock)来获取锁,如果创建成功则该线程获取到锁,如果创建不成功那么该线程会在/lock节点下注册一个新的节点顺序节点(依次递增),以确保锁的公平性,同时该节点会监听前一个节点,如果收到前一个节点的删除通知,判断自己是否为序号最小,如果是则获取锁成功,否则还是等待状态。(为什么不直接监听/lock节点,而是在/lock节点下创建顺序节点?是因为避免羊群效应,避免/lock节点删除之后其它节点都会收到监听信息)
实现:
可用Curator的InterProcessMutex分布式可重入排他锁实现
代码:
publicclassZkInterProcessMutex { privatestaticintcount=0; publicstaticvoidmain(String[] args) { StringznodeLock="/lock"; CuratorFrameworkzkClient=getZkClient(); InterProcessMutexlock=newInterProcessMutex(zkClient, znodeLock); for (inti=0; i<5; i++) { finalintj=i; newThread(newRunnable() { publicvoidrun() { try { lock.acquire(); System.out.println("当前线程: "+Thread.currentThread().getName() +", count: "+count++); Thread.sleep(1000); } catch (Exceptione) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exceptione) { e.printStackTrace(); } } } }, "Thread: "+i).start(); } } privatestaticCuratorFrameworkgetZkClient() { // zk ipStringzkServerAddress="192.168.0.138:2181"; // 重试策略RetryPolicyretryPolicy=newExponentialBackoffRetry(1000, 5, 5000); // 创建zk链接CuratorFrameworkzkClient=CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); returnzkClient; } }
ZK共享锁
共享锁也叫读锁,允许资源被同时读操作,但是不允许同时进行读写操作或者同时进行写操作。ZK会在/lock节点下面创建读、写顺序节点 ,读类型节点 “XXXXX-READ_0000000620”,写类型节点 “XXXXX-WRIT_0000000621”,对于读请求如果该节点是最小节点或者当前节点是读节点并且前面的最小节点也都是读节点,则可以获取到锁,如果比自己小的节点中有写请求,则需要等待锁;对于写请求,如果是最小节点则获取到锁,如果不是最小节点则需要等待锁。
实现:
可用Curator的InterProcessReadWriteLock分布式可重入排他锁实现
代码:
publicclassZkInterProcessReadWriteLock { privatestaticintcount=0; publicstaticvoidmain(String[] args) { StringznodeLock="/lock"; CuratorFrameworkzkClient=getZkClient(); InterProcessReadWriteLocklock=newInterProcessReadWriteLock(zkClient, znodeLock); for (inti=0; i<5; i++) { finalintj=i; newThread(newRunnable() { publicvoidrun() { try { if (j%2==0) { lock.writeLock().acquire(); System.out.println("当前线程: "+Thread.currentThread().getName() +", count: "+count++); } else { lock.readLock().acquire(); System.out.println("当前线程: "+Thread.currentThread().getName() +", count: "+count++); } Thread.sleep(1000); } catch (Exceptione) { e.printStackTrace(); } finally { try { if (j%2==0) { lock.writeLock().release(); } else { lock.readLock().release(); } } catch (Exceptione) { e.printStackTrace(); } } } }, "Thread: "+i).start(); } } privatestaticCuratorFrameworkgetZkClient() { // zk ipStringzkServerAddress="192.168.0.138:2181"; // 重试策略RetryPolicyretryPolicy=newExponentialBackoffRetry(1000, 5, 5000); // 创建zk链接CuratorFrameworkzkClient=CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); returnzkClient; } }