当前位置: 首页 > news >正文

深入理解和应用RabbitMQ的Work Queues模型

文章目录

        • 1. 场景模拟
        • 2. 消息发送
        • 3. 消息接收
        • 4. 测试
        • 5. 能者多劳
        • 6. 总结

当你在处理消息时,可能会遇到这样的问题:消息的生产速度远远大于消费速度,导致消息堆积。这时候,Work Queues(工作队列)模型就能派上用场。简单来说,Work Queues 让多个消费者绑定到一个队列,共同消费队列中的消息,从而加快消息处理速度。

1. 场景模拟

我们来模拟一个这样的场景。首先,在控制台创建一个名为 work.queue 的队列。

2. 消息发送

我们通过循环发送大量消息来模拟消息堆积的现象。在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:

@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
3. 消息接收

为了模拟多个消费者绑定同一个队列,我们在 consumer 服务的 SpringRabbitListener 中添加两个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

注意到这两个消费者都设置了 Thread.sleep 来模拟任务耗时:

  • 消费者1:Thread.sleep(20),相当于每秒处理50个消息。
  • 消费者2:Thread.sleep(200),相当于每秒处理5个消息。
4. 测试

启动 ConsumerApplication 后,执行 publisher 服务中编写的发送测试方法 testWorkQueue。结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
...
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到,消费者1和消费者2各自消费了25条消息:

  • 消费者1快速完成了任务。
  • 消费者2则缓慢处理任务。

消息是平均分配给每个消费者的,并没有考虑到各个消费者的处理能力,导致一个消费者空闲,另一个忙碌。这显然是低效的。

5. 能者多劳

spring 中,可以通过简单配置解决这个问题。修改 consumer 服务的 application.yml 文件,添加如下配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
...
消费者2........接收到消息:【hello, message_49】21:12:52.746299900

这次,消费者1处理了更多的消息,消费者2则处理了较少的消息,总耗时在1秒左右,大大提升了效率。这充分利用了每一个消费者的处理能力,有效避免了消息积压问题。

6. 总结

Work Queues 模型的使用要点:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
  • 通过设置 prefetch 来控制消费者预取的消息数量。

这样可以更高效地利用资源,提高消息处理速度。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 00 cadence学习笔记目录
  • python-约瑟夫环(赛氪OJ)
  • Python 爬虫项目实战一:抖音视频下载与网易云音乐下载
  • 什么是DNS缓存?DNS缓存有哪些作用和危害?
  • 六大设计原则和23种设计模式
  • Linux-vim编辑器以及权限-04
  • Docker资源隔离的实现策略以及适用场景
  • 利用formdata自动序列化和xhr上传表单到后端
  • github项目-创建一个新分支
  • HarmonyOS Flex布局
  • 【博客搭建 第二篇章】项目中怎么引入其他的 icon
  • NLP——Transfromer 架构详解
  • HarmonyOS鸿蒙应用开发之Text组件的使用
  • gogs的安装和使用(docker)
  • [Bugku] web-CTF靶场系列系列详解⑥!!!
  • [rust! #004] [译] Rust 的内置 Traits, 使用场景, 方式, 和原因
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • 4个实用的微服务测试策略
  • el-input获取焦点 input输入框为空时高亮 el-input值非法时
  • js学习笔记
  • MySQL QA
  • Python学习笔记 字符串拼接
  • Tornado学习笔记(1)
  • vue2.0开发聊天程序(四) 完整体验一次Vue开发(下)
  • 对象管理器(defineProperty)学习笔记
  • 力扣(LeetCode)357
  • 巧用 TypeScript (一)
  • 融云开发漫谈:你是否了解Go语言并发编程的第一要义?
  • 提醒我喝水chrome插件开发指南
  • 在 Chrome DevTools 中调试 JavaScript 入门
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • # Maven错误Error executing Maven
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • (02)vite环境变量配置
  • (Git) gitignore基础使用
  • (Matlab)使用竞争神经网络实现数据聚类
  • (多级缓存)多级缓存
  • (附源码)springboot社区居家养老互助服务管理平台 毕业设计 062027
  • (一)appium-desktop定位元素原理
  • (一)Kafka 安全之使用 SASL 进行身份验证 —— JAAS 配置、SASL 配置
  • (转)3D模板阴影原理
  • *算法训练(leetcode)第三十九天 | 115. 不同的子序列、583. 两个字符串的删除操作、72. 编辑距离
  • .Net Core webapi RestFul 统一接口数据返回格式
  • .NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
  • .net 按比例显示图片的缩略图
  • .NET 分布式技术比较
  • .netcore 如何获取系统中所有session_如何把百度推广中获取的线索(基木鱼,电话,百度商桥等)同步到企业微信或者企业CRM等企业营销系统中...
  • .net遍历html中全部的中文,ASP.NET中遍历页面的所有button控件
  • .NET程序员迈向卓越的必由之路
  • .NET运行机制
  • .vollhavhelp-V-XXXXXXXX勒索病毒的最新威胁:如何恢复您的数据?
  • 。。。。。
  • /var/log/cvslog 太大
  • ??Nginx实现会话保持_Nginx会话保持与Redis的结合_Nginx实现四层负载均衡
  • @CacheInvalidate(name = “xxx“, key = “#results.![a+b]“,multi = true)是什么意思