当前位置: 首页 > news >正文

T-3.2-把Redis当作消息队列合不合适

List队列

如果想把Redis当作消息队列来使用,最先想到的就是List这个数据类型,List的底层是一个双向链表,在头部和尾部操作元素的时间复杂度是O(1),所以List符合消息队列的模型

生产者使用lpush发布消息

127.0.0.1:6379> lpush queue msg1
(integer) 1
127.0.0.1:6379> lpush queue msg2
(integer) 2

消费者使用rpop拉取消息

127.0.0.1:6379> rpop queue
"msg1"
127.0.0.1:6379> rpop queue
"msg2"

List作为消息队列的模型:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AiTyF829-1664716808981)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002190354612.png)]

但是这样会有一个问题,如果队列中已经没有消息了,就会返回null

127.0.0.1:6379> rpop queue
(nil)

我们在编写消费者逻辑时一般是一个死循环,这个循环会不断地从队列里面拉取消息,伪代码如下:

while(true){
    msg = redis.rpop("queue");
    if(msg == null)continue;
    //处理消息
    handle(msg);
}

如果此时队列为空,消费者就会一直拉取消息这会造成CPU空转,浪费CPU资源,还会增加Redis的压力

那怎么解决呢?

当队列为空的时候,我们让消费者休眠一段时间,再去拉取消息,代码修改为:

while(true){
	msg = redis.rpop("queue");
	if(msg == null){
		//没有消息就睡眠3秒
		sleep(3);
		continue;
	}
	//处理消息
	handle(msg);
}

虽然CPU空转的问题解决了,但是又有了一个新的问题,如果在休眠的过程中有新的消息来了,消费者不能第一时间获取新的消息,那么就会存在延迟

Redis提供了blpop和brpop,这里的b指的是block

使用blpop和brpop,当队列为空的时候,消费者拉取消息时就阻塞等待,有了消息才返回

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ObqnKriB-1664716808982)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002191910652.png)]

现在可以这样修改代码

while(true){
	//0表示不设置超时时间
	msg = redis.brpop("queue", 0);
	if(msg == null)continue;
	//处理消息
	handle(msg);
}

如果设置了超时时间,那么在指定的时间后会返回null

注意:如果设置的超时时间太长,这个连接太久没有活跃,可能会被Redis Server判定为无效连接,然后会把这个客户端踢下线,所以如果使用这种方案,客户端需要有重连机制

总结一下List消息队列模型的缺点:

  • 不支持重复消费:当其中一个消费者拉取消息后,这条消息就从List中删除了,其他的消费者就不能再次消费了
  • 消息丢失:消费者拉取消息后,如果发生了宕机,那么这条消息就丢失了

发布/订阅模型:Pub/Sub

在分析List消息队列的时候有两个问题,其中一个就是不支持重复消费,Pub/Sub正好可以解决这个问题

启动两个消费者,订阅同一个队列

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时两个消费者都会被阻塞,等待新消息的到来

启动一个生产者,发布消息

127.0.0.1:6379> publish queue msg1
(integer) 2

此时,两个消费者就会解除阻塞,收到生产者发来的消息

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "msg1"

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2Ub5CZsi-1664716808982)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002193518367.png)]

Pub/Sub的底层就是维护了一个字典,key就是频道,value是一个链表,当一个消费者订阅一个频道的时候,就会把该消费者添加到key对应的链表中,生产者发布消息就是将消息发送给链表中所有的消费者

Pub/Sub支持阻塞式拉取消息,还支持重复消费,但是他的缺点是:

  • 消息丢失

消费者宕机,Redis宕机,消息堆积都有可能造成消息丢失,原因如下:

Pub/Sub没有基于任何数据类型,也没有做任何的数据存储,它只是单纯的为生产者和消费者建立数据转发通道,把符合规则的数据从一端转发到另一端,在这个过程中没有任何的数据存储,一切都是实时转发的

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ytIQIXrn-1664716808983)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002194254212.png)]

如果消费者宕机,新的消息会因为找不到消费者被丢弃,当消费者重新上线,就只能接收新的消息,所以在使用Pub/Sub时,必须先订阅频道,生产者才能发布消息,否则消息会丢失

此外,因为没有持久化,所以Pub/Sub的操作不会写入到RDB或者AOF中,当redis宕机重启,消息也会全部丢失

最后就是消息积压的时候,每一个消费者订阅一个队列时,Redis都会这个给消费者分配一个缓冲区,当生产者发布消息时,Redis就会把消息写入到对应消费者的缓冲区里,然后消费者再从缓冲区中读取消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eF91K0Ix-1664716808983)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002195008428.png)]

如果消费者读取消息慢,会造成消息积压,缓冲区内存持续增长,如果超过了配置的上线,Redis会强制把这个消费者踢下线,这样消费者就消费失败,造成数据丢失

总结一下Pub/Sub的优缺点:

  • 优点:阻塞式拉取消息,支持订阅发布
  • 缺点:不支持数据持久化,消费者宕机,Redis宕机,消息堆积都会造成消息丢失

还有一点,如果消费者处理消息失败,也无法再次重新消费,因为消息已经从缓冲区里删除了

Pub/Sub有点鸡肋,目前只有哨兵集群和Redis实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景

趋于成熟的队列:Stream

Stream是Redis的一个项目disque集成到Redis里的,通过xadd和xread完成最简单的生产、消费模型

生产者发布消息

127.0.0.1:6379> xadd queue * name zhangsan
"1664712059246-0"
127.0.0.1:6379> xadd queue * name lisi
"1664712066901-0"

[*]代表自动生成唯一消息id,格式是时间戳+自增序号

消费者拉取消息

127.0.0.1:6379> xread streams queue 0-0
1) 1) "queue"
   2) 1) 1) "1664712059246-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1664712066901-0"
         2) 1) "name"
            2) "lisi"

0-0表示拉取所有消息

如果想继续拉取消息,需要传入上一条消息的id

127.0.0.1:6379> xread streams queue 1664712066901-0
(nil)

没有消息会返回null

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ciWAkI3R-1664716808984)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002200551777.png)]

现在来看一下Stream是怎么解决消息队列的要求的

  • 阻塞式拉取消息

只需要增加block参数即可,这时消费者会阻塞,直到接收到新的消息

127.0.0.1:6379> xread block 0 streams queue 1664712066901-0
1) 1) "queue"
   2) 1) 1) "1664712542730-0"
         2) 1) "name"
            2) "erha"
  • 订阅/发布模式

xgroup:创建消费者组

xreadgroup:在指定消费组下,开启消费者拉取消息

生产者发布两个消息

127.0.0.1:6379> xadd queue * name zhangsan
"1664712939101-0"
127.0.0.1:6379> xadd queue * name lisi
"1664712943974-0"

创建两个消费组

# group1从头拉取消息
127.0.0.1:6379> xgroup create queue group1 0-0
OK
# group2从头拉取消息
127.0.0.1:6379> xgroup create queue group2 0-0
OK

第一个消费者消费第一个消费组

127.0.0.1:6379> xreadgroup group group1 consumer streams queue >
1) 1) "queue"
   2) 1) 1) "1664712939101-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1664712943974-0"
         2) 1) "name"
            2) "lisi"

第二个消费者消费第二个消费组

127.0.0.1:6379> xreadgroup group group2 consumer streams queue >
1) 1) "queue"
   2) 1) 1) "1664712939101-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1664712943974-0"
         2) 1) "name"
            2) "lisi"

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5fv45BXc-1664716808984)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002202424899.png)]

  • 消息处理异常时,Stream保证消息不丢失

除了拉取消息时用到了消息ID,这里为了保证重新消费,也要用到这个消息ID,当一个消费者处理完消息后,需要执行xack命令告知Redis,这时Redis就会把这条消息标记为处理完成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xWpr2nml-1664716808985)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002204249676.png)]

如果消费者异常或者宕机,就不会发送xack,那么Redis就会依旧保留这条消息,等到消费者重新上线后,Redis就会把之前没有处理成功的数据,重新发给这个消费者,这样一来,即使消费者异常,也不会丢失数据了

  • Stream数据会写入到RDB和AOF做持久化

Stream是新增加的数据类型,它与其它数据类型一样,会持久化到RDB或者AOF中,这样就算 Redis宕机重启,Stream中的数据也可以从RDB或AOF中恢复回来

  • Stream处理消息堆积

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

生产者限流:避免消费者处理不及时,导致持续积压

丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸

127.0.0.1:6379> xadd queue maxlen 1000 * name zhangsan
"1664715484247-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息,这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的

既然它的功能这么强大,这是不是意味着,Redis真的可以作为专业的消息队列中间件来使用呢?

不行,就算Redis能做到以上这些,也只是趋近于专业的消息队列

与专业的消息队列对比

一个专业的消息队列,必须要做到两大块:

  • 消息不丢
  • 消息可堆积

我们从一个消息队列的使用模型来分析一下

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z39Bd56h-1664716808985)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002210228988.png)]

先从消息不丢分析

消息是否会发生丢失,其重点也就在于以下 3 个环节:

生产者会不会丢消息?

消费者会不会丢消息?

队列中间件会不会丢消息?

  • 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

1.消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败

2.不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理,这也意味着消息可能会重复发送,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃

那消费者这边,就需要多做一些逻辑了,对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理

所以,无论是Redis还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的

  • 消费者会不会丢消息

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了或者处理消息异常,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须告知队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢

无论是Redis的Stream,还是专业的队列中间件,例如RabbitMQ、Kafka,其实都是这么做的

所以,从这个角度来看,Redis 也是合格的

  • 队列中间件会不会丢消息

Redis 在以下 2 个场景下,都会导致数据丢失

1.AOF持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能

2.主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

所以,如果把Redis当做消息队列,在这方面是有可能导致数据丢失的

再来看那些专业的消息队列中间件是如何解决这个问题的

像RabbitMQ或Kafka这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写多个节点,以此保证消息的完整性,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失也正因为如此,RabbitMQ、Kafka在设计时也更复杂,毕竟,它们是专门针对队列场景设计的,但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的

  • 消息积压怎么办

因为Redis的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致Redis的内存持续增长,如果超过机器内存上限,就会面临被OOM的风险

但Kafka、RabbitMQ这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加坦然

综上,我们可以看到,把Redis当作队列来使用时,始终面临的2个问题:

1.Redis本身可能会丢数据

2.面对消息积压,Redis内存资源紧张

如果业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把Redis 当作队列是完全可以的

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量

如果业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么建议使用专业的消息队列中间件

总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7kkUKL0J-1664716808986)(T-3.2-%E6%8A%8ARedis%E5%BD%93%E4%BD%9C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%90%88%E4%B8%8D%E5%90%88%E9%80%82.assets/image-20221002211804398.png)]

相关文章:

  • 简单个人静态HTML网页设计作品 DIV布局个人介绍网页模板代码 DW个人网站制作成品 web网页制作与实现
  • java基于springboot+element的实现医院预约挂号系统 nodejs
  • 在vue项目中使用canvas实现甘特图
  • 【C++】之模板进阶
  • 100天精通Python(数据分析篇)——第58天:Pandas读写数据库(read_sql、to_sql)
  • Bean的生命周期
  • 哈希桶(详解创建)
  • 回归预测 | MATLAB实现SSA-BP多输入单输出回归预测
  • 【雅思备考】听说读写攻略 | 雅思核心词汇之科技类
  • Python-列表,从基础到进阶用法大总结,进来查漏补缺
  • JDBC模拟SQL注入和避免SQL注入
  • flink在企业IT架构中如何定位-在选型流批一体技术与大数据架构时的避坑指南
  • JUC并发编程之CompletableFuture基础用法
  • SpringBoot+Mybatis-Plus多数据源使用
  • Colab-免费GPU算力
  • 2017届校招提前批面试回顾
  • Android框架之Volley
  • Git同步原始仓库到Fork仓库中
  • Java教程_软件开发基础
  • js写一个简单的选项卡
  • mongodb--安装和初步使用教程
  • React 快速上手 - 07 前端路由 react-router
  • 高度不固定时垂直居中
  • 给Prometheus造假数据的方法
  • 使用Tinker来调试Laravel应用程序的数据以及使用Tinker一些总结
  • 我与Jetbrains的这些年
  • 无服务器化是企业 IT 架构的未来吗?
  • 一个JAVA程序员成长之路分享
  • 源码安装memcached和php memcache扩展
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • #mysql 8.0 踩坑日记
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (zt)基于Facebook和Flash平台的应用架构解析
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • *p++,*(p++),*++p,(*p)++区别?
  • ./和../以及/和~之间的区别
  • .NET 5.0正式发布,有什么功能特性(翻译)
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET/C# 中设置当发生某个特定异常时进入断点(不借助 Visual Studio 的纯代码实现)
  • .NetCore 如何动态路由
  • /etc/X11/xorg.conf 文件被误改后进不了图形化界面
  • @RequestParam详解
  • @Transient注解
  • [1]-基于图搜索的路径规划基础
  • [Android]如何调试Native memory crash issue
  • [C]编译和预处理详解
  • [cocos2d-x]关于CC_CALLBACK
  • [Contiki系列论文之2]WSN的自适应通信架构
  • [c语言]小课堂 day2
  • [C语言]一维数组二维数组的大小
  • [Enterprise Library]调用Enterprise Library时出现的错误事件之关闭办法
  • [G-CS-MR.PS02] 機巧之形2: Ruler Circle