当前位置: 首页 > news >正文

【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Redis实战与进阶》

本专栏纯属为爱发电永久免费!!!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)

直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的  于是我想起了之前学过的技术栈 

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

目录

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

缺点也很明显:

应用场景:

Redis实现消息队列系统 实现步骤:

配置Redis:

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

实现消息的发布和订阅功能。

实战与改良

代码解释

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:

对了 中途遇到了这样一个错误

原因与分析:

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果


Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。

  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。

  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。

  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。

  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。

缺点也很明显:

  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。

  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。

应用场景:

适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑
如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。

Redis实现消息队列系统 实现步骤:

配置Redis:

  1. 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
      redis:host: port: 6379password: lettuce:pool:max-active: 1000max-idle: 1000min-idle: 0time-between-eviction-runs: 10smax-wait: 10000

  2. 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:

 @Beanpublic RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(new StringRedisSerializer());return template;}

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

redisTemplate.setDefaultSerializer(new StringRedisSerializer());

实现消息的发布和订阅功能。

  • 发布消息:
    redisTemplate.convertAndSend("channel_name", "message_payload");

    在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。

  • 订阅消息:
  • 首先,创建一个MessageListener实现类来处理接收到的消息:

public class MessageListenerImpl implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {// 处理接收到的消息String channel = new String(message.getChannel());String payload = new String(message.getBody());// 执行自定义的逻辑}
}

创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:

LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:

RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();

通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。

以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增 

于是就有了第二种写法 :

实战与改良

/**** @title MessageManager* @author SUZE* @Date 2-17**/
@Component
public class ReservedMessageManager {private String ListenerId;private String UserId;private String message;private final RedisTemplate<String, String> redisTemplate;@Autowiredpublic ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;subscribeToChannel("reserved");}@Resourceprivate SmsServer smsServer;public void publishMessage(String channel, reserveMessage message) {String  Message=serialize(message);redisTemplate.convertAndSend("channel_name", "message_payload");redisTemplate.convertAndSend(channel, Message);}// 接收到消息时触发的事件private void handleReserveMessage(String channel, reserveMessage reserveMessage) {if (reserveMessage != null) {String userId = reserveMessage.getUserId();String ListenerId=reserveMessage.getListenerId();String message = reserveMessage.getMessage();//TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容switch (message){//TODO 消息要给两边都发 所以要发两份 发信息的文案case "wait":smsServer.sendSms(userId,ListenerId,message);break;case "agree":smsServer.sendSms(userId,ListenerId,message);break;case "refuse":smsServer.sendSms(userId,ListenerId,message);break;case "over"://这里要操作文档系统了//拒绝的话 那就要监听一下smsServer.sendSms(userId,ListenerId,message);break;}//smsServer.sendSms(userId,ListenerId,message);// 其他处理逻辑...}}public void subscribeToChannel(String channel) {redisTemplate.execute((RedisCallback<Object>) (connection) -> {connection.subscribe((message, pattern) -> {String channelName = new String(message.getChannel());byte[] body = message.getBody();// 解析接收到的消息switch (channelName){case "reserved":reserveMessage reserveMessage = deserializeMessage(new String(body));handleReserveMessage(channelName, reserveMessage);break;//还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由}}, channel.getBytes());return null;});}// 反序列化private reserveMessage deserializeMessage(String body) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.readValue(body, reserveMessage.class);} catch (IOException e) {// 处理反序列化异常e.printStackTrace();return null;}}// 序列化public String serialize(reserveMessage reserveMessage) throws SerializationException {if (reserveMessage == null) {return null;}try {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.writeValueAsString(reserveMessage);} catch (JsonProcessingException e) {throw new SerializationException("Error serializing object", e);}}}

代码解释

  1. subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
  2. redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
  3. 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
  4. 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
  5. 在消息回调函数中,首先从message对象中获取通道名称和消息体。
  6. 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
  7. 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
  8. switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
  9. 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
  10. 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
  11. handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userIdlistenerId发送短信。

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:


对了 中途遇到了这样一个错误

错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

原因与分析:

reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。
为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。
在你的reserveMessage类中

这个是改好的封装类:
 

@Data
public class reserveMessage {private String UserId;private String ListenerId;private String message;public reserveMessage() {// 默认构造函数}public reserveMessage(String userId, String ListenerId,String message) {this.UserId = userId;this.ListenerId = ListenerId;this.message=message;}}

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果

成功

这里面的打印是代替了原本业务中的短信发送 也算是成了

这一期就到这 感谢观看

相关文章:

  • 剑指offer面试题16 反转链表
  • 【栈】150. 逆波兰表达式求值
  • 面向对象编程入门:掌握C++类的基础(1/3)
  • MCU中断控制
  • CSRNET图像修复,DNN
  • http协议与apache
  • STM32_ESP8266 连接阿里云 操作图解
  • CSS中伪元素和伪类的区别和作用?
  • Vue3实现带动画效果的tab栏切换
  • Elasticsearch:创建自定义 ES Rally tracks 的分步指南
  • C++结合Lambda表达式在函数内部实现递归
  • MapboxGL JS⽀持哪些地图样式和交互控件?
  • 「数据结构」哈希表2:实现哈希表
  • MySQL高级特性篇(7)-数据库版本控制与迁移
  • SpringSecurity安全框架
  • Angular Elements 及其运作原理
  • css选择器
  • ES6 学习笔记(一)let,const和解构赋值
  • Java|序列化异常StreamCorruptedException的解决方法
  • JavaScript 一些 DOM 的知识点
  • JavaScript对象详解
  • Logstash 参考指南(目录)
  • MQ框架的比较
  • php中curl和soap方式请求服务超时问题
  • rc-form之最单纯情况
  • WinRAR存在严重的安全漏洞影响5亿用户
  • 测试如何在敏捷团队中工作?
  • 纯 javascript 半自动式下滑一定高度,导航栏固定
  • 二维平面内的碰撞检测【一】
  • 浮动相关
  • 看图轻松理解数据结构与算法系列(基于数组的栈)
  • 前端自动化解决方案
  • 巧用 TypeScript (一)
  • 如何选择开源的机器学习框架?
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 以太坊客户端Geth命令参数详解
  • 走向全栈之MongoDB的使用
  • 机器人开始自主学习,是人类福祉,还是定时炸弹? ...
  • 选择阿里云数据库HBase版十大理由
  • ​ArcGIS Pro 如何批量删除字段
  • # 飞书APP集成平台-数字化落地
  • #QT(智能家居界面-界面切换)
  • $().each和$.each的区别
  • $.type 怎么精确判断对象类型的 --(源码学习2)
  • (02)vite环境变量配置
  • (2020)Java后端开发----(面试题和笔试题)
  • (附源码)springboot建达集团公司平台 毕业设计 141538
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (转)原始图像数据和PDF中的图像数据
  • .apk 成为历史!
  • .CSS-hover 的解释
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .NET 除了用 Task 之外,如何自己写一个可以 await 的对象?
  • .NET 同步与异步 之 原子操作和自旋锁(Interlocked、SpinLock)(九)
  • .NET 中 GetProcess 相关方法的性能