当前位置: 首页 > news >正文

Rabbit高级特性 - 消息重试机制(两种实现)

文章目录

  • 消息重试机制
    • 概述
    • 实现方式一:基于消息手动确认机制,返回 nack 实现
      • 配置文件
      • 交换机、队列、绑定
      • 生产者接口
      • 消费者
      • 演示和结论
    • 实现方式二:基于重试配置实现
      • 配置文件
      • 交换机、队列、绑定
      • 生产者接口
      • 消费者
      • 演示和结论

消息重试机制


概述

消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等.

Ps:如果时程序逻辑引起的错误,那么即使重试多少次都是没有用的,但是可以通过配置重试次数来解决.

实现方式一:基于消息手动确认机制,返回 nack 实现

配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认

交换机、队列、绑定

    @Bean("ackExchange")fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)@Bean("ackQueue")fun ackQueue() = Queue(MQConst.ACK_QUEUE)@Beanfun ackBinding(@Qualifier("ackExchange") exchange: DirectExchange,@Qualifier("ackQueue") queue: Queue,): Binding {return BindingBuilder.bind(queue).to(exchange).with(MQConst.ACK_BINDING)}

生产者接口

@RestController
@RequestMapping("/mq3")
class MQ3Api(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")val a = 1 / 0channel.basicAck(deliveryTag, false)} catch (e: Exception) {//通过返回 nack,并设置 requeue 为 ture 实现消息重新入队,并进行重试channel.basicNack(deliveryTag, false, true) }}}

演示和结论

在这里插入图片描述
deliverTag 自增的原因: 引发异常后,会返回 nack,并且参数 requeue = true,表示重新入队,然后进行重试,将队列中的消息再次发送给生产者,因此 deliverTag 会自增.

缺点: 如果是由于程序逻辑异常引起的重试,那么无论重试多少次都没用,并且不断重试会导致负载飙升,性能下降.

实现方式二:基于重试配置实现

配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: auto # 开启重试机制,这里必须是 auto,否则不生效!retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 失败等待时常max-attempts: 5 # 最大重试次数(包括第一次消费)

Ps:开启重试机制,acknowledge-mode 必须指定为 auto,否则不生效!

交换机、队列、绑定

    @Bean("ackExchange")fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)@Bean("ackQueue")fun ackQueue() = Queue(MQConst.ACK_QUEUE)@Beanfun ackBinding(@Qualifier("ackExchange") exchange: DirectExchange,@Qualifier("ackQueue") queue: Queue,): Binding {return BindingBuilder.bind(queue).to(exchange).with(MQConst.ACK_BINDING)}

生产者接口

    @RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")val a = 1 / 0}}

演示和结论

在这里插入图片描述
deliverTag 不自增的原因: 因为是消息已经发出去了,即使失败了也不会重回队列,而是直接重新发一遍消息.

好处: 不仅可以控制重试次数(防止类似于上面讲到的确认应答引起的无限重试),还可以控制每次重试的间隔时间(防止负载飙升).

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • nextjs 实现TodoList网页应用案例
  • 分布式存储ceph知识点整理
  • Flink单机和集群环境部署教程
  • opencv 三维重建基础
  • PDF转Markdown的利器(MinerU版)
  • jupyter项目使用Anaconda环境内核
  • 算法---动态规划专练(1)
  • Spring boot tomcat使用自定义线程池监控线程数量告警
  • 云手机在海外社交媒体运营中的作用
  • 【视觉SLAM】 十四讲ch7习题
  • 使用Docker Compose进行容器编排的最佳实践
  • 产业园物业满意度调研指标设计
  • c++中的标准库
  • C++标准模板(STL)- 类型支持 (类型属性,检查类型是否拥有强结构相等性,std::has_strong_structural_equality)
  • 清除 Nuxt 状态缓存:clearNuxtState
  • 实现windows 窗体的自己画,网上摘抄的,学习了
  • [译]Python中的类属性与实例属性的区别
  • 【comparator, comparable】小总结
  • 【刷算法】从上往下打印二叉树
  • CSS中外联样式表代表的含义
  • docker-consul
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • JAVA_NIO系列——Channel和Buffer详解
  • Javascript设计模式学习之Observer(观察者)模式
  • java中具有继承关系的类及其对象初始化顺序
  • Next.js之基础概念(二)
  • redis学习笔记(三):列表、集合、有序集合
  • 从地狱到天堂,Node 回调向 async/await 转变
  • 第三十一到第三十三天:我是精明的小卖家(一)
  • 前端代码风格自动化系列(二)之Commitlint
  • 深度学习在携程攻略社区的应用
  • 微信端页面使用-webkit-box和绝对定位时,元素上移的问题
  • 详解NodeJs流之一
  • SAP CRM里Lead通过工作流自动创建Opportunity的原理讲解 ...
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ​Java并发新构件之Exchanger
  • #php的pecl工具#
  • (4)通过调用hadoop的java api实现本地文件上传到hadoop文件系统上
  • (C++17) optional的使用
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (第27天)Oracle 数据泵转换分区表
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (一)搭建springboot+vue前后端分离项目--前端vue搭建
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (一)项目实践-利用Appdesigner制作目标跟踪仿真软件
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .NET CORE 第一节 创建基本的 asp.net core
  • .net core 6 redis操作类
  • .net core使用RPC方式进行高效的HTTP服务访问
  • .NET Framework .NET Core与 .NET 的区别
  • .NET处理HTTP请求
  • @Autowired和@Resource的区别
  • @JsonFormat与@DateTimeFormat注解的使用
  • [ JavaScript ] JSON方法
  • [ Linux ] Linux信号概述 信号的产生