大多数互联网系统都是分布式部署的,分布式部署确实能带来性能和效率上的提升,但为此,我们就需要多解决一个分布式环境下,数据一致性的问题 在单机环境中,我们可以通过java提供的并发API(lock、syn等)来解决,而在分布式环境下要复杂的多(跨JVM),常见的方案是分布式事物、分布式锁等。
Zookeeper实现分布式锁
业务场景和产生的问题
在分布式环境下,生产全局订单号,由于多个客户端不能实现同步,在分布式场景下使用时间戳生产订单号可能会重复 解决方案:
- 使用分布式锁
- 提前生产好订单号,存放到redis,然后从redis中取
Zookeeper基于同名节点的分布式锁
Zookeeper的强一致性能够很好地保证在分布式高并发情况下节点的创建能够保证全局唯一性,利用Zookeeper的这个特性,实现排它锁。
- 定义锁:Zookeeper上的节点表示一个锁
- 获取锁:利用zkClient客户端调用create方法创建一个临时节点(锁),创建成功的客户端获得了锁
- 监控锁:同时在该节点上注册Watcher监听,实时监听该节点的变更情况
- 释放锁:当前获得锁的客户端宕机或者异常,Zookeeper上这个临时节点就会被删除;客户端主动删除该临时节点
具体实现:
- 创建一个Lock接口,这个并不是JDK提供的,只是模拟了该接口里的方法自定义实现,按需使用。
package com.ooliuyue.zookeeperlock.zk; public interface Lock { public void getLock(); public void unLock(); } 复制代码
- 创建抽象类AbstratcLock实现Lock接口(模板)
package com.ooliuyue.zookeeperlock.zk; /** * @Auther: ly * @Date: 2019/4/25 11:31 */ public abstract class AbstracLock implements Lock { public void getLock() { //尝试获得锁资源 if (tryLock()) { System.out.println("##获取Lock锁的资源##"); } else { //等待(监控) waitLock(); //重新获取锁 getLock(); } } public abstract boolean tryLock(); public abstract void waitLock(); } 复制代码
- 创建类ZookeeperAbstractLock(重复代码写入子类), 用于配置Zookeeper
package com.ooliuyue.zookeeperlock.zk; import org.I0Itec.zkclient.ZkClient; /** * @Auther: ly * @Date: 2019/4/25 11:41 */ //将重复代码写入子类中 public abstract class ZookeeperAbstractLock extends AbstracLock { //zk连接地址 private static final String CONNECTSTRING = "127.0.0.1:2181"; //创建zk连接 protected ZkClient zkClient = new ZkClient(CONNECTSTRING); protected static final String PATH = "/lock"; protected static final String PATH2 = "/lock2"; } 复制代码
- zookeeper实现分布式锁的业务逻辑
package com.ooliuyue.zookeeperlock.zk; import org.I0Itec.zkclient.IZkDataListener; import java.util.concurrent.CountDownLatch; /** * @Auther: ly * @Date: 2019/4/25 11:45 */ public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; @Override //尝试获得锁 public boolean tryLock() { try { zkClient.createEphemeral(PATH); System.out.println("当前线程获取锁" + PATH + "成功"); return true; } catch (Exception e) { //如果创建失败抛出异常 // e.printStackTrace(); return false; } } @Override public void waitLock() { /*当前节点数据内容或版本发生变化或者当前节点被删除,触发当前接口 */ IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String path, Object o) throws Exception { } @Override /* 删除当前节点,触发该接口通知。此时,path即当前节点路径 */ public void handleDataDeleted(String path) throws Exception { //唤醒被等待的线程 if (countDownLatch != null ) { countDownLatch.countDown(); } } }; //监听节点PATH的变化 zkClient.subscribeDataChanges(PATH,iZkDataListener); //如果节点存在 if (zkClient.exists(PATH)) { countDownLatch = new CountDownLatch(1); try { //等待,一直到监听器发起删除节点的通知,表示锁释放 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //删除监听 zkClient.unsubscribeDataChanges(PATH,iZkDataListener); } public void unLock() { //释放锁 if (zkClient != null) { zkClient.delete(PATH); zkClient.close(); System.out.println("释放锁资源。。"); } } } 复制代码
- 创建测试类OrderService,模拟10个线程并发生产订单
package com.ooliuyue.zookeeperlock.zk; import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; /** * @Auther: ly * @Date: 2019/4/25 14:39 */ public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZookeeperDistrbuteLock(); @Override public void run() { getNum(); } public void getNum() { try { lock.getLock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生产订单ID:" + number); } catch (Exception e) { // e.printStackTrace(); } finally { lock.unLock(); } } public static void main(String[] args) { System.out.println("##生产唯一订单号##"); for (int i = 0; i < 10 ; i++) { new Thread(new OrderService()).start(); } } } 复制代码
- 生成订单的类
package com.ooliuyue.zookeeperlock.zk; import java.text.SimpleDateFormat; import java.util.Date; /** * @Auther: ly * @Date: 2019/4/25 14:34 */ public class OrderNumGenerator { private static int count = 0; public String getNumber() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); return simpleDateFormat.format(new Date()) + "-" + ++count; } } 复制代码
运行main函数,控制台输出结果
##生产唯一订单号## 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-3,生产订单ID:2019-04-28-11-42-53-1 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-7,生产订单ID:2019-04-28-11-42-53-2 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-9,生产订单ID:2019-04-28-11-42-53-3 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-11,生产订单ID:2019-04-28-11-42-53-4 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-1,生产订单ID:2019-04-28-11-42-53-5 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-15,生产订单ID:2019-04-28-11-42-53-6 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-17,生产订单ID:2019-04-28-11-42-53-7 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-19,生产订单ID:2019-04-28-11-42-53-8 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-13,生产订单ID:2019-04-28-11-42-53-9 释放锁资源。。 当前线程获取锁/lock成功 ##获取Lock锁的资源## Thread-5,生产订单ID:2019-04-28-11-42-53-10 释放锁资源。。 复制代码
思路就是通过监听机制,当节点被删除时,表示锁释放,其他线程去获取锁。 这个方式是有问题的,当锁释放时,会有很多线程同时去获取锁,羊群效应。性能会比较差,不过实现起来简单。
Zookeeper基于临时顺序节点实现分布式锁
具体实现
- 多个进程访问共享资源时,为它们在一个父节点下分别创建临时有序节点
- 判断创建的节点是否是所有节点中序号最小的
- 如果是序号最小的节点,对应的进程获得锁。否则通过watcher机制监听比自己小的那个节点(这里需要进行阻塞),当收到删除的通知,然后去获取锁
- 释放锁的时候删除当前节点
代码如下:
package com.ooliuyue.zookeeperlock.zk; import org.I0Itec.zkclient.IZkDataListener; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: ly * @Date: 2019/4/26 10:55 */ public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; private String beforePath; //当前线程的节点前一个节点 private String currentPath; //当前线程的节点 public ZookeeperDistrbuteLock2(){ if (!this.zkClient.exists(PATH2)) { this.zkClient.createPersistent(PATH2); } } @Override public boolean tryLock() { //如果currentPath为空则为当前线程尝试加锁 if (currentPath == null || currentPath.length() <= 0) { //创建一个临时顺序节点 currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock"); System.out.println("当前线程的锁为===" + currentPath); } //获取所有临时节点并排序,临时节点名称为自增长字符串如:/lock2/0000000004 List<String> children = this.zkClient.getChildren(PATH2); Collections.sort(children); //如果当前线程节点在所有节点中排名第一则获取锁成功 if (currentPath.equals(PATH2 + '/' + children.get(0))) { System.out.println(currentPath + "===节点获取锁成功"); return true; } else { //如果当前线程节点在所有节点中不是排第一,则获取前面的节点名称,并赋值给beforePath int i = Collections.binarySearch(children, currentPath.substring(7)); beforePath = PATH2 + '/' + children.get(i - 1); } return false; } @Override public void waitLock() { IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { if (countDownLatch != null) { countDownLatch.countDown(); } } }; //给排在前面的节点增加数据删除的watcher,(启动另外一个线程去监听它) System.out.println(currentPath + "===监控beforPath===" + beforePath); this.zkClient.subscribeDataChanges(beforePath,iZkDataListener); if (this.zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { //监听线程执行完毕后删除监听 countDownLatch.await(); } catch (InterruptedException e) { // e.printStackTrace(); } } this.zkClient.unsubscribeDataChanges(beforePath,iZkDataListener); } @Override public void unLock() { System.out.println(currentPath + "===释放锁资源..."); zkClient.delete(currentPath); zkClient.close(); } } 复制代码
运行main函数进行测试
package com.ooliuyue.zookeeperlock.zk; import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; /** * @Auther: ly * @Date: 2019/4/25 14:39 */ public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZookeeperDistrbuteLock2(); @Override public void run() { getNum(); } public void getNum() { try { lock.getLock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生产订单ID:" + number); } catch (Exception e) { // e.printStackTrace(); } finally { lock.unLock(); } } public static void main(String[] args) { System.out.println("##生产唯一订单号##"); for (int i = 0; i < 10 ; i++) { new Thread(new OrderService()).start(); } } } 复制代码
控制台输出结果:
##生产唯一订单号## 当前线程的锁为===/lock2/0000000083 当前线程的锁为===/lock2/0000000084 /lock2/0000000083===节点获取锁成功 Thread-1,生产订单ID:2019-04-26-14-12-37-1 /lock2/0000000083===释放锁资源... 当前线程的锁为===/lock2/0000000085 当前线程的锁为===/lock2/0000000086 当前线程的锁为===/lock2/0000000087 /lock2/0000000086===监控beforPath===/lock2/0000000085 /lock2/0000000085===监控beforPath===/lock2/0000000084 /lock2/0000000084===监控beforPath===/lock2/0000000083 /lock2/0000000087===监控beforPath===/lock2/0000000086 当前线程的锁为===/lock2/0000000088 /lock2/0000000084===节点获取锁成功 Thread-3,生产订单ID:2019-04-26-14-12-37-2 /lock2/0000000084===释放锁资源... 当前线程的锁为===/lock2/0000000089 /lock2/0000000089===监控beforPath===/lock2/0000000088 当前线程的锁为===/lock2/0000000090 /lock2/0000000090===监控beforPath===/lock2/0000000089 当前线程的锁为===/lock2/0000000091 /lock2/0000000091===监控beforPath===/lock2/0000000090 /lock2/0000000085===节点获取锁成功 Thread-5,生产订单ID:2019-04-26-14-12-37-3 /lock2/0000000085===释放锁资源... 当前线程的锁为===/lock2/0000000092 /lock2/0000000086===节点获取锁成功 Thread-7,生产订单ID:2019-04-26-14-12-37-4 /lock2/0000000086===释放锁资源... /lock2/0000000087===节点获取锁成功 Thread-9,生产订单ID:2019-04-26-14-12-37-5 /lock2/0000000087===释放锁资源... /lock2/0000000088===节点获取锁成功 Thread-11,生产订单ID:2019-04-26-14-12-37-6 /lock2/0000000088===释放锁资源... /lock2/0000000089===节点获取锁成功 Thread-13,生产订单ID:2019-04-26-14-12-37-7 /lock2/0000000089===释放锁资源... /lock2/0000000090===节点获取锁成功 Thread-15,生产订单ID:2019-04-26-14-12-37-8 /lock2/0000000090===释放锁资源... /lock2/0000000091===节点获取锁成功 Thread-17,生产订单ID:2019-04-26-14-12-37-9 /lock2/0000000091===释放锁资源... /lock2/0000000092===节点获取锁成功 Thread-19,生产订单ID:2019-04-26-14-12-37-10 /lock2/0000000092===释放锁资源... 复制代码