当前位置: 首页 > news >正文

RabbitMQ高级特性 - 消息分发(限流、负载均衡)

文章目录

  • RabbitMQ 消息分发
    • 概述
    • 如何实现消费分发机制(限制每个队列消息数量)
    • 使用场景
      • 限流
        • 背景
        • 实现 demo
      • 非公平发送(负载均衡)
        • 背景
        • 实现 demo

RabbitMQ 消息分发


概述

RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.

例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.

消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.

如何实现消费分发机制(限制每个队列消息数量)

可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.

例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.

具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).

使用场景

限流

背景

假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.

问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.

实现 demo

假设限制未确认消息上限为 5,发送消息数量为 20.

a)配置 prefetch 参数,设置应答方式为手动应答.

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5

b)配置交换机队列

@Configuration
class MQConfig {@Beanfun transQueue() = Queue(MQConst.TRANS_QUEUE)@Beanfun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)@Beanfun qosQueue() = Queue(MQConst.QOS_QUEUE)@Beanfun qosBinding(): Binding = BindingBuilder.bind(qosQueue()).to(qosExchange()).with(MQConst.QOS_BINDING_KEY)}

c)接口(生产者)

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate,
) {@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}}

d)消费者

@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun handMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")// 这里不主动应答,模拟超长业务// channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}

e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.
在这里插入图片描述
在这里插入图片描述

Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
在这里插入图片描述

非公平发送(负载均衡)

背景

假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.

*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *

因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.

具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.

实现 demo

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)

b)生产者

    @RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}

c)两个消费者

@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun fastHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(1000)println("正式工: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}@RabbitListener(queues = [MQConst.QOS_QUEUE])fun slowHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(2000)println("实习生: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}

d)效果如下:
在这里插入图片描述

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Leetcode 第 135 场双周赛题解
  • 深入JVM:类加载器和双亲委派模型
  • 如何搭建一个圈子社区系统?开源社交陪玩交友圈子论坛帖子系统保姆级搭建教程!
  • 益九未来CEO曾宪军:创新引领,打造智能售货机行业新标杆
  • vue项目路径使用@报错
  • VS Code C/C++ MSVC编译器
  • 【React 】react 创建项目配置 jsconfig.json 的作用
  • Axure RP界面设计初探:基础操作与实用技巧
  • JavaScript青少年简明教程:异常处理
  • Java 面试常见问题之——static 的用法
  • Android 在布局中tools使用
  • Linux 调试追踪: trace-cmd 和 kernelshark
  • 16个好用到爆的Python实用脚本!
  • 如何用密码保护你的 WordPress 管理员 (wp-admin) 目录
  • 互联网之光与人工智能之光交相辉映,如何抓住5G人工智能红利
  • Druid 在有赞的实践
  • Elasticsearch 参考指南(升级前重新索引)
  • ES6 ...操作符
  • HashMap ConcurrentHashMap
  • IDEA 插件开发入门教程
  • JavaScript 基础知识 - 入门篇(一)
  • JavaScript服务器推送技术之 WebSocket
  • python docx文档转html页面
  • Python_OOP
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • 读懂package.json -- 依赖管理
  • 诡异!React stopPropagation失灵
  • 回顾 Swift 多平台移植进度 #2
  • 入门级的git使用指北
  • 微服务入门【系列视频课程】
  • 写代码的正确姿势
  • 主流的CSS水平和垂直居中技术大全
  • (2)STM32单片机上位机
  • (20050108)又读《平凡的世界》
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (poj1.2.1)1970(筛选法模拟)
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (定时器/计数器)中断系统(详解与使用)
  • (二) Windows 下 Sublime Text 3 安装离线插件 Anaconda
  • (四) 虚拟摄像头vivi体验
  • (五)大数据实战——使用模板虚拟机实现hadoop集群虚拟机克隆及网络相关配置
  • (一)appium-desktop定位元素原理
  • (转)jQuery 基础
  • .NET/C# 使窗口永不获得焦点
  • .net经典笔试题
  • /etc/fstab和/etc/mtab的区别
  • @component注解的分类
  • [.NET 即时通信SignalR] 认识SignalR (一)
  • [.net]官方水晶报表的使用以演示下载
  • [AI 大模型] 百度 文心一言
  • [Algorithm][综合训练][kotori和气球][体操队形][二叉树中的最大路径和]详细讲解
  • [android] 手机卫士黑名单功能(ListView优化)
  • [Android]常见的数据传递方式
  • [Ariticle] 厚黑之道 一 小狐狸听故事