02 单体应用解决超卖的问题
正确示例:将事务包含在锁的控制范围内
保证在锁释放之前,事务已经提交。
//@Transactional(rollbackFor = Exception.class) public synchronized Long createOrder() throws Exception { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); Product product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null) { platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:" + purchaseProductId + "不存在"); } //商品当前库存 Integer currentCount = product.getCount(); //校验库存 if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买"); } productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId()); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); orderItem.setOrderId(order.getId()); // ... 省略 Set return order.getId(); platformTransactionManager.commit(transaction1); }
正确示例:使用synchronized的代码块
public Long createOrder() throws Exception { Product product = null; //synchronized (this) { //synchronized (object) { synchronized (DBOrderService2.class) { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null) { platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:" + purchaseProductId + "不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount); //校验库存 if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买"); } productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId()); platformTransactionManager.commit(transaction1); } TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); // ... 省略 Set orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction2); return order.getId();
正确示例:使用Lock
private Lock lock = new ReentrantLock(); public Long createOrder() throws Exception{ Product product = null; lock.lock(); TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); try { product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ throw new Exception("购买商品:"+purchaseProductId+"不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount); //校验库存 if (purchaseProductNum > currentCount){ throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买"); } productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId()); platformTransactionManager.commit(transaction1); } catch (Exception e) { platformTransactionManager.rollback(transaction1); } finally { // 注意抛异常的时候锁释放不掉,分布式锁也一样,都要在这里删掉 lock.unlock(); } TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); // ... 省略 Set orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction); return order.getId(); }
03 常见分布式锁的使用
上面使用的方法只能解决单体项目,当部署多台机器的时候就会失效,因为锁本身就是单机的锁,所以需要使用分布式锁来实现。
3.1 数据库乐观锁
数据库中的乐观锁,加一个version字段,利用CAS来实现,乐观锁的方式支持多台机器并发安全。但是并发量大的时候会导致大量的update失败
3.2 数据库分布式锁
db操作性能较差,并且有锁表的风险,一般不考虑。
✪ 3.2.1 简单的数据库锁
⍟ select for update
直接在数据库新建一张表:
锁的code预先写到数据库中,抢锁的时候,使用select for update查询锁对应的key,也就是这里的code,阻塞就说明别人在使用锁。
// 加上事务就是为了 for update 的锁可以一直生效到事务执行结束 // 默认回滚的是 RunTimeException @Transactional(rollbackFor = Exception.class) public String singleLock() throws Exception { log.info("我进入了方法!"); DistributeLock distributeLock = distributeLockMapper. selectDistributeLock("demo"); if (distributeLock==null) { throw new Exception("分布式锁找不到"); } log.info("我进入了锁!"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "我已经执行完成!"; }
<select id="selectDistributeLock" resultType="com.deltaqin.distribute.model.DistributeLock"> select * from distribute_lock where businessCode = #{businessCode,jdbcType=VARCHAR} for update </select>
使用唯一键作为限制,插入一条数据,其他待执行的SQL就会失败,当数据删除之后再去获取锁 ,这是利用了唯一索引的排他性。
⍟ insert lock
直接维护一张锁表:
@Autowired private MethodlockMapper methodlockMapper; @Override public boolean tryLock() { try { //插入一条数据 insert into methodlockMapper.insert(new Methodlock("lock")); }catch (Exception e){ //插入失败 return false; } return true; } @Override public void waitLock() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void unlock() { //删除数据 delete methodlockMapper.deleteByMethodlock("lock"); System.out.println("-------释放锁------"); }
3.3 Redis setNx
Redis 原生支持的,保证只有一个会话可以设置成功,因为Redis自己就是单线程串行执行的。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
spring.redis.host=localhost
封装一个锁对象:
@Slf4j public class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; //单位:秒 private int expireTime; /** * 没有传递 value,因为直接使用的是随机值 */ public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime=expireTime; this.value = UUID.randomUUID().toString(); } /** * JDK 1.7 之后的自动关闭的功能 */ @Override public void close() throws Exception { unLock(); } /** * 获取分布式锁 * SET resource_name my_random_value NX PX 30000 * 每一个线程对应的随机值 my_random_value 不一样,用于释放锁的时候校验 * NX 表示 key 不存在的时候成功,key 存在的时候设置不成功,Redis 自己是单线程,串行执行的,第一个执行的才可以设置成功 * PX 表示过期时间,没有设置的话,忘记删除,就会永远不过期 */ public boolean getLock(){ RedisCallback<Boolean> redisCallback = connection -> { //设置NX RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //设置过期时间 Expiration expiration = Expiration.seconds(expireTime); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); //序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } /** * 释放锁的时候随机数相同的时候才可以释放,避免释放了别人设置的锁(自己的已经过期了所以别人才可以设置成功) * 释放的时候采用 LUA 脚本,因为 delete 没有原生支持删除的时候校验值,证明是当前线程设置进去的值 * 脚本是在官方文档里面有的 */ public boolean unLock() { // key 是自己才可以释放,不是就不能释放别人的锁 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); // 执行脚本的时候传递的 value 就是对应的值 Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:"+result); return result; } }
每次获取的时候,自己线程需要new对应的RedisLock:
public String redisLock(){ log.info("我进入了方法!"); try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){ if (redisLock.getLock()) { log.info("我进入了锁!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成"; }