redis分布式锁小试

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 一、场景   项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。

一、场景

  项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。

二、解决思路

  将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。

三、主要代码

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
    /**
     * redis的key
     * @return
     */
    String value();
    /**
     * 持锁时间,单位毫秒,默认一分钟
     */
    long keepMills() default 60000;
    /**
     * 当获取失败时候动作
     */
    LockFailAction action() default LockFailAction.GIVEUP;
    
    public enum LockFailAction{
        /**
         * 放弃
         */
        GIVEUP,
        /**
         * 继续
         */
        CONTINUE;
    }
    /**
     * 睡眠时间,设置GIVEUP忽略此项
     * @return
     */
    long sleepMills() default 500;
}

 

import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Component
@Aspect
public class RedisLockAspect {
    private static final Log log = LogFactory.getLog(RedisLockAspect.class);

    @Autowired
    private RedisCacheTemplate.RedisLockOperation redisLockOperation;

    @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" +
            "&& @annotation(me.ele.api.portal.service.redis.RedisLock)")
    private void lockPoint(){}

    @Around("lockPoint()")
    public Object arround(ProceedingJoinPoint pjp) throws Throwable{
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        RedisLock lockInfo = method.getAnnotation(RedisLock.class);
        
     /*
         String lockKey = lockInfo.value();
if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {
DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];
lockKey += deployMessage.getEnv();
System.out.println(lockKey);
}
     */
        boolean lock = false;
        Object obj = null;
        while(!lock){
            long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
            lock = setNX(lockInfo.value(), timestamp);
            //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)
            long now = System.currentTimeMillis();
            if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
                log.info("得到redis分布式锁...");
                obj = pjp.proceed();
                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){
                    releaseLock(lockInfo.value());
                }
            }else{
                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){
                    log.info("稍后重新请求redis分布式锁...");
                    Thread.currentThread().sleep(lockInfo.sleepMills());
                }else{
                    log.info("放弃redis分布式锁...");
                    break;
                }
            }
        }
        return obj;
    }
    private boolean setNX(String key,Long value){
        return redisLockOperation.setNX(key, value);
    }
    private long getLock(String key){
        return redisLockOperation.get(key);
    }
    private Long getSet(String key,Long value){
        return redisLockOperation.getSet(key, value);
    }
    private void releaseLock(String key){
        redisLockOperation.delete(key);
    }
}

四、遇到的问题

  

  开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。

   在一篇文章中找到了原因:SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效.

  只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。

  参考: 关于proxy模式下,@Transactional标签在创建代理对象时的应用

五、使用spring-redis中的RedisLockRegistry

import java.util.concurrent.locks.Lock;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Bean
public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,
                                           RedisTemplate redisTemplate) {
    return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);
}

Lock lock = redisLockRegistry.obtain(appId);

lock.tryLock(180, TimeUnit.SECONDS);
....
lock.unlock();  

六、参考

  其他工具类,请参考这里

七、springboot LockRegistry

  

      分布式锁-RedisLockRegistry源码分析[转]

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import redis.clients.jedis.JedisShardInfo;


@Ignore
public class RedisLockTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);
  private static final String LOCK = "xxx.xxx";
  private RedisLockRegistry redisLockRegistry;

  @Before
  public void setUp() {
    JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");
    JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);
    redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);
  }

  private class TaskA implements Runnable {

    @Override
    public void run() {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      Lock lock = redisLockRegistry.obtain(LOCK);
      try {
        lock.lock();
        LOGGER.info("Lock {} is obtained", lock);
        Thread.sleep(10);
        lock.unlock();
        LOGGER.info("Lock {} is unlocked", lock);
      } catch (Exception ex) {
        LOGGER.error("Lock {} unlock failed", lock, ex);
      }
    }
  }

  private class TimeoutTask implements Runnable {

    @Override
    public void run() {
      Lock lock = redisLockRegistry.obtain(LOCK);
      try {
        lock.lock();
        LOGGER.info("Lock {} is obtained", lock);
        Thread.sleep(5000);
        lock.unlock();
        LOGGER.info("Lock {} is unlocked", lock);
      } catch (Exception ex) {
        LOGGER.error("Lock {} unlock failed", lock, ex);
      }
    }
  }

  @Test
  public void test() throws InterruptedException, TimeoutException {
    ExecutorService service = Executors.newFixedThreadPool(2);
    service.execute(new TimeoutTask());
    service.execute(new TaskA());
    service.shutdown();
    if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new TimeoutException();
    }
  }
}

 

 

 

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
5月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
150 2
基于Redis的高可用分布式锁——RedLock
|
5月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
6月前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
162 5
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
69 16
|
6月前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
104 0
|
3月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
87 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
3月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
63 1
|
3月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
93 4