前言
上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。
为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。
设计
我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。
DistributedFairLock
public class DistributedFairLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);
//ZooKeeper客户端,进行ZooKeeper操作
private ZooKeeper zooKeeper;
//根节点名称
private String dir;
//加锁节点
private String node;
//ZooKeeper鉴权信息
private List<ACL> acls;
//要加锁节点
private String fullPath;
//加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
private volatile int state;
//当前锁创建的节点id
private String id;
//通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
private CountDownLatch countDownLatch;
/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(this.node);
init();
}
private void init() {
try {
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
}
}
}
lock
public void lock() {
try {
//加锁
synchronized (this) {
//如果当前未持有锁
if (state <= 0) {
//创建节点
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
//获取当前路径下所有的节点
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
//获取所有id小于当前节点顺序的节点
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
//监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
Thread.currentThread().interrupt();
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
Thread.currentThread().interrupt();
}
}
}
tryLock
public boolean tryLock() {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
return false;
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await(time, unit);
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
unlock
public void unlock() {
synchronized (this) {
if (state > 0) {
state--;
}
//当不再持有锁时,删除创建的临时节点
if (state == 0 && zooKeeper != null) {
try {
zooKeeper.delete(id, -1);
id = null;
} catch (Exception e) {
logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
}
}
}
}
LockWatcher
private class LockWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
synchronized (this) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
}
总结
上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。
如果对以上方式有更好的优化方案,欢迎一起讨论。
更多文章
见我的博客:https://nc2era.com
written by AloofJr,转载请注明出处