当前位置: 首页 > news >正文

SpringBoot项目中如何使用Redisson队列详解

一、SpringBoot配置Redisson

1.1 引入依赖

<!--Redisson延迟队列-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.1</version>
</dependency>

1.2 代码配置

@Data
@Component
@RefreshScope
@ConfigurationProperties("spring.redis")
public class RedisConfigProperties {String host;String password;Cluster cluster;
}@Data
class Cluster {Boolean enable;List<String> nodes;
}
@Component
@Configuration
@RequiredArgsConstructor
public class RedissonConfig {private final RedisConfigProperties redisConfig;@Beanpublic RedissonClient redissonClient() {Config config = new Config();if (redisConfig.getCluster() != null && Boolean.TRUE.equals(redisConfig.getCluster().getEnable())) {ClusterServersConfig clusterServersConfig = config.useClusterServers();for(String node : redisConfig.getCluster().getNodes()) {clusterServersConfig.addNodeAddress("redis://" + node);}if (StrUtil.isNotBlank(redisConfig.getPassword())){clusterServersConfig.setPassword(redisConfig.getPassword());}} else {SingleServerConfig serverConfig = config.useSingleServer();serverConfig.setAddress("redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");System.out.println("redisson设置的地址为:" + "redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");if (StrUtil.isNotBlank(redisConfig.getPassword())){serverConfig.setPassword(redisConfig.getPassword());}}return Redisson.create(config);}
}

1.3 application.yml中配置

spring:redis:host: 127.0.0.1

二、延时队列具体使用

2.1 编写一个工具类RedisDelayQueueUtil

/*** @Description: redission延迟队列工具类*/
@Slf4j
@Component
@RefreshScope
public class RedisDelayQueueUtil {// day代表单位是天,minutes代表单位是分钟(也可以是秒seconds, 但这个不在下面代码示例处理)@Value("${spring.mode}")private String mode;@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param queueCode 队列键* @param value 队列值* @param delay 延迟时间* @param <T>*/public <T> void addDelayQueue(String queueCode, String value, long delay) {try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, "day".equals(testMode) ? TimeUnit.MINUTES : TimeUnit.DAYS);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, delay + "天");} catch (Exception e) {throw new RuntimeException("(添加延时队列失败)");}}/*** 删除延迟队列* @param queueCode 队列键* @param value 队列值* @param <T>*/public <T> void removeDelayQueue(String queueCode, String value){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.remove(value);log.info("(删除延时队列成功) 队列键:{},队列值:{}", queueCode, value);} catch (Exception e) {throw new RuntimeException("(删除延时队列失败)");}}/*** 获取延迟队列* @param queueCode 队列键* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);redissonClient.getDelayedQueue(blockingDeque);T value  = (T) blockingDeque.take();return value;}/*** @param 移除延时队列全部任务* @param code* @param task*/public void removeTask(String code, String value) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(code);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);List<String> c = new ArrayList<>();c.add(value);delayedQueue.removeAll(c);}}

2.2 在application.yml中配置时间单位

spring:mode: day

2.3 延迟队列枚举类

@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {// 这里可以配置多个枚举项,每个枚举项对应一个实现类OVER_TIME("OVER_TIME", "超时触发", "overTimeImpl"),;// 延迟队列 Redis Keyprivate String code;// 中文描述private String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private String beanId;}

2.4 延迟队列枚举类中配置的实现类

@Slf4j
@Component
public class ActOverTimeImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;/*** 任务超时,监听* 可以在里面调用service的代码去处理具体的业务逻辑* @param value*/@Overridepublic void execute(String value) {log.info("接收到延迟任务【超时提醒】:{}", value);testService.dealDelayQueueExpire(value);}
}

2.5 项目启动时使用其它线程控制全部延时队列

@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedisDelayQueueUtil redisDelayQueueUtil;private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();for (RedisDelayQueueEnum queueEnum : queueEnums) {new Thread(() -> {while (true) {try {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);}} catch (Exception e) {log.error("(Redis延迟队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis延迟队列启动成功)");}}

除了希望在一定时间之后触发某些任务外,平时还会有一些资源消耗比较大的任务,如果接口直接对外暴露,多人同时调用时有可能造成系统变慢甚至直接宕机。

在不改变系统配置,不升级系统硬件的情况下,我们可以将这种任务放到一个对列当中排队执行。

三、普通阻塞队列的使用

3.1 管理普阻塞通队列枚举类

@Getter
@AllArgsConstructor
public enum RedisBlockingQueueEnum {CONSUME_RESOURCES_TASK("CONSUME_RESOURCES_TASK", "消耗资源的任务", "consume resourcesImpl"),;private final String code;private final String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private final String beanId;
}

3.2 使用一个RedisBlockingQueueOperator去统一管理添加阻塞队列

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisBlockingQueueOperator {private final RedissonClient redissonClient;public void addConsumeResourcesTaskQueue(Long userId, Long tenantId) {JSONObject jsonObject = new JSONObject();jsonObject.put("userId", userId);jsonObject.put("tenantId", tenantId);RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisBlockingQueueEnum.CONSUME_RESOURCES_TASK.getCode());queue.offer(jsonObject.toJSONString());}
}

3.3 Controller中将请求加入阻塞队列

@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
@Tag(description = "test", name = "测试Controller")
@SecurityRequirement(name = HttpHeaders.AUTHORIZATION)
public class TestController {private final RedisBlockingQueueOperator queueOperator;@PostMapping("/consume_resource_task")public R consumeResourcesTask() throws Exception{queueOperator.addCombinedReleaseQueue(1 TenantContextHolder.getTenantId());return R.ok("success!");}}

3.4 编写实现类

@Slf4j
@Component
public class ConsumeResourcesTaskImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;@Overridepublic void execute(String value) throws Exception {JSONObject jsonObject = JSON.parseObject(value);log.info("延迟队列触发【处理耗时任务】:{}", value);testService.dealConsumeResourcesTask(value);}
}

3.5 项目启动时使用其它线程控制全部普通阻塞队列

@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");for (RedisBlockingQueueEnum queueEnum : RedisBlockingQueueEnum.values()) {new Thread(() -> {RBlockingQueue<String> queue = redissonClient.getBlockingQueue(queueEnum.getCode());while (true) {try {String value = queue.take();if (value == null) continue;RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);} catch (Exception e) {log.error("(Redis阻塞队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis Blocking Queue 启动成功)");}}

相关文章:

  • 机器字长与操作系统的关系
  • 每天一个数据分析题(三百四十三)
  • 先导微型数控桌面式加工中心
  • 如何 使用Cubemax配置串口1.5得停止位
  • Vue2中的计算属性(computed)和监听属性(watch)
  • 【因果推断python】6_图因果模型
  • 释放视频潜力:Topaz Video AI for mac/win 一款全新的视频增强与修复利器
  • ROS2在RVIZ2中加载机器人urdf模型
  • 计算属性与监听属性
  • 恒创科技:无法与服务器建立安全连接怎么解决?
  • 国内常用的编程博客网址:技术资源与学习平台
  • Stable Diffusion|插件安装基础教程
  • 前端角色负责人岗
  • 5、css3 自动动画渐变背景
  • 网络安全岗秋招面试题及面试经验分享
  • [PHP内核探索]PHP中的哈希表
  • 2017届校招提前批面试回顾
  • Bytom交易说明(账户管理模式)
  • CSS进阶篇--用CSS开启硬件加速来提高网站性能
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • JavaScript/HTML5图表开发工具JavaScript Charts v3.19.6发布【附下载】
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • JDK9: 集成 Jshell 和 Maven 项目.
  • Js基础知识(四) - js运行原理与机制
  • Making An Indicator With Pure CSS
  • maven工程打包jar以及java jar命令的classpath使用
  • Spark in action on Kubernetes - Playground搭建与架构浅析
  • Tornado学习笔记(1)
  • 成为一名优秀的Developer的书单
  • 飞驰在Mesos的涡轮引擎上
  • 前端知识点整理(待续)
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 如何在招聘中考核.NET架构师
  • ​ArcGIS Pro 如何批量删除字段
  • # 再次尝试 连接失败_无线WiFi无法连接到网络怎么办【解决方法】
  • #define与typedef区别
  • (二)学习JVM —— 垃圾回收机制
  • (附源码)springboot 个人网页的网站 毕业设计031623
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (附源码)计算机毕业设计SSM疫情居家隔离服务系统
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (算法)求1到1亿间的质数或素数
  • (译) 理解 Elixir 中的宏 Macro, 第四部分:深入化
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • (转)Windows2003安全设置/维护
  • (转)清华学霸演讲稿:永远不要说你已经尽力了
  • ..回顾17,展望18
  • ./mysql.server: 没有那个文件或目录_Linux下安装MySQL出现“ls: /var/lib/mysql/*.pid: 没有那个文件或目录”...
  • .NET Core 2.1路线图
  • .NET Core MongoDB数据仓储和工作单元模式封装
  • .NET MAUI Sqlite数据库操作(二)异步初始化方法
  • .NET MAUI学习笔记——2.构建第一个程序_初级篇
  • .NET/C# 使用反射调用含 ref 或 out 参数的方法
  • .NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)