## Redis
### 锁的问题
#### 非原子操作
`加锁操作`和后面的`设置超时时间`是分开的,并`非原子操作`。解决方案:
**方案一:set命令**
```java
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
```
在redis中还有`set`命令是原子操作,加锁和设置超时时间,一个命令就能轻松搞定。其中:
- `lockKey`:锁的标识
- `requestId`:请求id
- `NX`:只在键不存在时,才对键进行设置操作
- `PX`:设置键的过期时间为 millisecond 毫秒
- `expireTime`:过期时
**方案二:LUA脚本**
```lua
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
return redis.call('pttl', KEYS[1]);
```
#### 忘了释放锁
加锁之后,每次都要达到了超时时间才释放锁,不会有点不合理。如果不及时释放锁,会有很多问题。合理流程如下:
![Redis释放锁流程](Redis释放锁流程.jpg)
释放锁的伪代码如下:
```java
try{
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
} finally {
unlock(lockKey);
}
```
#### 释放了别人的锁
自己只能释放自己加的锁,不允许释放别人加的锁。
**方案一:requestId方案**
伪代码如下:
```java
if (jedis.get(lockKey).equals(requestId)) {
jedis.del(lockKey);
return true;
}
return false;
```
**方案二:LUA脚本方案**
```lua
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
```
#### 大量失败请求
在秒杀场景下,会有什么问题?每1万个同时请求,有1个成功。再1万个同时请求,有1个成功。如此下去,直到库存不足。这就变成均匀分布的秒杀了,跟我们想象中的不一样(应该是谁先来谁得到)。
**解决方案:自旋锁**
在规定的时间,比如500毫秒内,自旋不断尝试加锁(说白了,就是在死循环中,不断尝试加锁),如果成功则直接返回。如果失败,则休眠50毫秒,再发起新一轮的尝试。如果到了超时时间,还未加锁成功,则直接返回失败。
```java
try {
Long start = System.currentTimeMillis();
while(true) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
// 创建订单
createOrder();
return true;
}
long time = System.currentTimeMillis() - start;
if (time>=timeout) {
return false;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally{
unlock(lockKey,requestId);
}
return false;
```
#### 锁重入问题
假设需要获取一颗满足条件的菜单树。需要在接口中从根节点开始,递归遍历出所有满足条件的子节点,然后组装成一颗菜单树。在后台系统中运营同学可以动态添加、修改和删除菜单。为了保证在并发的情况下,每次都可能获取最新的数据,这里可以加redis分布式锁。在递归方法中递归遍历多次,每次都是加的同一把锁。递归第一层当然是可以加锁成功的,但递归第二层、第三层...第N层,不就会加锁失败了?
递归方法中加锁的伪代码(会出现异常)如下:
```java
private int expireTime = 1000;
public void fun(int level,String lockKey,String requestId){
try{
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
if(level<=10){
this.fun(++level,lockKey,requestId);
} else {
return;
}
}
return;
} finally {
unlock(lockKey,requestId);
}
}
```
**基于Redisson实现可重入锁**
伪代码如下:
```java
private int expireTime = 1000;
public void run(String lockKey) {
RLock lock = redisson.getLock(lockKey);
this.fun(lock,1);
}
public void fun(RLock lock,int level){
try{
lock.lock(5, TimeUnit.SECONDS);
if(level<=10){
this.fun(lock,++level);
} else {
return;
}
} finally {
lock.unlock();
}
}
```
#### 锁竞争问题
如果有大量需要写入数据的业务场景,使用普通的redis分布式锁是没有问题的。但如果有些业务场景,写入的操作比较少,反而有大量读取的操作。这样直接使用普通的redis分布式锁,会不会有点浪费性能?
**读写锁**
读写锁的特点:
- **读与读是共享的,不互斥**
- **读与写互斥**
- **写与写互斥**
我们以redisson框架为例,它内部已经实现了读写锁的功能。读锁的伪代码如下:
```java
RReadWriteLock readWriteLock = redisson.getReadWriteLock("readWriteLock");
RLock rLock = readWriteLock.readLock();
try {
rLock.lock();
//业务操作
} catch (Exception e) {
log.error(e);
} finally {
rLock.unlock();
}
```
写锁的伪代码如下:
```java
RReadWriteLock readWriteLock = redisson.getReadWriteLock("readWriteLock");
RLock rLock = readWriteLock.writeLock();
try {
rLock.lock();
//业务操作
} catch (InterruptedException e) {
log.error(e);
} finally {
rLock.unlock();
}
```
将读锁和写锁分开,最大的好处是提升读操作的性能,因为读和读之间是共享的,不存在互斥性。而我们的实际业务场景中,绝大多数数据操作都是读操作。所以,如果提升了读操作的性能,也就会提升整个锁的性能。
**锁分段**
此外,为了减小锁的粒度,比较常见的做法是将大锁:`分段`。
比如在秒杀扣库存的场景中,现在的库存中有2000个商品,用户可以秒杀。为了防止出现超卖的情况,通常情况下,可以对库存加锁。如果有1W的用户竞争同一把锁,显然系统吞吐量会非常低。
为了提升系统性能,我们可以将库存分段,比如:分为100段,这样每段就有20个商品可以参与秒杀。
在秒杀的过程中,先把用户id获取hash值,然后除以100取模。模为1的用户访问第1段库存,模为2的用户访问第2段库存,模为3的用户访问第3段库存,后面以此类推,到最后模为100的用户访问第100段库存。
**注意**:将锁分段虽说可以提升系统的性能,但它也会让系统的复杂度提升不少。因为它需要引入额外的路由算法,跨段统计等功能。我们在实际业务场景中,需要综合考虑,不是说一定要将锁分段。
#### 锁超时问题
如果线程A加锁成功了,但是由于业务功能耗时时间很长,超过了设置的超时时间,这时候redis会自动释放线程A加的锁。
**解决方案:自动续期**
自动续期的功能是获取锁之后开启一个定时任务,每隔10秒判断一下锁是否存在,如果存在,则刷新过期时间。如果续期3次,也就是30秒之后,业务方法还是没有执行完,就不再续期了。
我们可以使用`TimerTask`类,来实现自动续期的功能:
```java
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//自动续期逻辑
}
}, 10000, TimeUnit.MILLISECONDS);
```
获取锁之后,自动开启一个定时任务,每隔10秒钟,自动刷新一次过期时间。这种机制在redisson框架中,有个比较霸气的名字:`watch dog`,即传说中的`看门狗`。当然自动续期功能,我们还是优先推荐使用lua脚本实现,比如:
```lua
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
```
**需要**:在实现自动续期功能时,还需要设置一个总的过期时间,可以跟redisson保持一致,设置成30秒。如果业务代码到了这个总的过期时间,还没有执行完,就不再自动续期了。
#### 主从复制的问题
如果redis存在多个实例。比如:做了主从或使用了哨兵模式,基于redis的分布式锁的功能,就会出现问题。
比如锁A刚加锁成功master就挂了,还没来得及同步到slave上。这样会导致新master节点中的锁A丢失了。后面,如果有新的线程,使用锁A加锁,依然可以成功,分布式锁失效了。
**解决方案:RedissonRedLock**
RedissonRedLock解决问题的思路如下:
1. 需要搭建几套相互独立的redis环境,假如我们在这里搭建了5套
2. 每套环境都有一个redisson node节点
3. 多个redisson node节点组成了RedissonRedLock
4. 环境包含:单机、主从、哨兵和集群模式,可以是一种或者多种混合
在这里我们以主从为例,架构图如下:
RedissonRedLock加锁过程如下:
1. 获取所有的redisson node节点信息,循环向所有的redisson node节点加锁,假设节点数为N,例子中N等于5
2. 如果在N个节点当中,有N/2 + 1个节点加锁成功了,那么整个RedissonRedLock加锁是成功的
3. 如果在N个节点当中,小于N/2 + 1个节点加锁成功,那么整个RedissonRedLock加锁是失败的
4. 如果中途发现各个节点加锁的总耗时,大于等于设置的最大等待时间,则直接返回失败
从上面可以看出,使用Redlock算法,确实能解决多实例场景中,假如master节点挂了,导致分布式锁失效的问题。但也引出了一些新问题,比如:
- 需要额外搭建多套环境,申请更多的资源,需要评估一下成本和性价比
- 如果有N个redisson node节点,需要加锁N次,最少也需要加锁N/2+1次,才知道redlock加锁是否成功。显然,增加了额外的时间成本,有点得不偿失
**场景选择**
在实际业务场景,尤其是高并发业务中,RedissonRedLock其实使用的并不多。在分布式环境中,CAP是绕不过去的:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。
- 如果你的实际业务场景,更需要的是**保证数据一致性**,那么请使用CP类型的分布式锁
比如:zookeeper,它是基于磁盘的,性能可能没那么好,但数据一般不会丢
- 如果你的实际业务场景,更需要的是**保证数据高可用性**。那么请使用AP类型的分布式锁
比如:redis,它是基于内存的,性能比较好,但有丢失数据的风险
其实,在绝大多数分布式业务场景中,使用redis分布式锁就够了,真的别太较真。因为数据不一致问题,可以通过最终一致性方案解决。但如果系统不可用了,对用户来说是暴击一万点伤害。
### LUA+SETNX+EXPIRE
先用`setnx`来抢锁,如果抢到之后,再用`expire`给锁设置一个过期时间,防止锁忘记了释放。
- **setnx(key, value)**
`setnx` 的含义就是 `SET if Not Exists`,该方法是原子的。如果 `key` 不存在,则设置当前 `key` 为 `value` 成功,返回 `1`;如果当前 `key` 已经存在,则设置当前 `key` 失败,返回 `0`。
- **expire(key, seconds)**
`expire` 设置过期时间,要注意的是 `setnx` 命令不能设置 `key` 的超时时间,只能通过 `expire()` 来对 `key` 设置。
**使用Lua脚本(SETNX+EXPIRE)**
可以使用Lua脚本来保证原子性(包含setnx和expire两条指令),加解锁代码如下:
```java
/**
* 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
*/
public boolean lock(Jedis jedis, String key, String uniqueId, int seconds) {
String luaScript = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
Object result = jedis.eval(luaScript, Collections.singletonList(key),
Arrays.asList(uniqueId, String.valueOf(seconds)));
return result.equals(1L);
}
/**
* 使用Lua脚本进行解锁操纵,解锁的时候验证value值
*/
public boolean unlock(Jedis jedis, String key, String value) {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
}
```
**STW**
如果在写文件过程中,发生了 FullGC,并且其时间跨度较长, 超过了锁超时的时间, 那么分布式就自动释放了。在此过程中,client2 抢到锁,写了文件。client1 的FullGC完成后,也继续写文件,**注意,此时 client1 的并没有占用锁**,此时写入会导致文件数据错乱,发生线程安全问题。这就是STW导致的锁过期问题。STW导致的锁过期问题,如下图所示:
![STW导致的锁过期问题](STW导致的锁过期问题.png)
STW导致的锁过期问题,大概的解决方案有:
- **方案一: 模拟CAS乐观锁的方式,增加版本号(如下图中的token)**
此方案如果要实现,需要调整业务逻辑,与之配合,所以会入侵代码。
- **方案二:watch dog自动延期机制**
客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,**它是一个后台线程,会每隔10秒检查一下**,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。Redission采用的就是这种方案, 此方案不会入侵业务代码。
### SET-NX-EX
**方案**:`SET key value [EX seconds] [PX milliseconds] [NX|XX]`
- `EX second` :设置键的过期时间为 `second` 秒。 `SET key value EX second` 效果等同于 `SETEX key second value`
- `PX millisecond` :设置键的过期时间为 `millisecond` 毫秒。 `SET key value PX millisecond` 效果等同于 `PSETEX key millisecond value`
- `NX` :只在键不存在时,才对键进行设置操作。 `SET key value NX` 效果等同于 `SETNX key value`
- `XX` :只在键已经存在时,才对键进行设置操作
客户端执行以上的命令:
- 如果服务器返回 `OK` ,那么这个客户端获得锁
- 如果服务器返回 `NIL` ,那么客户端获取锁失败,可以在稍后再重试
**① 加锁**:使用redis命令 set key value NX EX max-lock-time 实现加锁
```java
Jedis jedis = new Jedis("127.0.0.1", 6379);
private static final String SUCCESS = "OK";
/**
* 加锁操作
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
*/
public Boolean lock(String key,String value,Long timeOut){
String var1 = jedis.set(key,value,"NX","EX",timeOut);
if(LOCK_SUCCESS.equals(var1)){
return true;
}
return false;
}
```
- 加锁操作 `jedis.set(key,value,"NX","EX",timeout)`【保证加锁的原子操作】
- `key`是`redis`的`key`值作为锁的标识,`value`在作为客户端的标识,只有`key-value`都比配才有删除锁的权利【保证安全性】
- 通过`timeout`设置过期时间保证不会出现死锁【避免死锁】
- `NX`:只有这个`key`不存才的时候才会进行操作,`if not exists`
- `EX`:设置`key`的过期时间为秒,具体时间由第`5`个参数决定,过期时间设置的合理有效期需要根据业务具体决定,总的原则是任务执行`time*3`
**② 解锁**:使用redis命令 EVAL 实现解锁
```java
Jedis jedis = new Jedis("127.0.0.1", 6379);
private static final String SUCCESS = "OK";
/**
* 加锁操作
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
*/
public Boolean lock(String key,String value,Long timeOut){
String var1 = jedis.set(key,value,"NX","EX",timeOut);
if(LOCK_SUCCESS.equals(var1)){
return true;
}
return false;
}
```
- luaScript 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】
- jedis.eval(String,list,list);这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】
**③ 重试**
如果在业务中去拿锁如果没有拿到是应该阻塞着一直等待还是直接返回,这个问题其实可以写一个重试机制,根据重试次数和重试时间做一个循环去拿锁,当然这个重试的次数和时间设多少合适,是需要根据自身业务去衡量的。
```java
/**
* 重试机制
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
* @param retry 重试次数
* @param sleepTime 重试间隔时间
* @return
*/
public Boolean lockRetry(String key,String value,Long timeOut,Integer retry,Long sleepTime){
Boolean flag = false;
try {
for (int i=0;i<retry;i++){
flag = lock(key,value,timeOut);
if(flag){
break;
}
Thread.sleep(sleepTime);
}
}catch (Exception e){
e.printStackTrace();
}
return flag;
}
```