当前位置: 首页 > news >正文

kafka如何保证消息不丢失?半分钟的答案和半个小时的答案有点不一样。

文章目录

  • 一、Kafka在哪些场景下有丢消息的可能?
  • 二、面试流经典答法
  • 三、为什么金融场景没人会用Kafka?
  • 总结

kafka如何保证消息不丢失? 这是面试最常问到的问题。但是其实这是一个最体现综合实力的开放性题目。把这问题真正弄明白,面试时绝对值得涨个五毛钱的薪水。但是很可惜,很多人把这种问题当成了八股文来背。我见过最简单的回答是,生产者的ack应答机制+消费者手动提交Offset。一分钟答完收工,然后看着我,大眼瞪小眼。

​ 但是真相往往就被八股文们给覆盖了。先说结论,Kafka为了保证他的高性能,高吞吐,牺牲了他的数据安全性。所以,至少目前看来,Kafka是不能保证消息安全的。所以,Kafka大都是用在日志、大数据采集这样的允许消息少量丢失的场景。

​ 具体为什么,跟我一起来分析分析。

一、Kafka在哪些场景下有丢消息的可能?

​ 在MQ场景下,不管是哪一种MQ产品,都有两个通用的丢消息的元凶,就是网络+缓存。所以,关于MQ消息丢失的问题,都可以从以下这几个方面来讨论。

在这里插入图片描述

​ 1、生产者发送消息到MQ,有可能丢消息–跨网络

​ 2、生产者把消息发到MQ后,MQ服务突然崩溃,有可能丢消息–Pagecache缓存

​ 3、MQ中的Server端都会提供主从机制,防止Master节点的单点崩溃。但是往从节点发消息,有可能会丢失。这样如果消息没有同步,而Master节点上的消息又因为缓存的原因丢失,就有可能造成集群丢消息 --跨网络

​ 4、消费者去MQ上拉取消息,有可能丢消息–跨网络

​ 回到Kafka的场景,他的消息模型是这样的:

在这里插入图片描述

二、面试流经典答法

​ 那如果面试官问到Kafka如何保证消息不丢失,你会如何回答呢? 及格线的答法是这样的:

1、生产者端,配置ack应答参数。

​ Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。

  • acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。
  • acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。
  • acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

​ 有这个acks配置后,Kafka的生产者消息安全性基本就分析到位了。另外还有一些加分项,就是如果你能考虑到发送消息的幂等性,防止生产者重复发送消息。那么可以补充下Kakfa的生产者幂等性配置。这个机制需要acks配置为-1或者all才能生效(当然,还有其他条件)。

2、Broker端,配置多Partition分区。

​ 在Broker端,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

​ 这样整个集群内的消息不会丢失。

3、消费者端,避免异步丢消息。

​ 消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。

​ 消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。

​ 当然这里也有很多加分项,就是如何防止消息的重复消费。比如消费者端业务层面增加幂等性判断,或者在技术层间使用消费者的LowLevel API,将Offset移到数据库中自行管理,将数据库的业务操作与Offset变更放到同一个数据库事务中去处理,保证消息和业务的一致性。等等。

三、为什么金融场景没人会用Kafka?

​ 大部分的八股文也就是这样来分析这个问题的。上述的那些方法,虽然都会降低Kafka的性能,但是也还是可以保证消息不丢失的啊。如果这样的话,Kafka就是一个挺完美的MQ产品了。高并发的日志,大数据采集场景,Kafka的性能没得说。对数据安全要求高的金融场景,Kafka降低一点性能也可以hold住。但是事实是这样吗?

​ 所有的面试分享都很容易推导出这个结论。但是事实是,不会有任何一个架构师选择用Kafka来传递安全性要求极高的消息。阿里为了适应自己的金融服务,更是重新推出了RocketMQ。那到底是为什么呢?其实原因就在于,Kafka本身为了保证他的超高性能,就没有保证消息的绝对安全。

1、Kafka的文件设计不适合多分区。

​ Kafka的日志文件是以Partitoin为单位进行落地的。也就是说,每个Partition对应一组log日志文件。虽然Kafka写log文件的性能堪称一流,但是这也造成文件比较零散。当Kafka中的Topic和Partition比较多时,在日志文件也会随之变得更多,这样,寻找文件的性能消耗就会变大。 所以,当Topic和Partition过多时,Kafka的性能下降会非常明显。而在对数据敏感的业务场景中,天生就需要对数据进行更详细的区分,也就需要更多的Topic。

​ 对比之下,RocketMQ用一组统一的CommitLog收集所有Topic的消息,这样就能够很好的避免日志文件碎片化的问题。其实RocketMQ的数据文件读写方式,很大程度上就是借鉴了Kafka,但是做了一些调整之后,就能更贴合阿里庞大的电商场景。

2、Kafka不支持同步刷盘

​ 缓存断电就会丢失,这是大家都能理解的,所以缓存中的数据如果没有及时写入到硬盘,也就是常说的刷盘,那么当服务突然崩溃,就会有丢消息的可能。所以,最安全的方式是写一条数据,就刷一次盘,成为同步刷盘。

​ 但是,这里真正容易产生困惑的,是这里所说的缓存,并不是我们平常开发过程中接触到的缓存,而是操作系统内核态的缓存-pageCache。这是应用程序接触不到的一部分缓存。比如我们用应用程序打开一个文件,实际上文件里的内容,是从内核态的PageCache中读取出来的。因为与磁盘这样的硬件交互,相比于内存,性能是很低的,操作系统为了提升性能,会将磁盘中的文件加载到PageCache缓存中,再向应用程序提供数据。修改文件时也是一样的。用记事本修改一个文件的内容,不管你保存多少次,内容都是写到PageCache里的。然后操作系统会通过脏页机制,在未来的某个时刻将所有的PageCache统一写入磁盘的操作,这个操作就是刷盘。比如在操作系统正常关系的过程中,也会触发刷盘机制。

​ 说这么多,就是告诉你,其实对于缓存断掉,造成数据丢失,这个问题,应用程序其实是没有办法插手的。他并不能够决定自己产生的数据在什么时候刷入到硬盘当中。应用程序唯一能做的时,就是尽量频繁的通知操作系统进行刷盘操作。但是,这必然会降低应用的执行性能,而且,也不是能百分之百保证数据安全的。应用程序在这个问题上,只能取舍,不能解决。

​ Kafka其实在Broker端设计了一系列的参数,来控制刷盘操作的频率。如果对这些频率进行深度定制,是可以实现同步刷盘效果的。但是,这样的定制显然会大大降低Kafka的执行效率,这与Kafka的设计初衷是不符合的。

​ 相关的参数包括这几个:

  • log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
  • log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
  • log.flush.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的频率。默认也是Long.MAX。

​ 这里可以看到,Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理。

​ 而对比下RocketMQ,RocketMQ实现了同步刷盘的机制,也就是每写入一个消息,就会发起一起刷盘的操作。但是,Kafka并没有直接提供同步刷盘的功能。因为这相当于是破坏了操作系统的优化机制,强行将一列火车拖到乡间小路上去跑。

3、Partition故障恢复机制可能会丢消息

​ 这个问题就隐藏得比较深。

​ 首先需要理解Kafka的LEO和HW机制。

​ Kafka可以给每个Topic配置replication-factor备份因子,在Broker端就会给每个Partition维护一组Partition。这一组Partition会尽量平均的分配到不同的Broker上。然后,这一组Partition中会选举产生一个Leader Partition,负责与客户端进行交互。其他的Follower Partiton则负责从Leader Partition中同步消息,并辅助完成一部分的读请求。同步消息的过程,则是通过LEO和HW机制来完成的。

​ LEO 记录的是每个Partiton记录的最后一个Offset。

​ HW 记录的是一组Partiton中最小的LEO。

在这里插入图片描述

​ Leader Partition负责与客户端交互,他最先写入消息。然后其他的Follower才去同步Leader Partition的消息,同时,推高自己的LEO以及整个Partition的HW。

​ 当一个Follower Partiton发生故障时,不会影响消息的整体写入。这时,Kafka就会忽略这个有问题的Partiton,继续在其他Partiton之间写入以及同步数据,推高LEO和HW。

​ 当这个出故障的Partition出现问题时,并不会立即恢复加入到Partition集合中,而是会根据自己记录的HW值,清空掉HW之后的数据,然后开始拉取消息。直到自己记录的LEO值,赶上了整个Partition集合的HW之后,才正式加入Partiton集合,完全恢复正常工作。

在这里插入图片描述

​ 这种情况还好,不会对数据有什么影响。但是如果是Leader Partition发生故障了呢?这时,就需要重新选举产生一个新的Leader Partitoin。但是新的Leader Partition的消息记录与旧的Leader Partition会有差距。这时,Kafka的选择方式就是以新产生的Leader Partition的消息为准。所有其他Partiton中高于新Leader Partition的LEO值的那一部分消息全部清空,保持与Leader Partition同步,后续再继续记录消息。

​ 而旧的Leader Partition服务恢复过来后,也会作为Follower Partition,截取掉比自己以前记录的HW值更新的消息,然后重新去同步消息。

在这里插入图片描述

​ 这时,问题就出来了。旧Leader Partition中比较新的那几条数据,就在集群内部被彻底抛弃掉了,也就是说这部分数据丢失了

​ 那对比RocketMQ是不是也有这样的问题呢?RocketMQ的主从集群,主节点和从节点的地位是不变的,不会有重新选举的过程,所以,自然也不会有这种消息丢失的情况。而RocketMQ的Dledger集群,虽然也会有主节点切换,但是他的Deldger集群采用的是二阶段的文件写入,也就是说原本Kafkak中会丢失的这一部分消息,在RocketMQ中并不会被截取掉,而是会被记录为uncommited状态,等待消息继续同步,同步完成后再标记为commited状态。

总结

​ 关于Kafka保证消息不丢失的问题,就简单总结到这里,但这其实并不是结束。相反,随着你对Kafka理解得越深,你会发现这个问题会有更多的发散空间。像MQ如何保证消息不丢失?如何不重复消费?如何处理消息积压?等等,这都是一系列非常开放的面试题。对于你是否真正理解了每个MQ产品,是非常好的检验标准。所以,这么好的题目,如果只是简简单单背个八股文,那太可惜了。

相关文章:

  • Java学习----集合1
  • PBR概念及PBR核心理论和渲染原理
  • 5.5如何去除有序数组的重复元素
  • PBR标准化工作流程
  • Vue学习第17天——netTick()的原理及使用
  • 英语语法精讲合集
  • 如何用数据采集网关快速采集工业现场数据,怎么搭建MQTT服务器?
  • Vue中的样式绑定
  • 大学网课答案公众号题库搭建
  • torch.utils.data
  • torch.torchvision
  • Git GitHub VSCode 简单使用
  • 小程序开发技术框架选型
  • 大学生怎么制作查题搜题公众号?
  • Spring源码------IOC容器初始化过程
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • CentOS 7 防火墙操作
  • CentOS7 安装JDK
  • CSS居中完全指南——构建CSS居中决策树
  • Docker下部署自己的LNMP工作环境
  • ES6系统学习----从Apollo Client看解构赋值
  • Javascript基础之Array数组API
  • learning koa2.x
  • Leetcode 27 Remove Element
  • mysql_config not found
  • Nodejs和JavaWeb协助开发
  • Python_OOP
  • tweak 支持第三方库
  • UMLCHINA 首席专家潘加宇鼎力推荐
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 微信小程序实战练习(仿五洲到家微信版)
  • 我感觉这是史上最牛的防sql注入方法类
  • nb
  • 我们雇佣了一只大猴子...
  • #[Composer学习笔记]Part1:安装composer并通过composer创建一个项目
  • #HarmonyOS:Web组件的使用
  • #Java第九次作业--输入输出流和文件操作
  • #LLM入门|Prompt#3.3_存储_Memory
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (LeetCode) T14. Longest Common Prefix
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • (简单) HDU 2612 Find a way,BFS。
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • (转)程序员技术练级攻略
  • (转)四层和七层负载均衡的区别
  • **CI中自动类加载的用法总结
  • ... 是什么 ?... 有什么用处?
  • .net core 6 使用注解自动注入实例,无需构造注入 autowrite4net
  • .Net FrameWork总结
  • .Net MVC + EF搭建学生管理系统
  • .net MySql
  • .NET 动态调用WebService + WSE + UsernameToken