当前位置: 首页 > news >正文

【RabbitMQ】RabbitMq消息丢失、重复消费以及消费顺序性的解决方案

RabbitMq消息丢失主要是有三种情况:生产者消息未发送到服务端、服务端消息没有做持久化导致丢失、消费端未收到消息。解决方案依次如下:

  • 开启事务或使用确认机制。对于一些重要的消息,生产者可以开启事务,确保消息发送成功后再提交事务。或者使用确认机制,等待 MQ 服务器的确认响应,确认消息已成功接收
  • 服务端消息持久化以及高可用机制:开启消息的持久化机制,正常消息是存储在内存当中的,开启持久化机制之后,会将消息存储在磁盘当中。还可以开启镜像集群以及自从同步(Raft协议)机制,保证服务宕机之后我的消息不会丢失
  • 消费者确认机制:开启消费者确认机制为auto,由spring确认消息处理成功后完成 ack,如果多次没有收到ack就会重复发送,多次没有就会将消息发送到一场交换机

消息重复消费问题,主要就是一个幂等性的一个问题(无论是一个操作执行多少次,产生的结果合执行一次是相同的),解决方案有几个方面:数据库层面、业务层面、分布式系统层面

  • 数据库层面:
    • 唯一索引:对于数据库层面可以使用唯一索引,对于插入操作,可以在数据表中设置唯一索引,确保相同的数据不会被重复插入。
    • 乐观锁:可以对插入数据添加一个版本号字段,每一次更新数据时,先检查版本号是否与预期一致,如果一致就进行增加版本号,,否则不进行操作
  • 业务层面:
    • 状态控制:对于一些像是点赞的问题,可以通过判断它的状态来进行判断是否执行,就比如我当前点赞那么就将状态设置为1,表示已点赞状态。当再次执行点赞操作时,通过检查当前状态,如果已经是 1,则不进行重复的点赞操作,直接返回已点赞的结果或者忽略此次请求,从而保证了无论点赞操作被执行多少次,最终的效果都是一致的
    • 去重表:创建一个去重表,用于记录已经处理过的操作。在执行关键操作之前,先检查去重表中是否存在相应的记录。如果存在,则说明该操作已经执行过,直接返回结果;如果不存在,则执行操作并在去重表中插入一条记录。
  • 分布式系统层面:
    • 分布式锁:在分布式的环境当中使用分布式锁来保证同一时间只有一个节点能够执行特定操作,就可以避免重复执行
    • 全局唯一ID:为每个操作生成一个全局唯一的 ID,在执行操作时将这个 ID 作为参数传递给服务。服务在处理操作时,先检查是否已经处理过具有相同 ID 的操作。如果已经处理过,则直接返回结果;如果没有处理过,则执行操作并记录这个 ID

消费顺序性问题,主要有以下几个层面来解决:生成者层面,消费队列层面、消费者层面

  • 生产者层面:单线程发送,确保同一个业务流程的消息由同一个生产者线程发送,这样可以保证消息到达队列的顺序性
  • 消费的队列层面:分区或者队列划分,对于支持分区或多个队列的消息队列系统,可以将需要保证顺序的消息发送到同一个分区或队列中
  • 消费者层面:
    • 单线程消费,消费者使用给单线程来消费特定的分区合队列中的消息,可以确保消息按照消息队列顺序进行消费
    • 业务层面判断:消费者可以在本地存储已处理消息的状态,例如记录已处理的消息的序号或唯一标识。在消费下一个消息之前,检查该消息的序号是否符合预期顺序,如果不符合则等待或进行相应的处理
    • 重试机制:当消费消息出现错误需要重试时,要确保重试的消息不会打乱顺序。可以将重试的消息放入一个单独的队列或延迟处理,等前面的消息都处理完成后再进行重试

相关文章:

  • 媒界:助力民生保障 长城“消防炮”即将批量交付硬核守护万家灯火
  • 【hot100-java】【最长公共子序列】
  • 数据加密标准(DES)详解:原理、步骤及Python实现
  • Python连接Kafka收发数据等操作
  • MySQl查询分析工具 EXPLAIN ANALYZE
  • SpringSecurity -- 入门使用
  • 在某服务中,两方法递归调用导致堆栈溢出
  • 【第十六章:Sentosa_DSML社区版-机器学习之生存分析】
  • “投其所招”-智能投标领军者丨OPENAIGC开发者大赛高校组AI创作力奖|
  • 基于RepLKNet31B模型在RML201610a数据集上的调制识别【代码+数据集+python环境+GUI系统】
  • Rust 全局变量的最佳实践 lazy_static/OnceLock/Mutex/RwLock
  • # linux从入门到精通(三)
  • UDP通信
  • [数据结构] 二叉树题目 (二)
  • 阿博图书馆管理系统:SpringBoot技术应用
  • [nginx文档翻译系列] 控制nginx
  • Android Studio:GIT提交项目到远程仓库
  • CSS魔法堂:Absolute Positioning就这个样
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • Intervention/image 图片处理扩展包的安装和使用
  • jquery ajax学习笔记
  • Redis学习笔记 - pipline(流水线、管道)
  • vue中实现单选
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 前端之React实战:创建跨平台的项目架构
  • 巧用 TypeScript (一)
  • 如何设计一个比特币钱包服务
  • 线上 python http server profile 实践
  • 新版博客前端前瞻
  • 原生js练习题---第五课
  • k8s使用glusterfs实现动态持久化存储
  • 昨天1024程序员节,我故意写了个死循环~
  • # 透过事物看本质的能力怎么培养?
  • #HarmonyOS:软件安装window和mac预览Hello World
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (2)STM32单片机上位机
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (floyd+补集) poj 3275
  • (LNMP) How To Install Linux, nginx, MySQL, PHP
  • (vue)el-checkbox 实现展示区分 label 和 value(展示值与选中获取值需不同)
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (二)延时任务篇——通过redis的key监听,实现延迟任务实战
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • (附源码)springboot 个人网页的网站 毕业设计031623
  • (附源码)ssm户外用品商城 毕业设计 112346
  • (三) diretfbrc详解
  • (四)activit5.23.0修复跟踪高亮显示BUG
  • (四)进入MySQL 【事务】
  • (算法)求1到1亿间的质数或素数
  • (转)用.Net的File控件上传文件的解决方案
  • (自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
  • .gitignore文件设置了忽略但不生效
  • .NET Micro Framework初体验(二)
  • .NET 动态调用WebService + WSE + UsernameToken
  • .NET 中的轻量级线程安全