当前位置: 首页 > news >正文

RabbitMQ消息队列(九):Publisher的消息确认机制

 在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack。那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理。

在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care。他只是将自己的状态publish给上层,由上层的逻辑去处理。如果Message没有被正确处理,可能会导致某些状态丢失。但是由于提供了其他强制刷新全部状态的机制,因此这种异常情况的影响也就可以忽略不计了。

对于某些异步操作,比如客户端需要创建一个FileSystem,这个可能需要比较长的时间,甚至要数秒钟。这时候通过RPC可以解决这个问题。因此也就不存在Publisher端的确认机制了。

那么,有没有一种机制能保证Publisher能够感知它的Message有没有被处理的?答案肯定的。在这里感谢笑天居士同学:他在我的《RabbitMQ消息队列(三):任务分发机制》文后留言一起讨论了问题,而且也查找了一些资料。在这里我整理了一下他转载和一篇文章和原创的一篇文章。衔接已经附后。

1. 事务机制 VS Publisher Confirm

如果采用标准的 AMQP 协议,则唯一能够保证消息不会丢失的方式是利用事务机制 — 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。

为了使能 confirm 机制,client 首先要发送 confirm.select 方法帧。取决于是否设置了 no-wait 属性,broker 会相应的判定是否以 confirm.select-ok 进行应答。一旦在 channel 上使用 confirm.select方法,channel 就将处于 confirm 模式。处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。
一旦 channel 处于 confirm 模式,broker 和 client 都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。delivery-tag 域的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

在异常情况中,broker 将无法成功处理相应的消息,此时 broker 将发送 basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义与 basic.ack 中相应各域含义是相同的,同时 requeue 域的值应该被忽略。通过 nack 一或多条消息,broker 表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish 。

在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。

2. 消息在什么时候确认

broker 将在下面的情况中对消息进行 confirm :

  • broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
  • 非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
  • 持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
  • 持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)

broker 会丢失持久化消息,如果 broker 在将上述消息写入磁盘前异常。在一定条件下,这种情况会导致 broker 以一种奇怪的方式运行。例如,考虑下述情景:

1. 一个 client 将持久消息 publish 到持久 queue 中
2. 另一个 client 从 queue 中 consume 消息(注意:该消息具有持久属性,并且 queue 是持久化的),当尚未对其进行 ack
3. broker 异常重启
4. client 重连并开始 consume 消息

在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。

3. 编程实现

首先要区别AMQP协议mandatory和immediate标志位的作用。

mandatory和immediate是AMQP协议中basic.pulish方法中的两个标志位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。具体区别在于:
1. mandatory标志位
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
2. immediate标志位
当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

具体的代码参考请参考参考资料1.

参考资料:

1. http://blog.csdn.net/jiao_fuyou/article/details/21594205

2. http://blog.csdn.net/jiao_fuyou/article/details/21594947

3. http://my.oschina.net/moooofly/blog/142095



相关文章:

  • 减治算法求n个数中的最小数的位置
  • spark2.1.0 自定义AccumulatorV2累加少值(线程不安全)?
  • heartbeat+ldirectord实现web与dns的高可用性
  • __new__ 是什么鬼
  • C#面向对象20 序列化和反序列化
  • SecureCRT 只用 RZ 命令上传大文件失败
  • Ubuntu 10.04下安装libgtk2.0-dev
  • MySQL多实例介绍及配置
  • Java类与对象初始化的过程(一道经典的面试题)
  • EF架构~性能高效的批量操作(Insert篇)
  • user-agent 验证移动端请求
  • python用zipfile模块打包文件或是目录、解压zip文件实例
  • 模块化(学习笔记)
  • HDU 4048 Zhuge Liang's Stone Sentinel Maze [组合数学+Burnside]
  • swap file *.swp already exists问题解决!!!
  • 【跃迁之路】【733天】程序员高效学习方法论探索系列(实验阶段490-2019.2.23)...
  • 2017 前端面试准备 - 收藏集 - 掘金
  • Android组件 - 收藏集 - 掘金
  • echarts花样作死的坑
  • Java知识点总结(JavaIO-打印流)
  • JS 面试题总结
  • LeetCode29.两数相除 JavaScript
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • mysql中InnoDB引擎中页的概念
  • Python语法速览与机器学习开发环境搭建
  • scrapy学习之路4(itemloder的使用)
  • SpingCloudBus整合RabbitMQ
  • 关于Java中分层中遇到的一些问题
  • 力扣(LeetCode)56
  • 深入浅出webpack学习(1)--核心概念
  • 线性表及其算法(java实现)
  • 用Node EJS写一个爬虫脚本每天定时给心爱的她发一封暖心邮件
  • 字符串匹配基础上
  • #if 1...#endif
  • (C语言)输入自定义个数的整数,打印出最大值和最小值
  • (react踩过的坑)antd 如何同时获取一个select 的value和 label值
  • (附源码)springboot掌上博客系统 毕业设计063131
  • (全注解开发)学习Spring-MVC的第三天
  • ***详解账号泄露:全球约1亿用户已泄露
  • 、写入Shellcode到注册表上线
  • .apk 成为历史!
  • .Net Framework 4.x 程序到底运行在哪个 CLR 版本之上
  • .NET HttpWebRequest、WebClient、HttpClient
  • .NET 程序如何获取图片的宽高(框架自带多种方法的不同性能)
  • .NET 中什么样的类是可使用 await 异步等待的?
  • .NET单元测试
  • .NET导入Excel数据
  • .NET连接数据库方式
  • @Mapper作用
  • @property python知乎_Python3基础之:property
  • [.net] 如何在mail的加入正文显示图片
  • [<事务专题>]
  • [AIGC] Kong:一个强大的 API 网关和服务平台
  • [Android]一个简单使用Handler做Timer的例子
  • [BZOJ 4129]Haruna’s Breakfast(树上带修改莫队)