分布式锁的由来
在我们的日常开发中,一个进程中当多线程的去竞争某一资源的时候,我们通常会用一把锁来保证只有一个线程获取到资源。如加上synchronize关键字或ReentrantLock锁等操作。
那么,如果是多个进程相互竞争一个资源,如何保证资源只会被一个操作者持有呢?
server1、server2、server3 这三个服务都要修改amount这个数据,每个服务更新的值不同,为了保证数据的正确性,三个服务都向lock server服务申请修改权限,最终server2拿到了修改权限,即server2将amount更新为2,其他服务由于没有获取到修改权限则返回更新失败。
特征
- 互斥性 任意时刻,只有一个客户端能持有锁。
- 锁超时释放 持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁。
- 可重入性 一个线程如果获取了锁之后,可以再次对其请求加锁。
- 高性能、高可用 加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。
- 安全性 锁只能被持有的客户端删除,不能被其他客户端删除
核心实现
单实例
我们这里直接使用Lua脚本来保证原子性,其中
NX :表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,而其他客户端请求只能等其释放锁,才能获取。
EX seconds :设定key的过期时间,时间单位是秒。
PX milliseconds: 设定key的过期时间,单位为毫秒
加锁
- 使用 Setnx 命令保证互斥性。
- 需要设置锁的过期时间,避免死锁。
- Setnx 和设置过期时间需要保持原子性,避免在设置 Setnx 成功之后在设置过期时间客户端崩溃导致死锁。
- 加锁的 Value 值为一个唯一标识。由于分布式任务随机数都不可靠,可以(MAC地址+JVM进程ID) 作为唯一标识。加锁成功后需要把唯一标识返回给 Client 来用进行解锁操作。
脚本如下:
redis.call('set',key,value,'NX','PX',30000)
解锁
- 需要拿加锁成功的唯一标识进行解锁,从而保证加锁和解锁的是同一个客户端。
- 解锁操作需要比较唯一标识是否相等,相等再执行删除操作。
- 这 2 个操作可以采用 Lua 脚本的方式,保持原子性。脚本如下:
redis.call('get',key == value then return tostring(redis.call('del', key)==1) else return 'false' end
缺点
该方式最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:
例如,在Redis的master节点上拿到了锁,但是这个加锁的key还没有同步到slave节点,此时master故障,发生故障转移,slave节点升级为master节点,导致锁丢失。
Redlock(红锁)
浅谈redlock
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。
核心实现
- 获取当前时间,以毫秒为单位。
- 按顺序向5个master节点请求加锁。客户端设置网络连接和响应超时时间,并且超时时间要小于锁的失效时间。(假设锁自动失效时间为10秒,则超时时间一般在5-50毫秒之间,我们就假设超时时间是50ms吧)。如果超时,跳过该master节点,尽快去尝试下一个master节点。
- 客户端使用当前时间减去开始获取锁时间(即步骤1记录的时间),得到获取锁使用的时间。当且仅当超过一半(N/2+1,这里是5/2+1=3个节点)的Redis master节点都获得锁,并且使用的时间小于锁失效时间时,锁才算获取成功。(如上图,10s> 30ms+40ms+50ms+40ms+50ms)
- 如果取到了锁,key的真正有效时间就变啦,需要减去获取锁所使用的时间。
- 如果获取锁失败(没有在至少N/2+1个master实例上取到锁,又或者获取锁时间已经超过了有效时间),则客户端必须在所有的master节点上解锁(即便有些master节点根本就没有加锁成功,也需要解锁,以防止有些漏网之鱼)。
源码剖析(redisson)
用法
首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 还可以getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 无论如何, 最后都要解锁 redLock.unlock(); }
唯一ID
实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
获取锁
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 获取锁时向5个redis实例发送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 获取分布式锁的KEY的失效时间毫秒数 "return redis.call('pttl', KEYS[1]);", // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
释放锁
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture unlockInnerAsync(long threadId) { // 向5个redis实例都执行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式锁KEY不存在,那么向channel发布一条消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }