当前位置: 首页 > news >正文

RabbitMQ如何保证可靠性

在RabbitMQ中可以将消息传递的链路简化成如下图:

从上图可以发现,主要分为三个角色:Producer、Consumer、RabbitMQ Broker

正常情况下,Producer生产消息,安全的到打Broker的Exchange,然后根据转发规则,存储在Queue上,最后再推送给订阅的Consumer。

因此可以将链路分成三个部分:

1.消息从Producer到RabbitMQ的Exchange

2.消息从Exchange到Queue

3.消息从Queue到Consumer

当然还有一种情况,就是消息已经到达服务器了,但服务器挂了,因此还需要有持久化来恢复消息。

综上,要想保证RabbitMQ的可靠性,可以从这四个方面去入手:

1.保证消息从Producer到达Broker的Exchange

2.保证消息从Exchange转发到对应的Queue上

3.保证消息的持久化

4.保证消息正确被消费者消费

发布确认机制

发布确认主要分为两种模式:confirm模式和return模式

confirm模式是针对消息从Producer到Exchange

return模式是针对消息从Exchange到Queue

Confirm模式

Spring中yml配置:

spring:rabbitmq:port: 5672host: xxxusername: adminpassword: adminvirtual-host: demo#消息发送确认publisher-confirm-type: correlated

因为这个机制是在发送方这边,因此在创建RabbitTemplate时配置。

代码:

    @Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {System.out.printf("");if (ack){System.out.printf("消息接收成功, id:%s \n",correlationData.getId());}else {System.out.printf("消息接收失败, id:%s, cause: %s",correlationData.getId(), cause);}}});return rabbitTemplate;}

RabbitMQ无法去区分要确认的是哪条消息,因此在发送消息的时候还要添加一个标识。

代码:

    public String fanout(){for (int i = 0; i < 10; i++) {CorrelationData correlationData = new CorrelationData("" + i);rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello——" + i, correlationData);}return "发送成功~";}

Return模式

消息到达Exchange后,会根据路由规则进行转发到队列中,但可能存在队列与路由键不匹配或者没有绑定的队列等,消息无法转发到队列中,我们可以设置对这种情况的队列回退到Producer。

yml配置同上

代码:

    @Bean("confirmRabbitmqTemplate2")//回退模式public RabbitTemplate confirmRabbitTemplate2(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回");}});return rabbitTemplate;}

生产者代码同上

持久化机制

持久化主要针对的是交换机、队列、消息。

代码:

//交换机持久化
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();//队列持久化
QueueBuilder.durable(Constants.WORK_QUEUE).build();//消息持久化
Message message = new Message("nihao".getBytes(), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);User user = new User(i, "test--" + i, 10 + i);rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, user);

消息确认机制

消息确认机制主要是用在消费者这边,当消费这从Broker获取了消息,处理完后需要发送一个Ack告诉Broker,然后Broker就可以将消息删除了。

在RabbitMQ中可以设置两种模式:手动确认和自动确认

但在Spring-AMQP中提供了三种机制:NONE、MANUAL、AUTO

NONE:表示不做任何处理,消息一旦发出去就会被确认删除。

AUTO:与NONE不同的是,如果消费者在处理消息的时候发生异常了,并不会确认消息。

MANUAL:手动确认,消费者需要显示调用ACK。

xml配置:

spring:rabbitmq:port: 5672host: xxxusername: adminpassword: adminvirtual-host: demolistener:simple:
#        acknowledge-mode: none
#        acknowledge-mode: autoacknowledge-mode: manual

代码:

    @RabbitListener(queues = Constants.WORK_QUEUE)public void listenerQueue1(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.println("listener1 : " + message);int a = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}

总结:

保证消息的可靠性的方式总共分为四个方式:

1.保证消息从Producer到达Broker的Exchange -> Confirm模式

2.保证消息从Exchange转发到对应的Queue上 -> Return模式

3.保证消息的持久化 -> 设置durable参数

4.保证消息正确被消费者消费 -> 手动应答

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 文档控件DevExpress Office File API v24.1 - 支持基于Unix系统的打印
  • 正则表达式扩展应用
  • Linux/C 高级——shell脚本
  • elasticsearch教程
  • 学习记录——day28 信号量集
  • 未来展望:PLC远程控制网关与工业物联网融合的发展趋势
  • 【Linux】系列入门摘抄笔记-4-查看文件内容命令cat/more/less/tail
  • web基础与http协议与配置
  • 美的神机后续
  • 【Datawhale AI夏令营第四期】 Datawhale AI夏令营第四期 魔搭-AIGC方向 Task01笔记
  • Android 文件上传与下载
  • 引导过程与服务控制
  • springbootAl农作物病虫害预警系统-计算机毕业设计源码21875
  • 数据库|SQLServer数据库:数据的基本查询
  • 应急响应:D盾的简单使用.
  • @angular/forms 源码解析之双向绑定
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • 【技术性】Search知识
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • 【跃迁之路】【733天】程序员高效学习方法论探索系列(实验阶段490-2019.2.23)...
  • HashMap剖析之内部结构
  • LeetCode18.四数之和 JavaScript
  • LeetCode541. Reverse String II -- 按步长反转字符串
  • npx命令介绍
  • Vue ES6 Jade Scss Webpack Gulp
  • win10下安装mysql5.7
  • 创建一种深思熟虑的文化
  • 得到一个数组中任意X个元素的所有组合 即C(n,m)
  • 分布式任务队列Celery
  • 机器学习学习笔记一
  • 排序算法之--选择排序
  • 如何在GitHub上创建个人博客
  • 用jquery写贪吃蛇
  • 与 ConTeXt MkIV 官方文档的接驳
  • 在weex里面使用chart图表
  • ​14:00面试,14:06就出来了,问的问题有点变态。。。
  • ​secrets --- 生成管理密码的安全随机数​
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • (ctrl.obj) : error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MDd_DynamicDebug”不匹配值“
  • (pytorch进阶之路)CLIP模型 实现图像多模态检索任务
  • (react踩过的坑)Antd Select(设置了labelInValue)在FormItem中initialValue的问题
  • (Redis使用系列) SpirngBoot中关于Redis的值的各种方式的存储与取出 三
  • (附源码)spring boot火车票售卖系统 毕业设计 211004
  • .NET CF命令行调试器MDbg入门(一)
  • .NET MVC、 WebAPI、 WebService【ws】、NVVM、WCF、Remoting
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地定义和使用弱事件
  • .NET/C#⾯试题汇总系列:集合、异常、泛型、LINQ、委托、EF!(完整版)
  • .net实现头像缩放截取功能 -----转载自accp教程网
  • .NET使用存储过程实现对数据库的增删改查
  • /proc/interrupts 和 /proc/stat 查看中断的情况
  • @Data注解的作用
  • @Mapper作用
  • [2019红帽杯]Snake
  • [Algorithm][综合训练][kotori和气球][体操队形][二叉树中的最大路径和]详细讲解
  • [Android]竖直滑动选择器WheelView的实现