redis缓存优化
分段锁优化
关于一个商品在并发场景防止超卖,通常我们会用到分布式锁。
当我们把库存1000设置进redis,通常我们会对商品id进行加锁,如 lockKey = “product_101_stock”;这样所有的线程都会因为这个锁机制导致扣减库存缓慢。
如果将库存平均拆分成10份,每份100,利用十个lockKey,如product_101_stock_1,product_101_stock_2,…,product_101_stock_10,利用分段锁机制,可以有效提升加锁效率。
缓存击穿(缓存失效)
热点数据在失效的情况下,大量的高并发请求短时间内会绕过缓存,直接打到数据库,造成数据库压力非常大。
或者当热点数据大批量在同一时间都失效了,这时大量的请求都会打到数据库中查询,导致数据库压力过大。
解决方法:
1.热点数据永不过期
2.在设置缓存失效时间的时候,设置一个随机失效时间,这样在同一时间就不会存在大量的key过期。
缓存穿透
缓存穿透是指查询一个根本不存在的数据,缓存层和存储层都不会命中,通常出于容错的考虑,如果从存储层查不到数据则不写入缓
存层。
缓存穿透将导致不存在的数据每次请求都要到存储层去査询, 失去了缓存保护后端存储的意义。
造成缓存穿透的基本原因有两个:
第一、自身业务代码或者数据出现问题,
第二、一些恶意攻击.爬虫等造成大量空命中。
缓存穿透的解决方法:
1.对于不存在的数据设置一个短时效的空缓存。
2.利用布隆过滤器过滤不存在的数据请求
关于布隆过滤器
对于恶意攻击,向服务器请求大量不存在的数据造成的缓存穿透,还可以用布隆过滤器先做一次过滤,对于不存在的数据布隆过滤器一般都能够过滤掉,不让请求再往后端发送。当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。
布隆过滤器原理
- 布隆过滤器内部是一个bit数组。
- 布隆过滤器可能有K个hash算法
- 利用K个hash算法对key的值进行hash运算,并对hash值进行取数组长度模操作,最后的index值就是bit数组的索引下标,最后将下标的bit位置1
index = hash1(key) % bit位长度
4.如果所有的hash值对应的下标都为1,则判定数据存在。
Redis的bitmap只支持2^32大小,对应到内存也就是512MB,误判率万分之一,可以放下2亿左右的数据,性能高,空间占用率及小,省去了大量无效的数据库连接。
redisson实现布隆过滤器
public class RedissonBloomFilter { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.32.128:6379"); // config.useSingleServer().setPassword(""); // 构造Redisson RedissonClient redissonClient = Redisson.create(config); // 初始化布隆过滤器:预计元素为100000000L个,误差率为3% RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter("phoneList"); bloomFilter.tryInit(100000000L, 0.03); // 将号码插入到布隆过滤器中 bloomFilter.add("10086"); // 判断下面的号码是否在布隆过滤器中 System.out.println(bloomFilter.contains("10000")); System.out.println(bloomFilter.contains("10086")); } }
注意: 布隆过滤器是不能删除数据的,如果要删除数据得重新初始化数据。
缓存雪崩
缓存雪崩指的是缓存层支撑不住或宕掉后,流量会像奔逃的野牛一样,打向后端存储层。
由于缓存层承载着大量请求,有效地保护了存储层,但是如果缓存层由于某些原因不能提供服务(比如超大并发过来,缓存层支撑不住,或者由于缓存设计不好,类似大量请求访问bigkey,导致缓存能支撑的并发急剧下降),于是大量请求都会打到存储层,存储层的调用量会暴增,造成存储层也会级联宕机的情况。
预防和解决缓存雪崩问题, 可以从以下三个方面进行着手。
保证缓存层服务高可用性,比如使用Redis Sentinel或Redis Cluster.
依赖隔离组件为后端限流熔断并降级。比如使用Sentinel或Hystrix限流降级组件。比如服务降级,我们可以针对不同的数据采取不同的处理方式。当业务应用访问的是非核心数据例如电商商品属性,用户信息等)时,暂时停止从缓存中查询这些数据,而是直接返回预定义的默认降级信息、空值或是错误提示信息;当业务应用访问的是核心数据时(例如电商商品库存)时,仍然允许查询缓存,如果缓存缺失,也可以继续通过数据库读取。
提前演练。在项目上线前,演练缓存层宕掉后,应用以及后端的负载情况以及可能出现的问题,在此基础上做一些预案设定。
冷热分离
如果一个电商网站有成千上亿个商品,而商品信息缓存到redis加快访问速度,而不可能所有的商品信息都缓存到redis,这样redis得需要多大才满足。
所以对于热点数据(经常访问的商品)需要常驻到redis,而冷门商品则不必要一直都放在redis中。所以有了冷热分离概念,代码示例如下:
@Service public class ProductService { @Resource private StringRedisTemplate stringRedisTemplate; @Resource private ProductDao productDao; private final static String REDIS_KEY_PREFIX_PRODUCT_CACHE = "product:cache:"; /** * 分布式锁key */ private final static String REDIS_KEY_PREFIX_PRODUCT_CACHE_LOCK = "product:cache:lock:"; private static final Long PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24L; public Product create(Product product){ Product productResult = productDao.create(product); //冷热分离::设置超时时间,如果是冷门商品,则过期时间到了之后,redis之中不会有缓存 ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX_PRODUCT_CACHE + product.getProductId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT + random , TimeUnit.SECONDS); return productResult; } public Product update(Product product){ Product productResult = productDao.update(product); //冷热分离:设置超时时间,如果是冷门商品,则过期时间到了之后,redis之中不会有缓存 ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX_PRODUCT_CACHE + product.getProductId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); return productResult; } public Product getProduct(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //冷热分离:读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); } product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个短时效空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } return product; } }
突发性热点缓存重建导致系统压力暴增
类似于缓存击穿,某个热点数据或者冷门数据突然被大量请求。这是缓存服务器中是没有该数据缓存,导致数据库压力过大。
利用双重检测锁(double check lock dcl)机制,代码如下:
synchronized版本
public Product getProduct(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //双重检测锁机制,当缓存中不存在商品信息时,利用管程锁,单台jvm下,只有一个线程可以进去重建数据。(每台应用服务器都会去重建一次) //当数据重建完成后,缓存中有数据,其他线程可以在缓存中获取到数据信息。 //这里为什么不用分布式锁加自旋的方式,一直在redis缓存中获取数据呢? //这里this肯定不行,因为冷门数据如果是101,如果锁this,则102商品本来是正常都会被锁住。 synchronized (this){ productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } } return product; }
redisson版本
public Product getProductRedisson(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //利用redisson的分布式锁实现热点数据重建 RLock hotCreateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_CACHE_LOCK + productId); hotCreateCacheLock.lock(); try{ productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //重建缓存逻辑 product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } return product; }catch (Exception e){ e.printStackTrace(); return null; }finally { hotCreateCacheLock.unlock(); } }
双写一致性
关于双写不一致性
线程2在线程1查询商品信息后,修改了商品信息,并在某些情况下,线程2先修改了缓存中的信息,然后线程1再次更新缓存。这样就导致了数据库中的商品信息和缓存中的商品信息不一致。
解决方法 : 利用分布式锁解决双写不一致。
解决思路,将查询商品信息和更新缓存绑定到锁中,这样无论哪个线程在前,数据库中的信息和缓存中的信息都是一致的。
代码示例:
@Service public class ProductService { @Resource private StringRedisTemplate stringRedisTemplate; @Resource private ProductDao productDao; @Resource private Redisson redisson; private final static String REDIS_KEY_PREFIX_PRODUCT_CACHE = "product:cache:"; private final static String REDIS_KEY_PREFIX_PRODUCT_CACHE_LOCK = "product:cache:lock:"; private final static String REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK = "product:cache:update:lock:"; private static final Long PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24L; public Product create(Product product){ Product productResult = productDao.create(product); //设置超时时间,如果是冷门商品,则过期时间到了之后,redis之中不会有缓存 ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX_PRODUCT_CACHE + product.getProductId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT + random , TimeUnit.SECONDS); return productResult; } public Product update(Product product){ RLock updateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + product.getProductId()); //将查询数据库和更新到缓存利用分布式锁锁起来,防止双写不一致性 updateCacheLock.lock(); try{ Product productResult = productDao.update(product); //设置超时时间,如果是冷门商品,则过期时间到了之后,redis之中不会有缓存 ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX_PRODUCT_CACHE + product.getProductId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); return productResult; }finally { updateCacheLock.unlock(); } } public Product getProduct(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //双重检测锁机制,当缓存中不存在商品信息时,利用管程锁,单台jvm下,只有一个线程可以进去重建数据。(每台应用服务器都会去重建一次) //当数据重建完成后,缓存中有数据,其他线程可以在缓存中获取到数据信息。 //这里为什么不用分布式锁加自旋的方式,一直在redis缓存中获取数据呢? //这里this肯定不行,因为冷门数据如果是101,如果锁this,则102商品本来是正常都会被锁住。 synchronized (this){ productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } } return product; } public Product getProductRedisson(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //利用redisson的分布式锁实现热点数据重建 RLock hotCreateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_CACHE_LOCK + productId); hotCreateCacheLock.lock(); try{ productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //将查询数据库和更新到缓存利用分布式锁锁起来,防止双写不一致性 RLock updateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + productId); updateCacheLock.lock(); try{ product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } return product; }finally { updateCacheLock.unlock(); } }catch (Exception e){ e.printStackTrace(); return null; }finally { hotCreateCacheLock.unlock(); } } }
双写不一致分布式锁优化(读写锁)
如果大量请求都是读操作,则可以利用redisson的读写锁来优化
关于并发问题, 读-读之间是没有并发问题,写-写,有并发问题,读-写 ,有并发问题
读写锁原理,
1.线程1读的时候,线程2读,不互斥
2.线程1写的时候,线程2读,互斥,必须要等到线程1写完成后,线程2才能获取到锁
3.线程1写的时候,线程2写,互斥,必须要等到线程1写完成后,线程2才能获取到写锁。
线程1 | 线程2 | 互斥情况 |
读锁 | 读锁 | 不互斥,可以共同获取到锁(锁可重入 ) |
读锁 | 写锁 | 互斥,线程2必须等到线程1释放读锁后才能加写锁。 |
写锁 | 读锁 | 互斥,线程2必须等到线程1释放写锁后才能加读锁。 |
写锁 | 写锁 | 写写互斥,必须串行化 |
代码优化:
public Product update(Product product){ // RLock updateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + product.getProductId()); //将查询数据库和更新到缓存利用分布式锁锁起来,防止双写不一致性 // updateCacheLock.lock(); RReadWriteLock updateCacheLock = redisson.getReadWriteLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + product.getProductId()); RLock writeLock = updateCacheLock.writeLock(); writeLock.lock(); try{ Product productResult = productDao.update(product); //设置超时时间,如果是冷门商品,则过期时间到了之后,redis之中不会有缓存 ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX_PRODUCT_CACHE + product.getProductId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); return productResult; }finally { // updateCacheLock.unlock(); writeLock.unlock(); } } public Product getProductRedisson(Long productId){ Product product = null; String productKey = REDIS_KEY_PREFIX_PRODUCT_CACHE + productId; String productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //利用redisson的分布式锁实现热点数据重建 RLock hotCreateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_CACHE_LOCK + productId); hotCreateCacheLock.lock(); try{ productStr = stringRedisTemplate.opsForValue().get(productKey); if(!StringUtil.isBlank(productStr)){ product = JSON.parseObject(productStr, Product.class); //读延期,热门数据永不过期, stringRedisTemplate.expire(productKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS); return product; } //将查询数据库和更新到缓存利用分布式锁锁起来,防止双写不一致性 // RLock updateCacheLock = redisson.getLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + productId); // updateCacheLock.lock(); RReadWriteLock updateCacheLock = redisson.getReadWriteLock(REDIS_KEY_PREFIX_PRODUCT_UPDATE_CACHE_LOCK + productId); RLock readLock = updateCacheLock.readLock(); //读锁。其他线程在没有写锁的情况,这里的锁是并行执行的,意味着,读锁在写锁没有锁住的情况下,可以并行执行 //当有其他线程在执行update方法时,由于写锁的加入,这里必须等到写锁释放后,才能获取到读锁 readLock.lock(); try{ product = productDao.get(productId); if(product != null){ ThreadLocalRandom current = ThreadLocalRandom.current(); //生成一个3600秒以内的随机数,防止同时请求导致缓存击穿 long random = current.nextLong(60 * 60); stringRedisTemplate.opsForValue().set(productKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT + random, TimeUnit.SECONDS); }else{ //设置一个空缓存,防止缓存穿透 stringRedisTemplate.opsForValue().set(productKey,"",5*60,TimeUnit.SECONDS); } return product; }finally { // updateCacheLock.unlock(); readLock.unlock(); } }catch (Exception e){ e.printStackTrace(); return null; }finally { hotCreateCacheLock.unlock(); } }
关于双写不一致的其他解决方案
对于并发几率很小的数据(如个人维度的订单数据,用户数据等),这种几乎不用考虑双写一致性,很少发生缓存不一致,可以给缓存数据加上过期时间,每隔一段时间触发读的主动更新即可。
就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对缓存的要求。
也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是引入了新的中间件,增加了系统的复杂度。
双写一致总结
针对读多写少的情况加入缓存以提高性能,如果写多读多的情况又不能容忍双写不一致,那就没必要加缓存了,可以直接操作数据库。放入缓存的数据应该是实时性,一致性要求不是很高的数据。切记不要为了用缓存,同时又要保证绝对的一致性做大量的过度设计和控制,增加系统复杂度。
对于热点缓存初始化的逻辑
采用获取一次缓存,如果为空的情况,获取分布式锁,让一个线程去重建缓存,另外的线程未获取到锁的情况,休眠短时间,然后再自旋获取缓存。
伪代码逻辑
public String get(String key){ String value = redis.get(key); if(value == null){ //获取锁操作 String lockKey = "lock:key"; if(redis.set(lockKey,"1","ex 180","nx")){ value = db.get(key); redis.setex(key,value,timeout); //删除锁操作 redis.delete(lockKey); }else{ Thread.sleep(50); //自旋 return get(key); } } return value; }