当前位置: 首页 > news >正文

(二)延时任务篇——通过redis的key监听,实现延迟任务实战

前言

本节内容是关于使用redis的过期key,通过开启其监听失效策略,模拟订单延迟任务的执行流程。其核心原理是通过使用redis订阅与发布的方式,将过期失效的key通过广播的方式,发布给客户端,客户端可以监听此消息进而消费消息。需要注意的是官方并不推荐此方式,因为其容易造成数据丢失,例如没有客户端消费消息,消息也会丢失。对于一些安全性要求比较低的场景,可以使用此方式实现延迟队列。

正文

  • 引入redis的pom依赖
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
  • application.yml中配置redis连接
spring:data:redis:host: 127.0.0.1port: 6379database: 0connect-timeout: 30000timeout: 30000lettuce:pool:enabled: truemax-active: 200max-idle: 50max-wait: -1min-idle: 10shutdown-timeout: 100
  •  配置redis的缓冲池,并注入redis的消息监听容器
package com.yundi.tps.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.cache.support.CompositeCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.util.concurrent.TimeUnit;@Configuration
public class RedisConfig {@Beanpublic CacheManager cacheManager(RedisConnectionFactory connectionFactory) {// redis缓存管理器RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory).cacheDefaults(defaultCacheConfig).transactionAware().build();return new CompositeCacheManager(redisCacheManager);}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);serializer.serialize(objectMapper);template.setValueSerializer(serializer);template.setKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}

  • 实现一个KeyExpirationEventMessageListener过期的监听器RedisKeyExpirationListener
package com.yundi.tps.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** Redis失效事件 key** @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {//notify-keyspace-events Ex// 匹配规则String patternRule = new String(pattern);log.info("patternRule:{}", patternRule);// 监听的通道byte[] channel = message.getChannel();log.info("channel:{}", new String(channel));// 过期的keyString expireKey = message.toString();log.info("expireKey:{}", expireKey);//TODO 处理订单的后续业务逻辑}}
  •  实现一个创建订单的延时任务接口,模拟订单超时
package com.yundi.tps.controller;import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.yundi.xyxc.tps.common.ApiResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;@Tag(name = "订单管理")
@RestController
@RequestMapping("/api/tps/order")
public class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Operation(summary = "创建订单")@PostMapping("save")public ApiResponse save() {String orderId = String.valueOf(IdWorker.getId());stringRedisTemplate.opsForValue().setIfAbsent(orderId, orderId, 60, TimeUnit.SECONDS);return ApiResponse.ok();}
}
  • 开启redis key的失效监听,在redis配置中添加以下配置
notify-keyspace-events Ex

  •  启动redis服务和客户端项目,发送延时订单任务,看客户端是否能够消费到此延迟任务

结语

需要注意的是,该方式实现的延迟任务安全性较低,对于安全性高的场景,并不推荐此种方式。关于使用redis的key监听,实现延迟任务实战内容到这里就结束了,下期见。。。。。。

相关文章:

  • leetcode日记(63)颜色分类
  • Android开发之ActivityManagerService
  • 【区块链】JavaScript连接web3钱包,实现测试网络中的 Sepolia ETH余额查询、转账功能
  • 免费!OpenAI发布最新模型GPT-4o mini,取代GPT-3.5,GPT-3.5退出历史舞台?
  • 【Linux】常见指令的使用
  • IT服务运营中的过程要素管理(至简)
  • ChatGPT小狐狸AI付费创作系统v3.0.3+前端
  • QT--聊天室
  • 【Nacos安装】
  • MySQL,GROUP BY子句的作用是什么?having和where的区别在哪里说一下jdbc的流程
  • NSS [SWPUCTF 2022 新生赛]funny_php
  • 增量学习中Task incremental、Domain incremental、Class incremental 三种学习模式的概念及代表性数据集?
  • AgentBench: Evaluating LLMs As Agents
  • C语言 | Leetcode C语言题解之第283题移动零
  • <新>植物大战僵尸杂交版v2.3丨附PC+手机+Mac安装教程,IOS安装新教程!​
  • [译] 怎样写一个基础的编译器
  • [译]如何构建服务器端web组件,为何要构建?
  • 78. Subsets
  • Asm.js的简单介绍
  • django开发-定时任务的使用
  • java中的hashCode
  • JS创建对象模式及其对象原型链探究(一):Object模式
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • php的插入排序,通过双层for循环
  • Twitter赢在开放,三年创造奇迹
  • 批量截取pdf文件
  • 使用 @font-face
  • 视频flv转mp4最快的几种方法(就是不用格式工厂)
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • #我与Java虚拟机的故事#连载13:有这本书就够了
  • (2)nginx 安装、启停
  • (2024,LoRA,全量微调,低秩,强正则化,缓解遗忘,多样性)LoRA 学习更少,遗忘更少
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (arch)linux 转换文件编码格式
  • (C#)获取字符编码的类
  • (cos^2 X)的定积分,求积分 ∫sin^2(x) dx
  • (ibm)Java 语言的 XPath API
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (待修改)PyG安装步骤
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (多级缓存)多级缓存
  • (机器学习-深度学习快速入门)第一章第一节:Python环境和数据分析
  • (五)c52学习之旅-静态数码管
  • (原)本想说脏话,奈何已放下
  • .bat批处理(二):%0 %1——给批处理脚本传递参数
  • .net 7 上传文件踩坑
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .NET CORE使用Redis分布式锁续命(续期)问题
  • .Net Remoting(分离服务程序实现) - Part.3
  • .NET Standard 支持的 .NET Framework 和 .NET Core
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .NET6使用MiniExcel根据数据源横向导出头部标题及数据