当前位置: 首页 > news >正文

从零搭建基于SpringBoot的秒杀系统(八):通过分布式锁解决多线程导致的问题

在前面一篇博客中,通过mysql的优化解决了超卖的问题,但是没有解决同一用户有几率多次购买的问题,这个问题可以通过加锁来解决,解决思路在于一个请求想要购买时需要先获得分布式锁,如果得不到锁就等待。

(一)使用redis实现分布式锁

在config下新建RedisConfig,用来写redis的通用配置文件:

public class RedisConfig {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    private RedisTemplate<String,Object> redisTemplate(){
        RedisTemplate<String,Object> redisTemplate=new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        //设置redis key、value的默认序列化规则
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(){
        StringRedisTemplate stringRedisTemplate=new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }
}

对于redis的入门,可以看我往期的redis教程https://blog.csdn.net/qq_41973594/category_9831988.html

对于redis更加深入的理解,我将在后续博客中发布。

配置完成之后,接下来就可以在代码中直接使用redis,在KillServiceImpl中增加KillItemV3版本,借助redis的原子操作实现分布式锁

@Autowired
private StringRedisTemplate stringRedisTemplate;
//redis分布式锁
public Boolean KillItemV3(Integer killId, Integer userId) throws Exception {

    //借助Redis的原子操作实现分布式锁
    ValueOperations valueOperations=stringRedisTemplate.opsForValue();
    //设置redis的key,key由killid和userid组成
    final String key=new StringBuffer().append(killId).append(userId).toString();
    //设置redis的value
    final String value= String.valueOf(snowFlake.nextId());
    //尝试获取锁
    Boolean result = valueOperations.setIfAbsent(key, value);
    //如果获取到锁才会进行后面的操作
    if (result){
        stringRedisTemplate.expire(key,30, TimeUnit.SECONDS);
        //判断当前用户是否抢购过该商品
        if (itemKillSuccessMapper.countByKillUserId(killId,userId)<=0){
            //获取商品详情
            ItemKill itemKill=itemKillMapper.selectByidV2(killId);
            if (itemKill!=null&&itemKill.getCanKill()==1 && itemKill.getTotal()>0){
                int res=itemKillMapper.updateKillItemV2(killId);
                if (res>0){
                    commonRecordKillSuccessInfo(itemKill,userId);
                    return true;
                }
            }
        }else {
            System.out.println("您已经抢购过该商品");
        }
        //释放锁
        if (value.equals(valueOperations.get(key).toString())){
            stringRedisTemplate.delete(key);
        }
    }
    return false;
}

相比之前的版本,这个版本想要抢购商品需要先获取redis的分布式锁。

在KillService中增加接口代码:

Boolean KillItemV3(Integer killId,Integer userId) throws Exception;

在KillController中增加redis分布式锁版本的代码:

//redis分布式锁版本
@RequestMapping(value = prefix+"/test/execute3",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public BaseResponse testexecute3(@RequestBody @Validated KillDto killDto, BindingResult result, HttpSession httpSession){
    if (result.hasErrors()||killDto.getKillid()<0){
        return new BaseResponse(StatusCode.InvalidParam);
    }
    try {
        Boolean res=killService.KillItemV3(killDto.getKillid(),killDto.getUserid());
        if (!res){
            return new BaseResponse(StatusCode.Fail.getCode(),"商品已经抢购完或您已抢购过该商品");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    BaseResponse baseResponse=new BaseResponse(StatusCode.Success);
    baseResponse.setData("抢购成功");
    return baseResponse;
}

在本地运行redis-server,redis的安装包可以前往官网下载,或者在我的公众号《Java鱼仔》中回复 秒杀系统 获取。运行redis-server

修改Http请求的路径为/kill/test/execute3

运行jmeter,发现没有产生超卖的情况,并且每个用户确实只买到了一件商品。

(二)基于Redisson的分布式锁实现

只用redis实现分布式锁有一个问题,如果不释放锁,这个锁就会一直锁住。解决办法就是给这个锁设置一个时间,并且这个设置时间和设置锁需要是原子操作。可以使用lua脚本保证原子性,但是我们更多地是使用基于Redis的第三方库来实现,该项目用到了Redisson。

在config目录下新建RedissonConfig

@Configuration
public class RedissonConfig {
    @Autowired
    private Environment environment;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer()
                .setAddress(environment.getProperty("redis.config.host"));
//                .setPassword(environment.getProperty("spring.redis.password"));
        RedissonClient client= Redisson.create(config);
        return client;
    }
}

相关的配置放在application.properties中

#redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=
redis.config.host=redis://127.0.0.1:6379

在killServiceImpl中新建KillItemV4版本

@Autowired
private RedissonClient redissonClient;
//redisson的分布式锁
@Override
public Boolean KillItemV4(Integer killId, Integer userId) throws Exception {
    Boolean result=false;
    final String key=new StringBuffer().append(killId).append(userId).toString();
    RLock lock=redissonClient.getLock(key);
    //三个参数、等待时间、锁过期时间、时间单位
    Boolean cacheres=lock.tryLock(30,10,TimeUnit.SECONDS);
    if (cacheres){
        //判断当前用户是否抢购过该商品
        if (itemKillSuccessMapper.countByKillUserId(killId,userId)<=0){
            //获取商品详情
            ItemKill itemKill=itemKillMapper.selectByidV2(killId);
            if (itemKill!=null&&itemKill.getCanKill()==1 && itemKill.getTotal()>0){
                int res=itemKillMapper.updateKillItemV2(killId);
                if (res>0){
                    commonRecordKillSuccessInfo(itemKill,userId);
                    result=true;
                }
            }
        }else {
            System.out.println("您已经抢购过该商品");
        }
        lock.unlock();
    }
    return result;
}

与redis的区别在于加锁的过程中设置锁的等待时间和过期时间,在KillService中将KillItemV4的代码加上:

Boolean KillItemV4(Integer killId,Integer userId) throws Exception;

最后在KillController中添加以下接口代码:

//redission分布式锁版本
@RequestMapping(value = prefix+"/test/execute4",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public BaseResponse testexecute4(@RequestBody @Validated KillDto killDto, BindingResult result, HttpSession httpSession){
    if (result.hasErrors()||killDto.getKillid()<0){
        return new BaseResponse(StatusCode.InvalidParam);
    }
    try {
        Boolean res=killService.KillItemV4(killDto.getKillid(),killDto.getUserid());
        if (!res){
            return new BaseResponse(StatusCode.Fail.getCode(),"商品已经抢购完或您已抢购过该商品");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    BaseResponse baseResponse=new BaseResponse(StatusCode.Success);
    baseResponse.setData("抢购成功");
    return baseResponse;
}

运行redis后再次使用jmeter,达到预期不超卖,不多卖的效果。

(三)基于zookeeper的分布式锁

关于zookeeper的内容我会在后续的博客中跟进,这里主要用到zookeeper的进程互斥锁InterProcessMutex,Zookeeper利用path创建临时顺序节点,实现公平锁

// 最常用
public InterProcessMutex(CuratorFramework client, String path){
    // Zookeeper利用path创建临时顺序节点,实现公平锁
    this(client, path, new StandardLockInternalsDriver());
}

首先还是写zookeeper的配置文件,配置文件主要配置Zookeeper的连接地址,命名空间等:

@Configuration
public class ZookeeperConfig {
    @Autowired
    private Environment environment;

    @Bean
    public CuratorFramework curatorFramework(){
        CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
                .connectString(environment.getProperty("zk.host"))
                .namespace(environment.getProperty("zk.namespace" ))
                .retryPolicy(new RetryNTimes(5,1000))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
}

application.peoperties配置文件中添加zookeeper相关配置

#zookeeper
zk.host=127.0.0.1:2181
zk.namespace=kill

KillService中添加ItemKillV5版本

Boolean KillItemV5(Integer killId,Integer userId) throws Exception;

接下来在KillServiceImpl中添加Zookeeper分布式锁的相关代码,通过InterProcessMutex按path+killId+userId+"-lock"的路径名加锁,每次调用抢购代码时都需要先acquire尝试获取。超时时间设定为10s

@Autowired
private CuratorFramework curatorFramework;
private final String path="/seckill/zookeeperlock/";
//zookeeper的分布式锁
@Override
public Boolean KillItemV5(Integer killId, Integer userId) throws Exception{
    Boolean result=false;
    InterProcessMutex mutex=new InterProcessMutex(curatorFramework,path+killId+userId+"-lock");
    if (mutex.acquire(10L,TimeUnit.SECONDS)){
        //判断当前用户是否抢购过该商品
        if (itemKillSuccessMapper.countByKillUserId(killId,userId)<=0){
            //获取商品详情
            ItemKill itemKill=itemKillMapper.selectByidV2(killId);
            if (itemKill!=null&&itemKill.getCanKill()==1 && itemKill.getTotal()>0){
                int res=itemKillMapper.updateKillItemV2(killId);
                if (res>0){
                    commonRecordKillSuccessInfo(itemKill,userId);
                    result=true;
                }
            }
        }else {
            System.out.println("您已经抢购过该商品");
        }
        if (mutex!=null){
            mutex.release();
        }
    }
    return result;
}

在KillController中添加zookeeper相关的抢购代码:

//zookeeper分布式锁版本
@RequestMapping(value = prefix+"/test/execute5",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public BaseResponse testexecute5(@RequestBody @Validated KillDto killDto, BindingResult result, HttpSession httpSession){
    if (result.hasErrors()||killDto.getKillid()<0){
        return new BaseResponse(StatusCode.InvalidParam);
    }
    try {
        Boolean res=killService.KillItemV5(killDto.getKillid(),killDto.getUserid());
        if (!res){
            return new BaseResponse(StatusCode.Fail.getCode(),"商品已经抢购完或您已抢购过该商品");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    BaseResponse baseResponse=new BaseResponse(StatusCode.Success);
    baseResponse.setData("抢购成功");
    return baseResponse;
}

zookeeper的安装包可以在官网下载,或者在我的公众号《Java鱼仔》中回复 秒杀系统 获取。在Zookeeper的bin目录下运行zkServer,获取到zookeeper的运行信息:

因为已经配置了redis信息,所以还需要启动redis,接着启动系统,通过jmeter进行压力测试

测试后结果即没有超卖,一个人也只能有一个订单。

(四)总结

我们通过三种方式的分布式锁解决了多线程情况下导致实际情况和业务逻辑不通的情况,到这里为止,这个秒杀系统就算粗略的完成了。当然后续还可以有更多的优化,比如添加购物车等功能模块,页面可以美化,在这里redis只用作了分布式锁的功能,还可以热点抢购数据放入redis中。rabbitmq除了异步外,还具有限流、削峰的作用。

最后放上整体系统的代码:https://github.com/OliverLiy/SecondKill

这个系列博客中所有的工具均可在公众号《Java鱼仔》中回复 秒杀系统 获取。

相关文章:

  • 读《世界是数字的》有感
  • 面试官问我:什么是静态代理?什么是动态代理?注解、反射你会吗?
  • redis入门到精通系列(十):springboot集成redis及redis工具类的编写
  • css3延时动画
  • redis入门到精通系列(十一):redis的缓存穿透、缓存击穿以及缓存雪崩详解
  • 子数组最大值设计02
  • redis入门到精通系列(十二):看完这一篇文章别再说不懂布隆过滤器
  • 如何用SpringBoot(2.3.3版本)快速搭建一个项目?文末有小彩蛋
  • Linux上find命令详解
  • 一步步带你看SpringBoot(2.3.3版本)自动装配原理
  • CCF系列之I’m stuck!(201312-5)
  • SpringBoot配置文件及自动配置原理详解,这应该是SpringBoot最大的优势了吧
  • SpringBoot整合jdbc、durid、mybatis详解,数据库的连接就是这么简单
  • Git学习笔记(一)--- Git的安装与配置
  • SpringBoot整合SpringSecurity详解,认证授权从未如此简单
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • AngularJS指令开发(1)——参数详解
  • CentOS7简单部署NFS
  • JAVA SE 6 GC调优笔记
  • Java 多线程编程之:notify 和 wait 用法
  • js正则,这点儿就够用了
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • Redux 中间件分析
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • 猴子数据域名防封接口降低小说被封的风险
  • 基于Volley网络库实现加载多种网络图片(包括GIF动态图片、圆形图片、普通图片)...
  • 简析gRPC client 连接管理
  • 如何利用MongoDB打造TOP榜小程序
  • 使用agvtool更改app version/build
  • 算法-图和图算法
  • 译自由幺半群
  • 看到一个关于网页设计的文章分享过来!大家看看!
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • ​猴子吃桃问题:每天都吃了前一天剩下的一半多一个。
  • #Linux(权限管理)
  • (BFS)hdoj2377-Bus Pass
  • (十二)springboot实战——SSE服务推送事件案例实现
  • (详细版)Vary: Scaling up the Vision Vocabulary for Large Vision-Language Models
  • (一)基于IDEA的JAVA基础1
  • (原創) X61用戶,小心你的上蓋!! (NB) (ThinkPad) (X61)
  • (转)LINQ之路
  • (转)人的集合论——移山之道
  • (转)真正的中国天气api接口xml,json(求加精) ...
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • .NET 5种线程安全集合
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • .net 验证控件和javaScript的冲突问题
  • .NET设计模式(11):组合模式(Composite Pattern)
  • .NET正则基础之——正则委托
  • /*在DataTable中更新、删除数据*/
  • @Not - Empty-Null-Blank
  • @property python知乎_Python3基础之:property
  • @RunWith注解作用
  • @SentinelResource详解
  • [145] 二叉树的后序遍历 js