当前位置: 首页 > news >正文

RabbitMQ之消息确认机制(事务+Confirm)

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

通过将channel设置成confirm模式来实现;

事务机制

这里首先探讨下RabbitMQ事务机制。

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

关键代码:

通过wirkshark抓包(ip.addr==xxx.xxx.xxx.xxx && amqp),可以看到:

u=2331763415,1330510459&fm=173&app=25&f=JPEG?w=638&h=129&s=D80A5D38CD6361201EFC23DB0300D0B6

(注意这里的Tx.Commit与Tx.Commit-Ok之间的时间间隔294ms,由此可见事务还是很耗时的。)

我们先来看看没有事务的通信过程是什么样的:

u=1155344905,1671010280&fm=173&app=25&f=JPEG?w=640&h=100

可以看到带事务的多了四个步骤:

client发送Tx.Select

broker发送Tx.Select-Ok(之后publish)

client发送Tx.Commit

broker发送Tx.Commit-Ok

下面我们来看下事务回滚是什么样子的。关键代码如下:

u=2645203847,3628655406&fm=173&app=25&f=JPEG?w=640&h=111&s=8D42E112CD665F2010DDF5CE000080B1

同样通过wireshark抓包可以看到:

u=3215471523,1388571863&fm=173&app=25&f=JPEG?w=640&h=128&s=C1144F30C5B140231EF4A5DB0300C0B7

代码中先是发送了消息至broker中但是这时候发生了异常,之后在捕获异常的过程中进行事务回滚。

事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

Confirm模式

概述

上面我们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。

producer端confirm模式的实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

u=1377913353,2149292461&fm=173&app=25&f=JPEG?w=425&h=202&s=A802F412055849CA1CF880D9000080B3

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

编程模式

对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.

消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:

普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。

批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。

异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

从编程实现的复杂度上来看:

第1种

普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

关键代码如下:

u=3970530814,4268556173&fm=173&app=25&f=JPEG?w=640&h=54

wirkShark抓包可以看到如下:

u=2556769443,2549781351&fm=173&app=25&f=JPEG?w=639&h=202&s=80154F309FB1500B1E7581DF030050B7

第二种

批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

关键代码:

u=2665545569,756456842&fm=173&app=25&f=JPEG?w=640&h=91

第三种

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

关键代码:

u=3565743662,1099335049&fm=173&app=25&f=JPEG?w=640&h=262&s=BD2874331B7950095CF545DA000050B0

SDK中waitForConfirms方法实现:

u=1139880365,2529492621&fm=173&app=25&f=JPEG?w=577&h=537&s=888A703319BEE4CE4E7500DB0000C0B3

性能测试

Client端机器和RabbitMQ机器配置:CPU:24核,2600MHZ, 64G内存,1TB硬盘。

Client端发送消息体大小10B,线程数为1即单线程,消息都持久化处理(deliveryMode:2)。

分别采用事务模式、普通confirm模式,批量confirm模式和异步confirm模式进行producer实验,比对各个模式下的发送性能。

u=1189864115,3521664694&fm=173&app=25&f=JPEG?w=640&h=382&s=85F07033234A4949447D50CA000080B1

发送平均速率:

事务模式(tx):1637.484

普通confirm模式(common):1936.032

批量confirm模式(batch):10432.45

异步confirm模式(async):10542.06

可以看到事务模式性能是最差的,普通confirm模式性能比事务模式稍微好点,但是和批量confirm模式还有异步confirm模式相比,还是小巫见大巫。批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂,至于采用哪种方式,那是仁者见仁智者见智了。

消息确认(Consumer端)

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。也可以通过命令行来查看上述信息:

u=3414536488,638105750&fm=173&app=25&f=JPEG?w=640&h=128&s=6D60BB42CFE4BF705A75440F0000A0C1

代码示例(关闭自动消息确认,进行手动ack):

basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。

basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

转载于:https://my.oschina.net/rechardchensir/blog/1939619

相关文章:

  • 大屏FAQ
  • java源码 - CountDownLatch
  • 推荐几十本DBA学习的书
  • 利用 Siblings一步实现多个同级div,只改变当前点击的div样式
  • 前端笔记-201808
  • 设置PHP最长运行时间
  • zabbix添加nginx中间件监控
  • 将MWeb的文章发布到自己做的网站(超级详细)
  • SELinux 宽容模式(permissive) 强制模式(enforcing) 关闭(disabled) 几种模式之间的转换...
  • 如何把本地文件上传到gitlab上已存在的工程里
  • Spring事务隔离级别详解
  • 反爬虫:利用ASP.NET MVC的Filter和缓存(入坑出坑)
  • 输入一条url后,发生了什么??
  • 超级鹰打码平台
  • Python中列表生成式和字典生成式练习
  • [PHP内核探索]PHP中的哈希表
  • [数据结构]链表的实现在PHP中
  • 3.7、@ResponseBody 和 @RestController
  • Angular Elements 及其运作原理
  • css的样式优先级
  • extract-text-webpack-plugin用法
  • JavaScript设计模式与开发实践系列之策略模式
  • Mysql优化
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Spring-boot 启动时碰到的错误
  • uva 10370 Above Average
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 使用Envoy 作Sidecar Proxy的微服务模式-4.Prometheus的指标收集
  • 为视图添加丝滑的水波纹
  • 在Docker Swarm上部署Apache Storm:第1部分
  • 中国人寿如何基于容器搭建金融PaaS云平台
  • LevelDB 入门 —— 全面了解 LevelDB 的功能特性
  • # 数论-逆元
  • (2)Java 简介
  • (6)STL算法之转换
  • (TOJ2804)Even? Odd?
  • (二)丶RabbitMQ的六大核心
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (转)C#调用WebService 基础
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .mysql secret在哪_MYSQL基本操作(上)
  • .NET Compact Framework 3.5 支持 WCF 的子集
  • .Net core 6.0 升8.0
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .Net 代码性能 - (1)
  • .NET 指南:抽象化实现的基类
  • .NET/C# 使用反射注册事件
  • .NET实现之(自动更新)
  • .NET下ASPX编程的几个小问题
  • .NET运行机制
  • .NET中使用Redis (二)
  • .sh 的运行
  • ::
  • @configuration注解_2w字长文给你讲透了配置类为什么要添加 @Configuration注解