当前位置: 首页 > news >正文

Redis 实现消息队列

Redis 实现消息队列

文章目录

  • Redis 实现消息队列
    • 导引
    • 1. 基于List结构的消息队列
    • 2. 基于PubSub的消息队列
    • 3. 基于Stream的消息队列(推荐)
      • 3.1 XADD
      • 3.2 XREAD
      • 3.3 XGROUP

导引

消息队列(Message Queue),从概念上来理解就是用来存放消息的队列,最简单的消息队列模型包括以下三个角色:

  • 生产者:发送消息到消息队列
  • 消息队列:存储和管理信息,也被称为消息代理(Message Broker)
  • 消费者:从消息队列中获取消息并处理消息

Redis也为我们提供了三种不同的方式来实现消息队列:

  1. List结构:基于List结构模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:比较完善的消息队列模型(推荐

1. 基于List结构的消息队列

这种方式比较简单,因为Redis的list数据结构是一个双向链表,很容易模拟出队列的效果。

队列是入口和出口不在一边,对此我们可以利用:LPUSH 结合 BRPOP,或者 RPUSH 结合 BLPOP 来实现先进先出的效果

在这里插入图片描述

:这里使用BRPOP而不是RPOP是因为BRPOP能够实现阻塞的效果而RPOP不能

使用该方式实现消息队列的优缺点如下:

优点

  • 利用Redis存储,不受限与JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 能够满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

2. 基于PubSub的消息队列

PubSub(发布订阅),是Redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息

它有以下命令:

  • SUBSCRIBE channel [channel]:订阅一个或多个频道

    在这里插入图片描述

  • PUBLISH channel msg:向一个频道发送消息

    在这里插入图片描述

  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

    在这里插入图片描述

具体操作如下所示:

在这里插入图片描述

该方式实现的消息队列支持多消费者的使用,但也存在着以下弊端:

  • 不能支持数据持久化,一旦redis宕机数据就会丢失
  • 无法避免消息丢失
  • 消息堆积有上限,超出上限后数据会丢失

3. 基于Stream的消息队列(推荐)

Stream是Redis5.0引入的一种新数据类型,能够实现功能完善的消息队列,因为它本身就是一个消息队列,所以我们可以直接通过命令来使用它:

3.1 XADD

作用:发送消息

在这里插入图片描述

其中:

  • key:队列名称

  • [NOMKSTREAM]:如果队列不存在,是否自动创建队列,默认是自动创建

  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量

  • *|ID:消息的唯一id,表示由Redis自动生成,格式是“时间戳-递增数字”,一般推荐使用来自动生成

  • field value [field value …]:发送到队列中的消息,以键值对的格式录入,可以多个同时录入

举个栗子🌰:

创建一个名为 users 的队列,并向其中发送一个消息,内容是:{name=Json, age=25},并使用Redis自动生成ID

在这里插入图片描述

3.2 XREAD

作用:读取消息

在这里插入图片描述

其中:

  • [COUNT count]:指定每次读取消息的最大数量
  • [BLOCK milliseconds]:当队列中没有消息时,阻塞指定时长,单位为秒
  • STREAMS key [key …]:要从哪个队列中读取消息,key就是队列名,可以指定多个队列
  • ID [ID …]起始ID,只返回大于该ID的消息,其中0代表从第一个消息开始,$代表从最新的消息开始

举个栗子🌰:

读取users队列中的第一个消息

在这里插入图片描述

:在上述测试中我们只往users队列中添加了一个消息,这个时候如果ID使用$来获取最新消息,且设置了阻塞等待的话,此时读取信息将在阻塞时间过后返回空:

在这里插入图片描述

3.3 XGROUP

消费者组,一个消费者组中可以有多个消费者来操作同一个消息队列

在这里插入图片描述

通常由以下命令组成:

  • 创建消费者组:

    XGROUP Create key groupName ID [MKSTREAM]
    

    其中:

    • key:队列名称
    • groupName:消费者组名称
    • ID:起始ID标识,0代表队列中第一个消息,$代表队列最后一个消息
    • MKSTREAM:队列不存在时自动创建队列

    在这里插入图片描述

    :这里要求队列key已经存在才能创建消费者组,否则需要开启MKSTREAM让其自动创建新的队列

    在这里插入图片描述

  • 删除指定的消费者组:

    XGROUP Destroy key groupName
    

    在这里插入图片描述

  • 给指定消费者组添加消费者

    XGROUP CREATECONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 删除指定消费者组中的消费者

    XGROUP DELCONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 从消费者组中读取消息

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    

    在这里插入图片描述

    其中:

    • group:消费者组名称
    • consumer:消费者名称,如果消费者不存在,会根据该名称自动创建一个消费者
    • count:本次查询的最大数量
    • BLOCK milliseconds:阻塞时间,没有消息时会进行等待,以毫秒为单位
    • NOACK:选择后无需手动ACK,获取到消息后自动确认,一般不建议设置,当我们获取完消息后需要手动确认ack
    • STREAMS key:指定队列名称
    • ID:获取消息的起始ID,它有以下情况:
      • >”:表示从下一个未消费的消息开始
      • 其它:根据指定id从pending-list中获取消息,pending-list用于专门存放那些已消费但未确认的消息;例如此时ID为0,表示获取pending-list中的第一个消息.

    同一个消费者组中的消费者读取同一个消息队列时,若ID使用>来读取,则下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息队列中的消息都被读取过后,最后一个读取的消费者返回nil

    举个栗子🌰:

    我们创建一个队列叫list,再添加几条消息

    在这里插入图片描述

    在这里插入图片描述

    创建一个消费者组g1监听list消息队列

    在这里插入图片描述

    通过XREADGROUP命令为消费者组g1添加消费者c1、c2、c3来读取list队列消息

    在这里插入图片描述

可以看到,同一个消费者组中的消费者,它们都在获取同一个队列中的消息,且ID使用>来读取,下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息全部被读完后只返回nil!

每读取完一条消息,我们需要对它进行手动确认,使其从pending-list中移除,使用下述命令可以查看已读取但还未确认的消息:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

我们查看一下list中还有多少未确认的消息:

在这里插入图片描述

好的都没被确认,所以需要我们手动去确认消息,使其从pending-list中移除,操作命令如下:

XACK key group ID [ID ...]

在这里插入图片描述

上述的ID为添加消息时自动创建并返回的ID:

在这里插入图片描述

这样所有已读取的消息就会从pending-list中移除了!

这里贴上该消息队列在Java中的实现方式

//获取消息队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000ms STEAMS list >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("list", ReadOffset.lastConsumed())
);//ACK确认 SACK list g1 id
// 注:因为这里我们只从消息队列中获取一条信息(COUNT 1),所以list.get()使用索引0即可
stringRedisTemplate.opsForStream().acknowledge("list", "g1", list.get(0).getId()); //获取pending_list中的消息 XREADGROUP GROUP g1 c1 COUNT 1 STEAMS list 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("list", ReadOffset.from("0"))
);
// 上述代码可以配合循环实现被消费者组不断监听的消息队列

以上便是对Redis实现消息队列的介绍了!!如果内容对大家有帮助的话请给这篇文章一个三连关注吧💕( •̀ ω •́ )✧✨

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 拥抱变革:旗晟智能巡检机器人系统重塑高风险行业巡检模式
  • 【算法刷题】合并两个有序链表、获取链表的中间节点、反转链表
  • 【面试经验】24届前端校招 字节、阿里、美团、快手、腾讯面试经验汇总
  • 【扒代码】图像数据 Transformer
  • Eclipse插件之Java Dependency Viewer(显示类和包的关系图)
  • 日志Log程序(C++)
  • 深度学习每周学习总结N6:使用Word2vec实现文本分类
  • Spring Cloud全解析:注册中心之zookeeper注册中心
  • 4.MySQL数据类型
  • 2023华为od机试C卷【围棋的气】python实现
  • 哈萨克语驾考学习软件求推荐?
  • Springboot项目基础开发模式+注解
  • 【香橙派系列教程】(十三) 香橙派的摄像头接入
  • 【Pyspark-驯化】一文搞懂Pyspark修改hive表描述以及增加列使用技巧
  • 简单的射箭小游戏网页源码
  • JavaScript-如何实现克隆(clone)函数
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 【翻译】babel对TC39装饰器草案的实现
  • angular学习第一篇-----环境搭建
  • bearychat的java client
  • JavaWeb(学习笔记二)
  • leetcode378. Kth Smallest Element in a Sorted Matrix
  • log4j2输出到kafka
  • mysql外键的使用
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Rancher-k8s加速安装文档
  • Vue.js 移动端适配之 vw 解决方案
  • 阿里中间件开源组件:Sentinel 0.2.0正式发布
  • 成为一名优秀的Developer的书单
  • 分布式事物理论与实践
  • 前嗅ForeSpider中数据浏览界面介绍
  • 用jQuery怎么做到前后端分离
  • 中文输入法与React文本输入框的问题与解决方案
  • # 利刃出鞘_Tomcat 核心原理解析(八)-- Tomcat 集群
  • # 手柄编程_北通阿修罗3动手评:一款兼具功能、操控性的电竞手柄
  • #[Composer学习笔记]Part1:安装composer并通过composer创建一个项目
  • #define 用法
  • #java学习笔记(面向对象)----(未完结)
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (2)空速传感器
  • (2022版)一套教程搞定k8s安装到实战 | RBAC
  • (35)远程识别(又称无人机识别)(二)
  • (arch)linux 转换文件编码格式
  • (附源码)springboot课程在线考试系统 毕业设计 655127
  • (六)Hibernate的二级缓存
  • (三)uboot源码分析
  • (十一)图像的罗伯特梯度锐化
  • (四)Linux Shell编程——输入输出重定向
  • (四)进入MySQL 【事务】
  • (转)winform之ListView
  • ***微信公众号支付+微信H5支付+微信扫码支付+小程序支付+APP微信支付解决方案总结...
  • .NET CF命令行调试器MDbg入门(三) 进程控制
  • .NET Core MongoDB数据仓储和工作单元模式封装
  • .Net Core 中间件验签