当前位置: 首页 > news >正文

RabbitMQ怎么保证可靠性

RabbitMQ怎么保证可靠性

  • 前言
  • 生产端问题
    • 解决方案
    • 代码
    • 验证
  • RabbitMQ问题
  • 消费端问题
    • 解决方案
    • 代码
    • 验证
  • 总结

前言

RabbitMQ相信大家都非常熟悉了,今天咱们来聊聊怎么保证RabbitMQ的可靠性。

那什么时候会出现问题呢?

第一种是生产端出现的问题。我们向队列中发送消息的时候,消息不一定可以发送到MQ中,这个时候如果我们不做任何处理,这样消息丢失了。

第二种则是RabbitMQ出现的问题。也就是说现在生产端的成功将消息发送到了RabbitMQ,但由于MQ并没有做持久化,这样宕机重启之后消息可能就丢失了。

第三种则是消费端的问题。消费端处理消息时如果出现异常,默认的解决方式是在重复消费多次,当次数超过阈值时直接删除消息,这也导致消息丢失。

接下来咱们就看看怎么应对以上三种问题。

生产端问题

解决方案

这里我们需要清楚发送的一个大体流程。

生产端发送消息到MQ之后,会收到一个结果,这个结果有acknack两种。

其中ack代表消息成功到达了交换机,但并不意味者消息到达了队列。不过ack的情况下消息未送达队列,会有相应的错误信息提醒。

nack就代表消息并未送达交换机

那么,怎么才能知道消息发送情况呢?

可以设置callback来获取消息发送结果。

代码

局部callback设置如下

    @GetMapping("testmq")public Result testmq(){String orderId = String.valueOf(UUID.randomUUID());String messageData = "下订单!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("orderId",orderId);map.put("messageData",messageData);map.put("createTime",createTime);//        设置发送的callbackCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(result -> {// 判断结果if (result.isAck()) {log.info("发送成功");} else {log.error("消息未达到交换机,发送失败");}}, ex -> {log.error("出现异常,发送失败");});rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);return Result.succ("ok");}

验证

消息发送成功
在这里插入图片描述
交换机名称有误
在这里插入图片描述
队列路由出错
虽然没有错误,但给了我们warning。
在这里插入图片描述

RabbitMQ问题

这里就比较简单了,那就是做下持久化就可以了

首先是交换机,队列和消息的持久化
交换机

@BeanDirectExchange normalExchange() {/*** durable 是否持久化* autoDelete 没有queue绑定时是否自动删除*/return new DirectExchange(NORMALEXCHANGE, true, false);}

队列

@Beanpublic Queue cleanDQueue() {return QueueBuilder.durable(CLEANQUEUE).build();}

消息的持久化

rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {// 设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);

消费端问题

解决方案

消费端出现错误时,会进行重试,当重试次数超过阈值之后有三种解决方案,如下

  • RejectAndDontRequeueRecoverer:超过阈值,直接丢失消息。
  • ImmediateRequeueMessageRecoverer:超过阈值,返回nack,然后消息重新入队。
  • RepublishMessageRecoverer:超过阈值直接将消费失败的消息投递到指定交换机。

这里我们以RepublishMessageRecoverer为例做下演示。

代码

首先需要声明消费消息失败后传递的交换机和队列

   @BeanDirectExchange normalExchange() {/*** durable 是否持久化* autoDelete 没有queue绑定时是否自动删除*/return new DirectExchange(NORMALEXCHANGE, true, false);}
//  用于处理消费失败消息的队列@Beanpublic Queue republishQueue() {return QueueBuilder.durable(REPULISHQUEUE).build();}
//  绑定失败消费消息队列@BeanBinding bindingRepublish() {return BindingBuilder.bind(republishQueue()).to(normalExchange()).with(REPULISHROUTING);}

然后配置下RepublishMessageRecoverer策略,随便找个config注入下bean就可以。

 //  设置RepublishMessageRecoverer,消费失败的消息转移到另一队列中,交给管理员手动处理@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {/*** NORMALEXCHANGE   接收消费失败消息的交换机* REPULISHROUTING  接收消费失败消息的路由key*/return new RepublishMessageRecoverer(rabbitTemplate, NORMALEXCHANGE, REPULISHROUTING);}

验证

咱们看看如果消费出错会咋样

我们可以看到被消费的队列中信息被删除了。
在这里插入图片描述
然后我们设置的转入队列中的消息数加一,这时候我们可以接收下该队列中的信息,存储到数据库中,方便维护人员手动进行处理。
在这里插入图片描述

总结

从生产端、RabbitMQ以及消费端三方面介绍了一下怎么保证RabbitMQ的可靠性,另外还有关于死信队列和延迟队列的内容在这篇博客中,大家有兴趣可以看一下。

RabbitMQ的死信队列和延迟队列

在这里插入图片描述

相关文章:

  • [C#]使用OpenCvSharp图像滤波中值滤波均值滤波高通滤波双边滤波锐化滤波自定义滤波
  • 国产操作系统上Vim的详解03--使用Vundle插件管理器来安装和使用插件 _ 统信 _ 麒麟 _ 中科方德
  • 数据结构与算法笔记:基础篇 - 散列表(下):为什么散列表和链表经常会一起使用?
  • linux flask | 接口保持在后台一直运行、python后端接口长期调用、python后台持续运行方法、python提供后端接口
  • 2024上海初中生古诗文大会倒计时4个多月:单选题真题和独家解析
  • 使用亮数据代理IP爬取PubMed文章链接和邮箱地址
  • 常见八大排序(纯C语言版)
  • Vue2工程化
  • python的视频处理FFmpeg库使用
  • 深入解析MongoDB中的锁机制
  • Web UI自动化测试_Selenium+Python
  • 杰理AC632N提升edr的hid传输速率, 安卓绝对坐标触摸点被识别成鼠标的修改方法
  • c++处理string类型的工具和常用方法总结
  • 一文搞懂大模型训练加速框架 DeepSpeed 的使用方法!
  • 03-07Java自动化之JAVA基础之循环
  • 03Go 类型总结
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • E-HPC支持多队列管理和自动伸缩
  • HTTP 简介
  • Javascripit类型转换比较那点事儿,双等号(==)
  • Javascript Math对象和Date对象常用方法详解
  • js写一个简单的选项卡
  • Python十分钟制作属于你自己的个性logo
  • React Native移动开发实战-3-实现页面间的数据传递
  • React Transition Group -- Transition 组件
  • Sass 快速入门教程
  • Travix是如何部署应用程序到Kubernetes上的
  • vuex 笔记整理
  • Web设计流程优化:网页效果图设计新思路
  • 构造函数(constructor)与原型链(prototype)关系
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 你不可错过的前端面试题(一)
  • 区块链分支循环
  • 设计模式走一遍---观察者模式
  • 想写好前端,先练好内功
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 用quicker-worker.js轻松跑一个大数据遍历
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • ​马来语翻译中文去哪比较好?
  • !$boo在php中什么意思,php前戏
  • # 数论-逆元
  • # 消息中间件 RocketMQ 高级功能和源码分析(七)
  • #考研#计算机文化知识1(局域网及网络互联)
  • $(this) 和 this 关键字在 jQuery 中有何不同?
  • (二)WCF的Binding模型
  • (附表设计)不是我吹!超级全面的权限系统设计方案面世了
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (一)搭建springboot+vue前后端分离项目--前端vue搭建
  • (转)原始图像数据和PDF中的图像数据
  • (轉貼) VS2005 快捷键 (初級) (.NET) (Visual Studio)
  • .NET MVC、 WebAPI、 WebService【ws】、NVVM、WCF、Remoting
  • .net MVC中使用angularJs刷新页面数据列表
  • .NET 材料检测系统崩溃分析
  • .NET 简介:跨平台、开源、高性能的开发平台
  • .pub是什么文件_Rust 模块和文件 - 「译」