当前位置: 首页 > news >正文

redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

相关文章:

  • 零基础也可以探索 PyTorch 中的上采样与下采样技术
  • 以太网交换机——稳定安全,构筑数据之桥
  • 【2019】360Java工程师客观题总结
  • AI绘画软件Stable Diffusion模型/Lora/VAE文件存放位置
  • vue前端开发自学demo,父子组件之间传递数据demo2
  • 235.【2023年华为OD机试真题(C卷)】机器人搬砖(二分查找-JavaPythonC++JS实现)
  • 硬核加码!星邦蓝助力全球运力最大固体火箭“引力一号”海上首飞
  • python处理目录下文本文件去除空格和空行
  • 关于Python里xlwings库对Excel表格的操作(三十二)
  • Rust类型之字符串
  • 活动回顾∣“全邻友好,艺术大咖交流会”——员村街开展社区微型养老博览会长者文艺汇演活动
  • 内 存 取 证
  • 查看SQL Server的表字段类型、长度、描述以及是否可为null
  • Redis 教程
  • untiy使用http下载资源
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 【EOS】Cleos基础
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • 4. 路由到控制器 - Laravel从零开始教程
  • angular2开源库收集
  • C++11: atomic 头文件
  • CSS 三角实现
  • Druid 在有赞的实践
  • E-HPC支持多队列管理和自动伸缩
  • Next.js之基础概念(二)
  • Octave 入门
  • v-if和v-for连用出现的问题
  • 从地狱到天堂,Node 回调向 async/await 转变
  • 搭建gitbook 和 访问权限认证
  • 基于web的全景—— Pannellum小试
  • 三分钟教你同步 Visual Studio Code 设置
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • Java总结 - String - 这篇请使劲喷我
  • postgresql行列转换函数
  • 阿里云移动端播放器高级功能介绍
  • 完善智慧办公建设,小熊U租获京东数千万元A+轮融资 ...
  • ​总结MySQL 的一些知识点:MySQL 选择数据库​
  • #微信小程序(布局、渲染层基础知识)
  • (14)学习笔记:动手深度学习(Pytorch神经网络基础)
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (附源码)小程序 交通违法举报系统 毕业设计 242045
  • (三分钟)速览传统边缘检测算子
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (译)2019年前端性能优化清单 — 下篇
  • (转)创业家杂志:UCWEB天使第一步
  • *** 2003
  • .bat批处理(三):变量声明、设置、拼接、截取
  • .Net - 类的介绍
  • .net 8 发布了,试下微软最近强推的MAUI
  • .NET 中使用 Mutex 进行跨越进程边界的同步
  • /usr/bin/perl:bad interpreter:No such file or directory 的解决办法
  • @property python知乎_Python3基础之:property
  • @WebServiceClient注解,wsdlLocation 可配置
  • [ 网络基础篇 ] MAP 迈普交换机常用命令详解