自己实现一个分布式锁
我的博客大纲
我的后端学习大纲
b3.搭建一个使用redis的基础项目:
- 1.添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
- 2.更改配置文件
spring.redis.host=192.168.148.3
- 3.redis中添加库存:
- 4.编写service层
package com.atguigu.distributed.lock.service;import com.atguigu.distributed.lock.pojo.Stock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.locks.ReentrantLock;@Service
public class StockService {@Autowiredprivate StringRedisTemplate redisTemplate;public void redisDeduct(){//1.查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();//2.判断库存是否充足if (stock != null && stock.length() != 0){Integer st = Integer.valueOf(stock);if (st > 0){//扣减库存redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}}
}
- 5.项目结构:
b4.Redis分布式锁方式实现:
b4-1.实现流程
- 1.借助于redis中的命令
setnx(key, value)
,key不存在就新增,存在就什么都不做。 - 2.同时
有多个客户端发送setnx命令
,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)
- 3.特征:独占排他使用
- 4.操作:
- 1.多个客户端同时获取锁(setnx)
- 2.获取成功,执行业务逻辑,执行完成释放锁(del)
- 3.其他客户端等待重试(递归)
b4-2.编码实现redis分布式锁:
- 1.改造
StockService
方法:
@Service
public class StockService {@Autowiredprivate StringRedisTemplate redisTemplate;public void deduct() {// 加锁setnxBoolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");// 重试:递归调用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 解锁this.redisTemplate.delete("lock");}}}
}
- 2.其中,加锁也可以使用循环:
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}
}
- 3.解锁:
// 释放锁
this.redisTemplate.delete("lock");
- 4.使用Jmeter压力测试如下:
40.2. 防死锁:
a.死锁及解决办法:
a1.问题:
- 1.setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)
a2.解决办法:
- 1.解决:给锁
设置过期时间
,自动释放锁。
a3.设置过期时间两种方式:
- 1.通过expire设置过期时间(缺乏原子性:
如果在setnx和expire之间出现异常,锁也无法释放
) - 2.使用set指令设置过期时间:
set key value ex 3 nx
(既达到setnx的效果,又设置了过期时间)
- 3.压力测试肯定也没有问题。
40.3. 使用UUID防误删
a.问题及解决办法
a1.问题:
- 1.可能会释放其他服务器的锁。
a2.场景:
- 1.如果业务逻辑的执行时间是7s。执行流程如下
- 1 index1业务逻辑没执行完,3秒后锁被自动释放。
- 2.index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
- 3.index3获取到锁,执行业务逻辑
- 4.index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。最终等于没锁的情况。
a3.解决:
-
1.setnx获取锁时,设置一个指
定的唯一值(例如:uuid)
;释放前获取这个值,判断是否自己的锁
-
实现如下:
问题:删除操作缺乏原子性。
场景:
- index1执行删除时,查询到的lock值确实和uuid相等
- index1执行删除前,lock刚好过期时间已到,被redis自动释放
- index2获取了lock
- index1执行删除,此时会把index2的lock删除
- 解决方案:没有一个命令可以同时做到判断 + 删除,
所有只能通过其他方式实现(**LUA脚本**)
40.6. 可重入锁
a.可重入锁使用场景:
- 1.由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。
- 2.当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下行。
- 3.用一段 Java 代码解释可重入:
public synchronized void a() {b();
}public synchronized void b() {// pass
}
- 4.假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己
- 5.可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释放。
- 6.可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
- 7.解决方案:redis + Hash
b.锁操作:
b1. 加锁脚本
- 1.Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end
- 2.假设值为:KEYS:[lock], ARGV[uuid, expire]
- 3.如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。
b2. 解锁脚本
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0;
else redis.call('del', KEYS[1]); return 1;
end;
b3. 代码实现
- 1.由于加解锁代码量相对较多,这里可以封装成一个工具类:
- DistributedLockClient工厂类具体实现:
@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}
}
- DistributedRedisLock实现如下:
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/*** 解锁方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/*** 给线程拼接唯一标识* @return*/String getId(){return uuid + ":" + Thread.currentThread().getId();}
}
b4. 使用及测试
- 1.在业务代码中使用:
public void deduct() {DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");redisLock.lock();try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {redisLock.unlock();}
}
- 2.测试:
- 3.测试可重入性:
40.7. 自动续期
- 1.lua脚本:
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1;
else return 0;
end
- 2.在RedisDistributeLock中添加renewExpire方法:
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){Thread.sleep(50);}// 加锁成功,返回之前,开启定时器自动续期this.renewExpire();return true;}/*** 解锁方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}// String getId(){// return this.uuid + ":" + Thread.currentThread().getId();// }private void renewExpire(){String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +" return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {renewExpire();}}}, this.expire * 1000 / 3);}
}
- 3.在tryLock方法中使用:
- 4.构造方法作如下修改:
- 5.解锁方法作如下修改:
40.8. 手写分步式锁小结
a.特征:
- 1.独占排他:setnx
- 2.防死锁:
- redis客户端程序获取到锁之后,立马宕机。给锁添加过期时间
- 不可重入:可重入
- 3.防误删:先判断是否自己的锁才能删除
- 4.原子性:
- 加锁和过期时间之间:set k v ex 3 nx
- 判断和释放锁之间:lua脚本
- 5.可重入性:hash(key field value) + lua脚本
- 6.自动续期:Timer定时器 + lua脚本
- 7.在集群情况下,导致锁机制失效:
- 客户端程序10010,从主中获取锁
- 从还没来得及同步数据,主挂了
- 于是从升级为主
- 客户端程序10086就从新主中获取到锁,导致锁机制失效
b.锁操作:
b1.加锁:
- 1.setnx:独占排他 死锁、不可重入、原子性
- 2.set k v ex 30 nx:独占排他、死锁 不可重入
- 3.hash + lua脚本:可重入锁
- 判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)
- 如果锁被占用,则判断是否当前线程占用的(hexists),如果是则重入(hincrby)并重置过期时间(expire)
- 否则获取锁失败,将来代码中重试
- 4.Timer定时器 + lua脚本:实现锁的自动续期
- 判断锁是否自己的锁(hexists == 1),如果是自己的锁则执行expire重置过期时间
b2.解锁
- 1.del:导致误删
- 2.先判断再删除同时保证原子性:lua脚本
- 3.hash + lua脚本:可重入
- 判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常
- 存在则直接减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del),并返回1
- 不为0,则返回0
b3.重试:
- 1.递归 循环
40.9. 红锁算法
a.redis集群状态下的问题:
- 客户端A从master获取到锁
- 在master将锁同步到slave之前,master宕掉了。
- slave节点被晋级为master节点
- 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
b.安全失效:
b1.概述:
- 1.解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock
- 2.在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。**前几节已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。**将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主服务器,确保它们以独立的方式发生故障。
b2.为了获取锁,客户端执行以下操作:
- 1.客户端以毫秒为单位获取当前时间的时间戳,作为起始时间。
- 2.客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个实例不可用,尽快尝试与下一个实例进行通信。
- 3.客户端获取当前时间 减去在步骤1中获得的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。
- 4.如果获取了锁,则将锁有效时间减去 获取锁所花费的时间,如步骤3中所计算。
- 5.如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)
b3.总结:
- 1.每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机来产生很小的时钟漂移。只有在拥有锁的客户端将在锁有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移
- 2.当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。
- 3.值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。