3.4 zookeeper 瞬时znode节点 + watcher监听机制
临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:
- 多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;
- 其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;
- 下一个序号的线程得到通知,继续执行;
- 以此类推,创建节点的时候,就确认了线程执行的顺序。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:
当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:
/** * 自己本身就是一个 watcher,可以得到通知 * AutoCloseable 实现自动关闭,资源不使用的时候 */ @Slf4j public class ZkLock implements AutoCloseable, Watcher { private ZooKeeper zooKeeper; /** * 记录当前锁的名字 */ private String znode; public ZkLock() throws IOException { this.zooKeeper = new ZooKeeper("localhost:2181", 10000,this); } public boolean getLock(String businessCode) { try { //创建业务 根节点 Stat stat = zooKeeper.exists("/" + businessCode, false); if (stat==null){ zooKeeper.create("/" + businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建瞬时有序节点 /order/order_00000001 znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取业务节点下 所有的子节点 List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false); //获取序号最小的(第一个)子节点 Collections.sort(childrenNodes); String firstNode = childrenNodes.get(0); //如果创建的节点是第一个子节点,则获得锁 if (znode.endsWith(firstNode)){ return true; } //如果不是第一个子节点,则监听前一个节点 String lastNode = firstNode; for (String node:childrenNodes){ if (znode.endsWith(node)){ zooKeeper.exists("/"+businessCode+"/"+lastNode,true); break; }else { lastNode = node; } } synchronized (this){ wait(); } return true; } catch (Exception e) { e.printStackTrace(); } return false; } @Override public void close() throws Exception { zooKeeper.delete(znode,-1); zooKeeper.close(); log.info("我已经释放了锁!"); } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } } }
3.5 zookeeper curator
在实际的开发中,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入锁。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> <exclusions> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency>
@Bean(initMethod="start",destroyMethod = "close") public CuratorFramework getCuratorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory. newClient("localhost:2181", retryPolicy); return client; }
框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。
官网中分布式锁的实现是在curator-recipes依赖中,不要引用错了。
@Autowired private CuratorFramework client; @Test public void testCuratorLock(){ InterProcessMutex lock = new InterProcessMutex(client, "/order"); try { if ( lock.acquire(30, TimeUnit.SECONDS) ) { try { log.info("我获得了锁!!!"); } finally { lock.release(); } } } catch (Exception e) { e.printStackTrace(); } client.close(); }
3.6 Redission
重新实现了Java并发包下处理并发的类,让其可以跨JVM使用,例如CHM等。
✪ 3.6.1 非SpringBoot项目引入
引入Redisson的依赖,然后配置对应的XML即可:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.2</version> <exclusions> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency>
编写相应的redisson.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:redisson="http://redisson.org/schema/redisson" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://redisson.org/schema/redisson http://redisson.org/schema/redisson/redisson.xsd "> <redisson:client> <redisson:single-server address="redis://127.0.0.1:6379"/> </redisson:client> </beans>
配置对应@ImportResource("classpath*:redisson.xml")资源文件。
✪ 3.6.2 SpringBoot项目引入
或者直接使用springBoot的starter即可。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.19.1</version> </dependency>
修改application.properties即可:#spring.redis.host=
✪ 3.6.3 设置配置类
@Bean public RedissonClient getRedissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); return Redisson.create(config); }
✪ 3.6.4 使用
@Test public void testRedissonLock() { RLock rLock = redisson.getLock("order"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("我获得了锁!!!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("我释放了锁!!"); rLock.unlock(); } }