当前位置: 首页 > news >正文

spring-boot 整合 redisson 实现延时队列(文末有彩蛋)

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {public static final String QUEUE = "delayQueue";// 默认超时时间,30秒public static final Integer DEFAULT_TIMEOUT = 30;@Resourceprivate RedissonClient redissonClient;// 加入任务并设置到期时间public void offer(String taskId, Integer timeout) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);}// 移除任务public void remove(String taskId) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.removeIf(messageId -> messageId.equals(taskId));}// 任务列表public RDelayedQueue<String> delayedQueue() {RBlockingDeque<String> blockingDeque = blockingDeque();return redissonClient.getDelayedQueue(blockingDeque);}public RBlockingDeque<String> blockingDeque() {return redissonClient.getBlockingDeque(QUEUE);}public boolean isShutdown() {return redissonClient.isShutdown();}public void shutdown() {redissonClient.shutdown();}}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {@Resourceprivate RedissonQueue redissonQueue;@Resource(name = "threadPoolTaskExecutor")private ThreadPoolTaskExecutor executor;@Overridepublic void run(ApplicationArguments args) {RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();executor.execute(() -> {while (true) {if (redissonQueue.isShutdown()) {return;} else {String messageId = null;try {messageId = blockingDeque.take();} catch (InterruptedException e) {log.warn("RedissonConsumer error:{}", e.getMessage());}if (!Objects.isNull(messageId) && !messageId.isEmpty()) {log.warn("timeout messageId : {}", messageId);}}}});}// 初始化,启动服务就执行一次@PostConstructpublic void init() {redissonQueue.delayedQueue();}@PreDestroypublic void shutdown() {redissonQueue.shutdown();}}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,@RequestParam(value = "timeout", required = false) Integer timeout) {taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;redissonQueue.offer(taskId, timeout);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {redissonQueue.remove(taskId);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • TiDB实践—索引加速+分布式执行框架创建索引提升70+倍
  • SpringBoot RestHighLevelClient 按版本更新
  • 自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo
  • vscode配置latex环境制作【文档、简历、resume】
  • Chapter16 渲染优化技术——Shader入门精要学习笔记
  • 企业培训 | CATIA数字样机培训
  • 为什么Spring不推荐@Autowired用于字段注入
  • Facebook Dating:社交平台的约会新体验
  • 专业140+总分420+天津大学815信号与系统考研经验天大电子信息与通信工程,真题,大纲,参考书。
  • docker部署Guacamole手册
  • SpringBoot应用从jar包部署改为war包部署要做哪些修改
  • SpringCloud---服务注册(Eureka)
  • Ubuntu 24.04 LTS 桌面安装MT4或MT5 (MetaTrader)教程
  • JAVA的接口和实现类
  • Power Shell查看进程、排序、打印出前五
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • Fundebug计费标准解释:事件数是如何定义的?
  • HTML中设置input等文本框为不可操作
  • HTTP那些事
  • iOS小技巧之UIImagePickerController实现头像选择
  • java8-模拟hadoop
  • Java知识点总结(JavaIO-打印流)
  • java中具有继承关系的类及其对象初始化顺序
  • laravel with 查询列表限制条数
  • Netty源码解析1-Buffer
  • node.js
  • rabbitmq延迟消息示例
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 初探 Vue 生命周期和钩子函数
  • 多线程事务回滚
  • 两列自适应布局方案整理
  • 如何优雅地使用 Sublime Text
  • 使用Gradle第一次构建Java程序
  • 适配mpvue平台的的微信小程序日历组件mpvue-calendar
  • 专访Pony.ai 楼天城:自动驾驶已经走过了“从0到1”,“规模”是行业的分水岭| 自动驾驶这十年 ...
  • ​如何使用QGIS制作三维建筑
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • #我与Java虚拟机的故事#连载12:一本书带我深入Java领域
  • (1)STL算法之遍历容器
  • (PySpark)RDD实验实战——取最大数出现的次数
  • (二)windows配置JDK环境
  • (幽默漫画)有个程序员老公,是怎样的体验?
  • (转)C#开发微信门户及应用(1)--开始使用微信接口
  • (转)linux 命令大全
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • *算法训练(leetcode)第四十五天 | 101. 孤岛的总面积、102. 沉没孤岛、103. 水流问题、104. 建造最大岛屿
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • .net core 管理用户机密
  • .Net Remoting常用部署结构
  • .NET 读取 JSON格式的数据
  • .NET 分布式技术比较
  • .NET编程C#线程之旅:十种开启线程的方式以及各自使用场景和优缺点
  • @Autowired 和 @Resource 区别的补充说明与示例
  • [ C++ ] STL_stack(栈)queue(队列)使用及其重要接口模拟实现
  • [ vulhub漏洞复现篇 ] struts2远程代码执行漏洞 S2-005 (CVE-2010-1870)