当前位置: 首页 > news >正文

RedissonClient 分布式队列工具类

注意:轻量级队列可以使用工具类,重量级数据量 请使用 MQ

本文章基于redis使用redisson客户端实现轻量级队列,以及代码、执行结果演示

一、常见队列了解

  1. 普通队列:先进先出(FIFO),只能在一端添加元素,在另一端移除元素。
  2. 循环队列:利用数组和取模运算实现队尾连接队首。
  3. 双端队列:两端都可以添加和移除元素。
  4. 优先级队列:根据元素的优先级顺序处理元素。
  5. 阻塞队列:在多线程中使用,队空时取元素会等待,队满时加元素会等待。
  6. 有限队列:队列长度固定,队满时新元素加入会导致队头元素自动移除。

二、工具类

基于redisson 实现的分布式工具类,copy即用

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 添加普通队列数据** @param queueName 队列名* @param data      数据*/public static <T> boolean addQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.offer(data);}/*** 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getQueueObject(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.poll();}/*** 通用删除队列数据(不支持延迟队列)*/public static <T> boolean removeQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.remove(data);}/*** 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyQueue(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.delete();}/*** 添加延迟队列数据 默认毫秒** @param queueName 队列名* @param data      数据* @param time      延迟时间*/public static <T> void addDelayedQueueObject(String queueName, T data, long time) {addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);}/*** 添加延迟队列数据** @param queueName 队列名* @param data      数据* @param time      延迟时间* @param timeUnit  单位*/public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);delayedQueue.offer(data, time, timeUnit);}/*** 获取一个延迟队列数据 没有数据返回 null** @param queueName 队列名*/public static <T> T getDelayedQueueObject(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);return delayedQueue.poll();}/*** 删除延迟队列数据*/public static <T> boolean removeDelayedQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);return delayedQueue.remove(data);}/*** 销毁延迟队列 所有阻塞监听 报错*/public static <T> void destroyDelayedQueue(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);delayedQueue.destroy();}/*** 添加优先队列数据** @param queueName 队列名* @param data      数据*/public static <T> boolean addPriorityQueueObject(String queueName, T data) {RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);return priorityBlockingQueue.offer(data);}/*** 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getPriorityQueueObject(String queueName) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.poll();}/*** 优先队列删除队列数据(不支持延迟队列)*/public static <T> boolean removePriorityQueueObject(String queueName, T data) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.remove(data);}/*** 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyPriorityQueue(String queueName) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.delete();}/*** 尝试设置 有界队列 容量 用于限制数量** @param queueName 队列名* @param capacity  容量*/public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.trySetCapacity(capacity);}/*** 尝试设置 有界队列 容量 用于限制数量** @param queueName 队列名* @param capacity  容量* @param destroy   已存在是否销毁*/public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);if (boundedBlockingQueue.isExists() && destroy) {destroyQueue(queueName);}return boundedBlockingQueue.trySetCapacity(capacity);}/*** 添加有界队列数据** @param queueName 队列名* @param data      数据* @return 添加成功 true 已达到界限 false*/public static <T> boolean addBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.offer(data);}/*** 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getBoundedQueueObject(String queueName) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.poll();}/*** 有界队列删除队列数据(不支持延迟队列)*/public static <T> boolean removeBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.remove(data);}/*** 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyBoundedQueue(String queueName) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.delete();}/*** 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)*/public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);if (isDelayed) {// 订阅延迟队列CLIENT.getDelayedQueue(queue);}queue.subscribeOnElements(consumer);}}

三、普通队列代码测试

3.1 添加进入队列

QueueUtils.addQueueObject 方法添加数据进入队列 test

@RestController
@SaIgnore
public class QueueTestController {@GetMapping("addQueue")public void addQueue() {TestDemo testDemo = new TestDemo();testDemo.setTestKey("testKey");testDemo.setCreateTime(new Date());QueueUtils.addQueueObject("test", testDemo);}
}

 redis中查询加入队列数据:

3.2 获取队列

获取上面添加的 test队列 数据

 @GetMapping("getQueue")public void getQueue() {TestDemo testDemo = new TestDemo();testDemo.setTestKey("testKey");testDemo.setCreateTime(new Date());Object test = QueueUtils.getQueueObject("test");Console.log("test->{}", test);}

按照先进先出的规则,创建时间最早一条被获取,剩下2条为后添加数据

3.3 删除数据

删除test队列数据

@GetMapping("removeQueue")public R<Void> removeQueue() throws ParseException {TestDemo testDemo = new TestDemo();SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");testDemo.setCreateTime(simpleDateFormat.parse("2024-09-04 10:40:53"));testDemo.setTestKey("testKey");boolean test = QueueUtils.removeQueueObject("test", testDemo);return R.ok(test ? "成功":"失败");}

 如上代码,删除时间为:2024-09-04 10:40:53 这条数据,剩下一条

 

3.4 销毁队列
 @GetMapping("destoryQueue")public R<Void> destoryQueue() throws ParseException {boolean test = QueueUtils.destroyQueue("test");return R.ok(test ? "成功":"失败");}

 如图销毁队列后,刷新,则提示键不存在

 

3.5 订阅队列

开启订阅队列:

  • 一般是在程序启动时候开启,比如使用  @PostConstruct 注解
  •  或者实现 ApplicationRunner  接口来实现
    @Component
    public class RuoYiSubcribeInitializer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo ->{Console.log("testDemo->{}", testDemo);},false);}
    }

 @PostConstructpublic void subscribeQueue() {QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo ->{Console.log("testDemo->{}", testDemo);},false);}

 每次执行添加操作时候,订阅队列都会获取到数据

订阅队列会监听队列数据,知道队列数据为空

2024-09-04 11:37:49 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 => URL[GET /addQueue],无参数
testDemo->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=testKey, value=null, version=null, delFlag=null)
2024-09-04 11:37:49 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /addQueue],耗时:[15]毫秒
2024-09-04 11:38:12 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 => URL[GET /addQueue],无参数
2024-09-04 11:38:12 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /addQueue],耗时:[13]毫秒
testDemo->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=testKey, value=null, version=null, delFlag=null)

四、有界队列代码测试

4.1设置队列容量

QueueUtils.trySetBoundedQueueCapacity("test", 5);  设置队列容量

 @GetMapping("addBoundedQueue")public void addBoundedQueue() {//销毁队列QueueUtils.destroyQueue("test");//设置队列容量QueueUtils.trySetBoundedQueueCapacity("test", 5);}

 如下图bps,则是记录容量

4.2 添加队列
   @GetMapping("addBounded")public R<Void> addBounded() {//设置队列容量boolean test = QueueUtils.addBoundedQueueObject("test", "vlue111");return R.ok(test ? "成功":"失败");}
  •  如果未设置容量,添加失败
  • 超出容量,添加也会失败

4.3获取数据

获取队列数据,会同时改变容量大小

getBoundedQueueObject,会正确计算容量的大小。
getQueueObject 获取导数据,容量会为0.后面无法添加
 @GetMapping("getBounded")public R<Void> getBounded() {//设置队列容量Object test = QueueUtils.getBoundedQueueObject("test");return R.ok(test.toString());}

 底层逻辑,如果取出一个数据,容量则会加 1


{"code": 200,"msg": "vlue111","data": null
}
<Response body is empty>Response code: 200 (OK); Time: 26ms (26 ms); Content length: 40 bytes (40 B)

五、延时队列代码测试

5.1 延时队列数据流转流程

延时队列数据到期后会存入到普通队列,如下图流程:


+-------------------+
| 添加任务到        |
| 延时队列          |---------------------------------------
+-------------------+   |                                    |v                                    v  
+-------------------+
| 定时检查到期      |                                        
| 任务              |                                     获取数据
+-------------------+|                                                                        v                                    |                                    
+-------------------+
| 延时队列          |---------------------------------------
|  -> 普通队列      |
+------------------->
所以拿数据是从延时队列拿数据,还是从普通队列拿数据,考虑下业务场景
5.2 脚本的实现过程: 

简单了解地底层:

  • struct.pack('dLc0', tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2]):将过期时间、对象长度和对象本身打包成一个二进制字符串,便于在 Redis 中存储。
  • redis.call('zadd', KEYS[2], ARGV[1], value):将打包后的值 value 添加到有序集合(延时队列)中,其中 ARGV[1] 是过期时间。
  • redis.call('rpush', KEYS[3], value):将打包后的值 value 添加到列表(待处理队列)中。
  • local v = redis.call('zrange', KEYS[2], 0, 0):获取有序集合的第一个元素。
  • if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]) end:如果添加的新元素是有序集合的第一个元素,则通过 Redis 的发布订阅机制通知其他消费者。
    Lua 脚本:local value = struct.pack('dLc0', tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2]);
    redis.call('zadd', KEYS[2], ARGV[1], value);
    redis.call('rpush', KEYS[3], value);
    local v = redis.call('zrange', KEYS[2], 0, 0);
    if v[1] == value thenredis.call('publish', KEYS[4], ARGV[1]);
    end;
    

 5.3 测试延时队列:

场景:

  • 用于不是立即执行的任务场景:
  • 比如用户创建订单但是不付款,时间到后取消订单

 如图先订阅队列 test,手动开启:

/*** <简述>订阅延时队列* <详细描述>* @author syf* @date 2024/9/6 14:53*/@GetMapping("subscribeDelayQueue")public R<Void> subscribeDelayQueue() {Console.log("开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。");QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo  ->{Console.log("接受到订单->{}", testDemo);Console.log("关闭订单");},false);return R.ok();}

 添加延时队列到test:

 @GetMapping("addDelayQueue")public void addDelayQueue() throws ParseException {Console.log("创建订单。。。。。。。。。。。。。。。。。。。");TestDemo testDemo = new TestDemo();testDemo.setValue("订单编号");QueueUtils.addDelayedQueueObject("test", testDemo, 10, TimeUnit.SECONDS);Console.log("等待10秒。。。。。。。。。。。。。。。。。。。");}

如图 10秒后,订阅队列监听到订单并关闭

开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:51 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /subscribeDelayQueue],耗时:[51]毫秒
2024-09-05 19:47:54 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 => URL[GET /addDelayQueue],无参数
创建订单。。。。。。。。。。。。。。。。。。。
等待10秒。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:54 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /addDelayQueue],耗时:[57]毫秒
接受到订单->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=null, value=订单编号, version=null, delFlag=null)
关闭订单

六、优先队列代码测试

场景:

vip 用户按照OrderNum,随机生成等级进行排队

 添加vip用户进入队列:

 插入数据时候会按照OrderNum 大小找到位置,就像list索引一样

/*** 添加队列数据** @param queueName 队列名*/@GetMapping("/add")public R<Void> add(String queueName) {// 用完了一定要销毁 否则会一直存在boolean b = QueueUtils.destroyPriorityQueue(queueName);log.info("通道: {} , 删除: {}", queueName, b);for (int i = 0; i < 10; i++) {int randomNum = RandomUtil.randomInt(10);PriorityDemo data = new PriorityDemo();data.setName("data-" + i);data.setOrderNum(randomNum);if (QueueUtils.addPriorityQueueObject(queueName, data)) {log.info("通道: {} , 发送数据: {}", queueName, data);} else {log.info("通道: {} , 发送数据: {}, 发送失败", queueName, data);}}return R.ok("操作成功");}

按照等级获取vip用户:

@GetMapping("/get")public R<Void> get(String queueName) {PriorityDemo data;do {data = QueueUtils.getPriorityQueueObject(queueName);log.info("通道: {} , 获取数据: {}", queueName, data);} while (data != null);return R.ok("操作成功");}

如图orderNum从 0 到7依次被打印

2024-09-06 11:06:57 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /demo/queue/priority/get],耗时:[11]毫秒
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 => URL[GET /demo/queue/priority/get],参数类型[param],参数:[{"queueName":["test"]}]
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-9, orderNum=0)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-1, orderNum=2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-2, orderNum=2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-3, orderNum=3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-4, orderNum=3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-8, orderNum=3)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-0, orderNum=5)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-7, orderNum=6)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-5, orderNum=7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(name=data-6, orderNum=7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: null
2024-09-06 11:07:51 [XNIO-1 task-1] INFO  c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 => URL[GET /demo/queue/priority/get],耗时:[488]毫秒

  博主精心整理专栏,CV大法即可用,感谢您小手点一点 手动跪拜:  

1- SpringBoot框架常用配置(若依),代码解读:

http://t.csdnimg.cn/jpsSN

2- java常用工具类整理,示例演示:

http://t.csdnimg.cn/gmCfJ

3- CompletableFuture 异步编排实际代码展示

http://t.csdnimg.cn/ZuC0N

4- XXL-JOB 详细学习,手把手带入门

http://t.csdnimg.cn/lyR7Y

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 智能客服的演变:从传统到向量数据库的新时代
  • [iBOT] Image BERT Pre-Training with Online Tokenizer
  • springboot高校实验室预约系统-计算机毕业设计源码58031
  • 无需温度修正,测值准确可靠 GEO ACxxxx型振弦式锚索测力计
  • 机器学习特征分析
  • macos安装ArgoCD
  • Docker 学习 Day 1
  • 鸿蒙开发(API 12 Beta6版)【通用属性协议】 网络篇
  • 十分钟学会Kubernetes(K8S) 部署SpringBoot3.0
  • Java语言程序设计基础篇_编程练习题**17.20 (二进制编辑器)
  • 系统编程-多路IO复用
  • NLP自然语言处理学习过程中知识点总结
  • CSS具有哪些特点呢?
  • 滚雪球学Java(88):连接无限,资源有度:JavaSE数据库连接池深度解析,有两下子!
  • 顶级出图效果!免费在线使用FLux.1 模型,5s出图无限制!
  • #Java异常处理
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • Apache的80端口被占用以及访问时报错403
  • CSS魔法堂:Absolute Positioning就这个样
  • CSS实用技巧干货
  • java中的hashCode
  • js算法-归并排序(merge_sort)
  • log4j2输出到kafka
  • MySQL用户中的%到底包不包括localhost?
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Phpstorm怎样批量删除空行?
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • python学习笔记 - ThreadLocal
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • 从零开始在ubuntu上搭建node开发环境
  • 动态魔术使用DBMS_SQL
  • 服务器之间,相同帐号,实现免密钥登录
  • 浮动相关
  • 看域名解析域名安全对SEO的影响
  • 力扣(LeetCode)357
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 物联网链路协议
  • 一天一个设计模式之JS实现——适配器模式
  • ionic入门之数据绑定显示-1
  • ​Benvista PhotoZoom Pro 9.0.4新功能介绍
  • ​LeetCode解法汇总2304. 网格中的最小路径代价
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • # Apache SeaTunnel 究竟是什么?
  • ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTr
  • #if和#ifdef区别
  • #stm32驱动外设模块总结w5500模块
  • #快捷键# 大学四年我常用的软件快捷键大全,教你成为电脑高手!!
  • (31)对象的克隆
  • (C语言)共用体union的用法举例
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (python)数据结构---字典
  • (附源码)springboot 房产中介系统 毕业设计 312341
  • (论文阅读40-45)图像描述1
  • (四)TensorRT | 基于 GPU 端的 Python 推理