分布式锁实现原理与最佳实践(2)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 分布式锁实现原理与最佳实践

02 单体应用解决超卖的问题


正确示例:将事务包含在锁的控制范围内

保证在锁释放之前,事务已经提交。

//@Transactional(rollbackFor = Exception.class)
public synchronized Long createOrder() throws Exception {
    TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
    Product product = productMapper.selectByPrimaryKey(purchaseProductId);
    if (product == null) {
        platformTransactionManager.rollback(transaction1);
        throw new Exception("购买商品:" + purchaseProductId + "不存在");
    }
    //商品当前库存
    Integer currentCount = product.getCount();
    //校验库存
    if (purchaseProductNum > currentCount) {
        platformTransactionManager.rollback(transaction1);
        throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
    }
    productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
    OrderItem orderItem = new OrderItem();
    orderItem.setOrderId(order.getId());
    // ... 省略 Set
    return order.getId();
    platformTransactionManager.commit(transaction1);
}


正确示例:使用synchronized的代码块

public Long createOrder() throws Exception {
    Product product = null;
    //synchronized (this) {
    //synchronized (object) {
    synchronized (DBOrderService2.class) {
        TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
        product = productMapper.selectByPrimaryKey(purchaseProductId);
        if (product == null) {
            platformTransactionManager.rollback(transaction1);
            throw new Exception("购买商品:" + purchaseProductId + "不存在");
        }
        //商品当前库存
        Integer currentCount = product.getCount();
        System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
        //校验库存
        if (purchaseProductNum > currentCount) {
            platformTransactionManager.rollback(transaction1);
            throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
        }
        productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
        platformTransactionManager.commit(transaction1);
    }
    TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition);
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
    OrderItem orderItem = new OrderItem();
    // ... 省略 Set
    orderItemMapper.insertSelective(orderItem);
    platformTransactionManager.commit(transaction2);
    return order.getId();


正确示例:使用Lock

private Lock lock = new ReentrantLock();
public Long createOrder() throws Exception{
    Product product = null;
    lock.lock();
    TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
    try {
        product = productMapper.selectByPrimaryKey(purchaseProductId);
        if (product==null){
            throw new Exception("购买商品:"+purchaseProductId+"不存在");
        }
        //商品当前库存
        Integer currentCount = product.getCount();
        System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
        //校验库存
        if (purchaseProductNum > currentCount){
            throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
        }
        productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
        platformTransactionManager.commit(transaction1);
    } catch (Exception e) {
        platformTransactionManager.rollback(transaction1);
    } finally {
        // 注意抛异常的时候锁释放不掉,分布式锁也一样,都要在这里删掉
        lock.unlock();
    }
    TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
    OrderItem orderItem = new OrderItem();
    // ... 省略 Set
    orderItemMapper.insertSelective(orderItem);
    platformTransactionManager.commit(transaction);
    return order.getId();
}


03 常见分布式锁的使用

上面使用的方法只能解决单体项目,当部署多台机器的时候就会失效,因为锁本身就是单机的锁,所以需要使用分布式锁来实现。


3.1 数据库乐观锁

数据库中的乐观锁,加一个version字段,利用CAS来实现,乐观锁的方式支持多台机器并发安全。但是并发量大的时候会导致大量的update失败


3.2 数据库分布式锁

db操作性能较差,并且有锁表的风险,一般不考虑。

✪ 3.2.1 简单的数据库锁

image.png

⍟ select for update

直接在数据库新建一张表:

image.png

锁的code预先写到数据库中,抢锁的时候,使用select for update查询锁对应的key,也就是这里的code,阻塞就说明别人在使用锁。

// 加上事务就是为了 for update 的锁可以一直生效到事务执行结束
// 默认回滚的是 RunTimeException
@Transactional(rollbackFor = Exception.class)
public String singleLock() throws Exception {
    log.info("我进入了方法!");
    DistributeLock distributeLock = distributeLockMapper.
        selectDistributeLock("demo");
    if (distributeLock==null) {
        throw new Exception("分布式锁找不到");
    }
    log.info("我进入了锁!");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "我已经执行完成!";
}
<select id="selectDistributeLock" resultType="com.deltaqin.distribute.model.DistributeLock">
  select * from distribute_lock
  where businessCode = #{businessCode,jdbcType=VARCHAR}
  for update
</select>

使用唯一键作为限制,插入一条数据,其他待执行的SQL就会失败,当数据删除之后再去获取锁 ,这是利用了唯一索引的排他性。

⍟ insert lock

直接维护一张锁表:

@Autowired
private MethodlockMapper methodlockMapper;
@Override
public boolean tryLock() {
    try {
        //插入一条数据   insert into
        methodlockMapper.insert(new Methodlock("lock"));
    }catch (Exception e){
        //插入失败
        return false;
    }
    return true;
}
@Override
public void waitLock() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
@Override
public void unlock() {
    //删除数据   delete
    methodlockMapper.deleteByMethodlock("lock");
    System.out.println("-------释放锁------");
}


3.3 Redis setNx

Redis 原生支持的,保证只有一个会话可以设置成功,因为Redis自己就是单线程串行执行的。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring.redis.host=localhost

封装一个锁对象:

@Slf4j
public class RedisLock implements AutoCloseable {
    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    //单位:秒
    private int expireTime;
    /**
     * 没有传递 value,因为直接使用的是随机值
     */
    public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expireTime=expireTime;
        this.value = UUID.randomUUID().toString();
    }
    /**
     * JDK 1.7 之后的自动关闭的功能
     */
    @Override
    public void close() throws Exception {
        unLock();
    }
    /**
     * 获取分布式锁
     * SET resource_name my_random_value NX PX 30000
     * 每一个线程对应的随机值 my_random_value 不一样,用于释放锁的时候校验
     * NX 表示 key 不存在的时候成功,key 存在的时候设置不成功,Redis 自己是单线程,串行执行的,第一个执行的才可以设置成功
     * PX 表示过期时间,没有设置的话,忘记删除,就会永远不过期
     */
    public boolean getLock(){
        RedisCallback<Boolean> redisCallback = connection -> {
            //设置NX
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            //设置过期时间
            Expiration expiration = Expiration.seconds(expireTime);
            //序列化key
            byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
            //序列化value
            byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
            //执行setnx操作
            Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
            return result;
        };
        //获取分布式锁
        Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
        return lock;
    }
    /**
     * 释放锁的时候随机数相同的时候才可以释放,避免释放了别人设置的锁(自己的已经过期了所以别人才可以设置成功)
     * 释放的时候采用 LUA 脚本,因为 delete 没有原生支持删除的时候校验值,证明是当前线程设置进去的值
     * 脚本是在官方文档里面有的
     */
    public boolean unLock() {
        // key 是自己才可以释放,不是就不能释放别人的锁
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
        List<String> keys = Arrays.asList(key);
        // 执行脚本的时候传递的 value 就是对应的值
        Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
        log.info("释放锁的结果:"+result);
        return result;
    }
}

每次获取的时候,自己线程需要new对应的RedisLock:

public String redisLock(){
    log.info("我进入了方法!");
    try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
        if (redisLock.getLock()) {
            log.info("我进入了锁!!");
            Thread.sleep(15000);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    log.info("方法执行完成");
    return "方法执行完成";
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
存储 调度
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
|
6天前
|
NoSQL Java API
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
本文详细解析了分布式锁的实现原理与应用场景,包括线程锁、进程锁和分布式锁的区别,以及分布式锁的四种要求和三种实现方式(数据库乐观锁、ZooKeeper、Redis)。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
|
4月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
4月前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
|
1月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
28 1
|
1月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
40 1
|
26天前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
84 0
|
2月前
|
网络协议 安全 Java
分布式(基础)-RMI的原理
分布式(基础)-RMI的原理
|
5月前
|
Prometheus 运维 监控
解锁分布式云多集群统一监控的云上最佳实践
为应对分布式云多集群监控的挑战,阿里云可观测监控 Prometheus 版结合 ACK One,凭借高效纳管与全局监控方案有效破解了用户在该场景的监控运维痛点,为日益增长的业务需求提供了一站式、高效、统一的监控解决方案,实现成本与运维效率的双重优化。助力企业的数字化转型与业务快速增长,在复杂多变的云原生时代中航行,提供了一个强有力的罗盘与风帆。
55764 24
|
3月前
|
Kubernetes Go 数据库
go-zero 分布式事务最佳实践
go-zero 分布式事务最佳实践
下一篇
无影云桌面