不用找了,基于 Redis 的分布式锁实战来了!

简介: 前言:在分布式环境中,我们经常使用锁来进行并发控制,锁可分为乐观锁和悲观锁,基于数据库版本戳的实现是乐观锁,基于redis或zookeeper的实现可认为是悲观锁了。乐观锁和悲观锁最根本的区别在于线程之间是否相互阻塞。

前言:在分布式环境中,我们经常使用锁来进行并发控制,锁可分为乐观锁和悲观锁,


基于数据库版本戳的实现是乐观锁,基于redis或zookeeper的实现可认为是悲观锁了。乐观锁和悲观锁最根本的区别在于线程之间是否相互阻塞。


那么,本文主要来讨论基于redis的分布式锁算法问题。

从2.6.12版本开始,redis为SET命令增加了一系列选项(set [key] NX/XX EX/PX [expiration]):


EX seconds – 设置键key的过期时间,单位时秒


PX milliseconds – 设置键key的过期时间,单位时毫秒


NX – 只有键key不存在的时候才会设置key的值


XX – 只有键key存在的时候才会设置key的值


原文地址:https://redis.io/commands/set

中文地址:http://redis.cn/commands/set.html


注意: 由于SET命令加上选项已经可以完全取代SETNX, SETEX, PSETEX的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。


这里简单提一下,在旧版本的redis中(指2.6.12版本之前),使用redis实现分布式锁一般需要setNX、expire、getSet、del等命令。而且会发现这种实现有很多逻辑判断的原子操作以及本地时间等并没有控制好。


而在旧版本的redis中,redis的超时时间很难控制,用户迫切需要把setNX和expiration结合为一体的命令,把他们作为一个原子操作,这样新版本的多选项set命令诞生了。然而这并没有完全解决复杂的超时控制带来的问题。


接下来,我们的一切讨论都基于新版redis。


在这里,我先提出几个在实现redis分布式锁中需要考虑的关键问题


1、死锁问题;

1.1、为了防止死锁,redis至少需要设置一个超时时间;


1.2、由1.1引申出来,当锁自动释放了,但是程序并没有执行完毕,这时候其他线程又获取到锁执行同样的程序,可能会造成并发问题,这个问题我们需要考虑一下是否归属于分布式锁带来问题的范畴。


2、锁释放问题,这里会有两个问题;

2.1、每个获取redis锁的线程应该释放自己获取到的锁,而不是其他线程的,所以我们需要在每个线程获取锁的时候给锁做上不同的标记以示区分;


2.2、由2.1带来的问题是线程在释放锁的时候需要判断当前锁是否属于自己,如果属于自己才释放,这里涉及到逻辑判断语句,至少是两个操作在进行,那么我们需要考虑这两个操作要在一个原子内执行,否者在两个行为之间可能会有其他线程插入执行,导致程序紊乱。


3、更可靠的锁;

单实例的redis(这里指只有一个master节点)往往是不可靠的,虽然实现起来相对简单一些,但是会面临着宕机等不可用的场景,即使在主从复制的时候也显得并不可靠(因为redis的主从复制往往是异步的)。


关于Martin Kleppmann的Redlock的分析


原文地址:https://redis.io/topics/distlock

中文地址:http://redis.cn/topics/distlock.html


文章分析得出,这种算法只需具备3个特性就可以实现一个最低保障的分布式锁。


安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。


活性A(Liveness property A): 无死锁。即便持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取。


活性B(Liveness property B): 容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁.


我们来分析一下:

第一点安全属性意味着悲观锁(互斥锁)是我们做redis分布式锁的前提,否者将可能造成并发;


第二点表明为了避免死锁,我们需要设置锁超时时间,保证在一定的时间过后,锁可以重新被利用;


第三点是说对于客户端来说,获取锁和手动释放锁可以有更高的可靠性。


更进一步分析,结合上文提到的关键问题,这里可以引申出另外的两个问题:

怎么才能合理判断程序真正处理的有效时间范围?(这里有个时间偏移的问题)


redis Master节点宕机后恢复(可能还没有持久化到磁盘)、主从节点切换,(N/2)+1这里的N应该怎么动态计算更合理?


接下来再看,redis之父antirez对Redlock的评价


原文地址:http://antirez.com/news/101


文中主要提到了网络延迟和本地时钟的修改(不管是时间服务器或人为修改)对这种算法可能造成的影响。


最后,来点实践吧

I、传统的单实例redis分布式锁实现(关键步骤)

获取锁(含自动释放锁):

SET resource_name my_random_value NX PX 30000  
 手动删除锁(Lua脚本):  
if redis.call("get",KEYS[1]) == ARGV[1] then  
    return redis.call("del",KEYS[1])  
else  
    return 0  
end  

II、分布式环境的redis(多master节点)的分布式锁实现

为了保证在尽可能短的时间内获取到(N/2)+1个节点的锁,可以并行去获取各个节点的锁(当然,并行可能需要消耗更多的资源,因为串行只需要count到足够数量的锁就可以停止获取了);


另外,怎么动态实时统一获取redis master nodes需要更进一步去思考了。


QA,补充一下说明(以下为我与朋友沟通的情况,以说明文中大家可能不够明白的地方):


1、在关键问题2.1中,删除就删除了,会造成什么问题?


线程A超时,准备删除锁;但此时的锁属于线程B;线程B还没执行完,线程A把锁删除了,这时线程C获取到锁,同时执行程序;所以不能乱删。


2、在关键问题2.2中,只要在key生成时,跟线程相关就不用考虑这个问题了吗?


不同的线程执行程序,线程之间肯虽然有差异呀,然后在redis锁的value设置有线程信息,比如线程id或线程名称,是分布式环境的话加个机器id前缀咯(类似于twitter的snowflake算法!),但是在del命令只会涉及到key,不会再次检查value,所以还是需要lua脚本控制if(condition){xxx}的原子性。


3、那要不要考虑锁的重入性?


不需要重入;try…finally 没得重入的场景;对于单个线程来说,执行是串行的,获取锁之后必定会释放,因为finally的代码必定会执行啊(只要进入了try块,finally必定会执行)。


4、为什么两个线程都会去删除锁?(貌似重复的问题。不管怎样,还是耐心解答吧)


每个线程只能管理自己的锁,不能管理别人线程的锁啊。这里可以联想一下ThreadLocal。


5、如果加锁的线程挂了怎么办?只能等待自动超时?


看你怎么写程序的了,一种是问题3的回答;另外,那就自动超时咯。这种情况也适用于网络over了。


6、时间太长,程序异常就会蛋疼,时间太短,就会出现程序还没有处理完就超时了,这岂不是很尴尬?


是呀,所以需要更好的衡量这个超时时间的设置。


实践部分主要代码:


RedisLock工具类:

package com.caiya.cms.web.component;  
import com.caiya.cache.CacheException;  
import com.caiya.cache.redis.JedisCache;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import java.util.Objects;  
import java.util.concurrent.TimeUnit;  
/**  
 * redis实现分布式锁  
 * 可实现特性:  
 * 1、使多线程无序排队获取和释放锁;  
 * 2、丢弃未成功获得锁的线程处理;  
 * 3、只释放线程本身加持的锁;  
 * 4、避免死锁  
 *  
 * @author wangnan  
 * @since 1.0  
 */  
public final class RedisLock {  
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);  
    /**  
     * 尝试加锁(仅一次)  
     *  
     * @param lockKey       锁key  
     * @param lockValue     锁value  
     * @param expireSeconds 锁超时时间(秒)  
     * @return 是否加锁成功  
     * @throws CacheException  
     */  
    public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {  
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();  
        try {  
            String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);  
            return Objects.equals(response, "OK");  
        } finally {  
            jedisCache.close();  
        }  
    }  
    /**  
     * 加锁(指定最大尝试次数范围内)  
     *  
     * @param lockKey       锁key  
     * @param lockValue     锁value  
     * @param expireSeconds 锁超时时间(秒)  
     * @param tryTimes      最大尝试次数  
     * @param sleepMillis   每两次尝试之间休眠时间(毫秒)  
     * @return 是否加锁成功  
     * @throws CacheException  
     */  
    public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {  
        boolean result;  
        int count = 0;  
        do {  
            count++;  
            result = tryLock(lockKey, lockValue, expireSeconds);  
            try {  
                TimeUnit.MILLISECONDS.sleep(sleepMillis);  
            } catch (InterruptedException e) {  
                logger.error(e.getMessage(), e);  
            }  
        } while (!result && count <= tryTimes);  
        return result;  
    }  
    /**  
     * 释放锁  
     *  
     * @param lockKey   锁key  
     * @param lockValue 锁value  
     */  
    public static void unlock(String lockKey, String lockValue) {  
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();  
        try {  
            String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";  
            Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);  
//            Objects.equals(result, 1L);  
        } catch (Exception e) {  
            logger.error(e.getMessage(), e);  
        } finally {  
            jedisCache.close();  
        }  
//        return false;  
    }  
    private RedisLock() {  
    }  
}  

使用工具类的代码片段1:

        ...  
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟业务相关的唯一拼接键  
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的唯一值  
        boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只尝试一次,在本次处理过程中直接拒绝其他线程的请求  
        if (!locked) {  
            throw new IllegalAccessException("您的操作太频繁了,休息一下再来吧~");  
        }  
        try {  
            // 开始处理核心业务逻辑  
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());  
            ...  
            ...  
        } finally {  
            RedisLock.unlock(lockKey, lockValue);// 在finally块中释放锁  
        }  

使用工具类的代码片段2:

        ...  
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();  
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();  
        boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里需要合理根据业务处理情况设置最大尝试次数和每次休眠时间)  
        if (!locked) {  
            throw new IllegalAccessException("系统太忙,本次操作失败");// 一般来说,不会走到这一步;如果真的有这种情况,并且在合理设置锁尝试次数和等待响应时间之后仍然处理不过来,可能需要考虑优化程序响应时间或者用消息队列排队执行了  
        }  
        try {  
            // 开始处理核心业务逻辑  
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());  
            ...  
            ...  
        } finally {  
            RedisLock.unlock(lockKey, lockValue);  
        }  
        ...  

附加:

基于redis的分布式锁实现客户端Redisson:

https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

相关文章
|
9月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
648 2
|
9月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
759 6
|
10月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
8月前
|
存储 NoSQL 前端开发
Redis专题-实战篇一-基于Session和Redis实现登录业务
本项目基于SpringBoot实现黑马点评系统,涵盖Session与Redis两种登录方案。通过验证码登录、用户信息存储、拦截器校验等流程,解决集群环境下Session不共享问题,采用Redis替代Session实现数据共享与自动续期,提升系统可扩展性与安全性。
514 3
Redis专题-实战篇一-基于Session和Redis实现登录业务
|
8月前
|
存储 缓存 NoSQL
Redis专题-实战篇二-商户查询缓存
本文介绍了缓存的基本概念、应用场景及实现方式,涵盖Redis缓存设计、缓存更新策略、缓存穿透问题及其解决方案。重点讲解了缓存空对象与布隆过滤器的使用,并通过代码示例演示了商铺查询的缓存优化实践。
353 1
Redis专题-实战篇二-商户查询缓存
|
8月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
771 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
8月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
NoSQL Redis 数据库
用redis实现分布式锁时容易踩的5个坑
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 近有不少小伙伴投入短视频赛道,也出现不少第三方数据商,为大家提供抖音爬虫数据。 小伙伴们有没有好奇过,这些数据是如何获取的,普通技术小白能否也拥有自己的抖音爬虫呢? 本文会全面解密抖音爬虫的幕后原理,不需要任何编程知识,还请耐心阅读。
用redis实现分布式锁时容易踩的5个坑
|
NoSQL Java 关系型数据库
浅谈Redis实现分布式锁
浅谈Redis实现分布式锁
|
存储 canal 缓存