缓存失效问题和Redis分布式锁
缓存失效问题
1
、缓存穿透
缓存穿透
是指
查询一个一定不存在的数据
,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的 null
写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
在流量大时
,可能
DB
就挂掉了,要是有人利用不存在的
key
频繁攻击我们的应用,这就是漏洞。
解决:
缓存空结果、并且设置短的过期时间。
2
、缓存雪崩
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到 DB
,
DB
瞬时压力过重雪崩。
解决:
原有的失效时间基础上增加一个随机值
,比如
1-5
分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件
3
、缓存击穿
对于一些设置了过期时间的
key
,如果这些
key
可能会在某些时间点被超高并发地访问,是一种非常“
热点
”
的数据。
这个时候,需要考虑一个问题:如果这个
key
在大量请求同时进来前正好失效,那么所有对这个 key
的数据查询都落到
db
,我们称为缓存击穿
解决:
加锁
加锁:
3.1本地锁
public String test() {
//只要是同一把锁,就能锁住需要这个锁的所有线程
//1,synchronized (this):Springboot中所有组件都是单例的
synchronized (this){
//得到锁以后,要再去缓存中确定一次,如果没有才继续查询
String a = redisTemplate.opsForValue().get("a");
//缓存中有,返回
if(!StringUtils.isEmpty(a)){
return a;
}
//继续操作,从数据库里拿到值放到redis里面并返回,这里就不展开了
}
}
3.2分布式锁
3.2.1利用redis实现简易的分布式锁
xshell里针对同一个linux服务器打开多个窗口,启动redis,同时执行
set lock 1 EX 300 NX
因NX表示不存在才能set成功,则几个窗口在抢占过程中只有一个set成功了
java代码:
public Map<String, List<Catelog2Vo>> getCatelogJsonFromDbWithRedisLock() {
String token = UUID.randomUUID().toString();
//1.占分布式锁,去redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",token,300,TimeUnit.SECONDS);
if(lock){
//加锁成功...执行业务
//设置过期时间,必须和加锁是同步的,原子的,以防出现异常而没有解锁
//redisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> data=null;
try {
data = getStringListMap();
}finally {
//已得到数据,解锁
//redisTemplate.delete("lock");
//防止删除别人的锁,先检查是不是自己的锁
// if(redisTemplate.opsForValue().get("lock").equals(token)){
// redisTemplate.delete("lock");
// }
//为了原子操作,使用lua脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //删除锁
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class) , Arrays.asList("lock"), token);
}
return data;
}else{
//加锁失败。。。重试,自旋的方式
//休眠100ms重试
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.2.2Redisson
Redisson 为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力。
引入pom:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
编写配置类:
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class MyRedissonConfig {
//所有redisson操作都是通过RedissonClient对象操作的
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
//单节点模式
//可以用"rediss://"来启用 SSL 连接
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
return Redisson.create(config);
}
}
编写测试方法:
@Autowired
private RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1获取一把锁,只要锁名一样就是同一把锁
RLock lock = redisson.getLock("anyLock");
//2加锁
//阻塞式等待,默认加锁30s
//如果业务超长,锁自动续期30s,而业务一旦完成,就不会继续续期,哪怕不手动解锁,锁也会在默认30s后自动释放,所以不会有死锁问题
//lock.lock();
// 加锁以后 10 秒钟自动解锁,但要注意自动解锁时间一定要大于业务执行时间
// 指定了解锁时间后就不会再自动续期了,未指定的情况下会有一个定时任务,只要当前任务还在占锁,就会一直重新给锁设置过期时间
// 无需调用 unlock 方法手动解锁
//lock.lock(10, TimeUnit.SECONDS);
try{
// 尝试加锁,最多等待 100 秒,没有则放弃加锁。上锁以后 10 秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
System.out.println("加锁成功,执行业务"+Thread.currentThread().getId());
Thread.sleep(3000);
}
}catch (Exception e){
}
finally {
System.out.println("释放锁"+Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
读写锁:
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("write")
@ResponseBody
public String write(){
RReadWriteLock rwLock = redisson.getReadWriteLock("rw-lock");
String s = null;
RLock lock = rwLock.writeLock();
try{
//1改数据加写锁,读数据加读锁
lock.lock();
s = UUID.randomUUID().toString();
Thread.sleep(5000);
redisTemplate.opsForValue().set("writeValue",s);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return s;
}
//保证一定读到最新数据,修改期间写锁是排他锁,读锁是共享锁
//写锁不释放,读锁就会一直等待写锁解除
//读+读 相当于无锁,并发读,同时加锁成功
//写+读 等待写锁释放
//写+写 阻塞方式
//读+写 有读锁,写需要等待
//只要有写,则必然会有等待出现
@ResponseBody
@GetMapping("read")
public String read(){
RReadWriteLock rwLock = redisson.getReadWriteLock("rw-lock");
String writeValue = "";
//加读锁
RLock lock = rwLock.readLock();
lock.lock();
try {
writeValue = (String) redisTemplate.opsForValue().get("writeValue");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return writeValue;
}
信号量:
//信号量可用于分布式限流
//事先在redis里存park,值为3,每次访问park的值减1,减到0后就会一直等待park的值大于0
@GetMapping("park")
@ResponseBody
public String park(){
RSemaphore park = redisson.getSemaphore("park");
boolean b = false;
try {
//park.acquire();//获取一个信号量,阻塞式等待
b = park.tryAcquire();//尝试获取一个信号量
if(b){
//执行业务
}else {
return "error";
}
} catch (Exception e) {
e.printStackTrace();
}
return "ok==>"+b;
}
@GetMapping("go")
@ResponseBody
public String go(){
RSemaphore park = redisson.getSemaphore("park");
try {
park.release();//释放一个信号量
} catch (Exception e) {
e.printStackTrace();
}
return "ok";
}
闭锁:
/**
* 放假锁门
* 人走完了才锁
*/
@GetMapping("lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch downLatch = redisson.getCountDownLatch("door");
downLatch.trySetCount(5);//5个班的人都走了才行,每次访问下面的run/id就会减1,到0时本方法执行成功,返回“放假了”
downLatch.await();
return "放假了";
}
@GetMapping("run/{id}")
@ResponseBody
public String run(@PathVariable Long id){
RCountDownLatch downLatch = redisson.getCountDownLatch("door");
downLatch.countDown();//计数减1
return id+"班的人都走了";
}