当前位置: 首页 > news >正文

Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • NLP:微调BERT进行文本分类
  • VMware Fusion虚拟机Mac版 安装Win10系统教程
  • 插入排序详解
  • 物理感知扩散的 3D 分子生成模型 - PIDiff 评测
  • 索引设计的5个原则
  • 一文详解Unity下RTMP推送|轻量级RTSP服务|RTSP|RTMP播放模块说明
  • 盘点3款.NetCore(C#)开源免费商城系统
  • 管理依赖版本-maven工程parent项目巧配置
  • C语言CRC16_CCITT_FALSE函数法和查表法实现
  • 【梯度下降算法学习笔记】
  • 基于开源鸿蒙(OpenHarmony)的【智能家居综合应用】系统
  • 评价类——熵权法(Entropy Weight Method, EWM),完全客观评价
  • 老年人养生之道:岁月静好,健康常伴
  • 代码随想录 八股文训练营40天总结
  • Cypress初次安装启动常见问题
  • 【刷算法】从上往下打印二叉树
  • 2018一半小结一波
  • HTML5新特性总结
  • Java Agent 学习笔记
  • Java方法详解
  • mysql innodb 索引使用指南
  • MySQL的数据类型
  • Nodejs和JavaWeb协助开发
  • overflow: hidden IE7无效
  • React中的“虫洞”——Context
  • Redux 中间件分析
  • SpiderData 2019年2月25日 DApp数据排行榜
  • SQLServer插入数据
  • Vim 折腾记
  • Vue.js 移动端适配之 vw 解决方案
  • 浮现式设计
  • 坑!为什么View.startAnimation不起作用?
  • 罗辑思维在全链路压测方面的实践和工作笔记
  • 前端面试题总结
  • 写给高年级小学生看的《Bash 指南》
  • ​草莓熊python turtle绘图代码(玫瑰花版)附源代码
  • ​七周四次课(5月9日)iptables filter表案例、iptables nat表应用
  • ​什么是bug?bug的源头在哪里?
  • ‌分布式计算技术与复杂算法优化:‌现代数据处理的基石
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • #php的pecl工具#
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (4.10~4.16)
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (第30天)二叉树阶段总结
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (四)Controller接口控制器详解(三)
  • (四)React组件、useState、组件样式
  • (一)ClickHouse 中的 `MaterializedMySQL` 数据库引擎的使用方法、设置、特性和限制。
  • (转)Linux下编译安装log4cxx
  • (转)一些感悟