当前位置: 首页 > news >正文

【RabbitMQ】消息堆积、推拉模式

消息堆积

原因

消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:

消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:

  • 消费端业务逻辑复杂、耗时长。
  • 消费端代码性能低。
  • 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
  • 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。

网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。

RabbitMQ服务器配置偏低

解决方案

消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:

提高消费者效率

  • 增加消费者实例数量,比如新增机器。
  • 优化业务逻辑,比如使用多线程来处理业务。
  • 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
  • 消息发生异常时,设置合适的重试策略,或者转入到死信队列。

限制生产者效率:比如流量控制、限流算法等。

  • 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
  • 限流:使用限流工具,为消息发送效率设置一个上限。
  • 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。

资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。

推拉模式

概述

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。

推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。

拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。

RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。

SpringBoot方式

@Configuration
public class PushAndPull {@Bean("pushQueue")public Queue pushQueue() {return QueueBuilder.durable(Constants.PUSH_QUEUE).build();}@Bean("pushExchange")public Exchange pushExchange() {return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();}@Bean("pushQueueBind")public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,@Qualifier("pushQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("push").noargs();}@Bean("pullQueue")public Queue pullQueue() {return QueueBuilder.durable(Constants.PULL_QUEUE).build();}@Bean("pullExchange")public Exchange pullExchange() {return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();}@Bean("pullQueueBind")public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,@Qualifier("pullQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {@Resourceprivate RabbitTemplate rabbitTemplate;// 推模式@RequestMapping("/push")public void push() {this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");System.out.println("推模式消息发送成功!");}// 拉模式@RequestMapping("/pull")public void pull() {this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");System.out.println("拉模式消息发送成功!");}}
@Configuration
@RestController
public class PushAndPullListener {// 推模式@RabbitListener(queues = Constants.PUSH_QUEUE)public void push(String message) {System.out.println("推模式: " + message);}@Resourceprivate RabbitTemplate rabbitTemplate;// 拉模式@RequestMapping("/pullConsumer")public void pull(String message) {Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);System.out.println("拉模式: " + receive);}}

SDK方式

// 推模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 发送消息channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());System.out.println("推模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 推模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("推模式消费者接收到消息:" + new String(body));}};channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);// TODO 释放资源}}
// 拉模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 发送消息channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());System.out.println("拉模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 拉模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 接收消息GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);if (getResponse != null) {System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));}// TODO 释放资源}}

相关文章:

  • 手机通过安装视频采集APP软件,采用国标28181方式注册到AS-V1000视频监控平台来播放实时监控视频画面
  • 甘蔗茎节检测系统源码分享
  • Spring Boot,在应用程序启动后执行某些 SQL 语句
  • vue初学随笔
  • web群集--rocky9.2部署zabbix服务端的详细过程
  • 【JavaEE】——线程的安全问题和解决方式
  • Vue3使用hiprint——批次打印条码
  • 【初阶数据结构】详解二叉树 - 树和二叉树(三)(递归的魅力时刻)
  • LeetCode(Python)-贪心算法
  • css五种定位总结
  • 什么是共享旅游卡?解析共享旅游创业项目认知与代理攻略
  • 【RabbitMQ】RabbitMQ 的概念以及使用RabbitMQ编写生产者消费者代码
  • 【python qdrant 向量数据库 完整示例代码】
  • HTML开发指南
  • MT6765/MT6762(R/D/M)/MT6761(MT8766)安卓核心板参数比较_MTK联发科4G智能模块
  • 【知识碎片】第三方登录弹窗效果
  • 2017-09-12 前端日报
  • CSS中外联样式表代表的含义
  • Flex布局到底解决了什么问题
  • IndexedDB
  • Javascript编码规范
  • javascript从右向左截取指定位数字符的3种方法
  • JavaScript设计模式之工厂模式
  • jquery cookie
  • sessionStorage和localStorage
  • spring boot 整合mybatis 无法输出sql的问题
  • spring boot下thymeleaf全局静态变量配置
  • unity如何实现一个固定宽度的orthagraphic相机
  • vue-cli3搭建项目
  • 构造函数(constructor)与原型链(prototype)关系
  • 关于使用markdown的方法(引自CSDN教程)
  • 记录一下第一次使用npm
  • 七牛云假注销小指南
  • 驱动程序原理
  • 实现菜单下拉伸展折叠效果demo
  • 世界编程语言排行榜2008年06月(ActionScript 挺进20强)
  • 世界上最简单的无等待算法(getAndIncrement)
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • 怎么把视频里的音乐提取出来
  • 中国人寿如何基于容器搭建金融PaaS云平台
  • Hibernate主键生成策略及选择
  • Linux权限管理(week1_day5)--技术流ken
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ​Linux·i2c驱动架构​
  • ​Python 3 新特性:类型注解
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • ​一、什么是射频识别?二、射频识别系统组成及工作原理三、射频识别系统分类四、RFID与物联网​
  • ​一些不规范的GTID使用场景
  • #define用法
  • (1)Jupyter Notebook 下载及安装
  • (2)MFC+openGL单文档框架glFrame
  • (3)选择元素——(14)接触DOM元素(Accessing DOM elements)
  • (6)添加vue-cookie