当前位置: 首页 > news >正文

Kafka运行机制(二):消息确认,消息日志的存储和回收,生产者消息分区

前置知识

Kafka基本概念icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501Kafka运行机制(一):Kafka集群启动,controller选举,生产消费流程icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141307210?spm=1001.2014.3001.5501

​1. 消息确认

生产者端,客户端在通过生产者生产消息时,需要知道消息是否发送成功,防止消息丢失或进行其他操作。消费者端,消费者也需要确认自己在消费数据后,提交偏移量是否成功,防止重复消费。

生产者

生产者的消息确认机制通常只有在JavaApi中,或生产者配置了重试机制时才有意义。在JavaApi中Kafka驱动提供了同步提交和异步提交两种提交方式(因为生产者批处理的机制,Kafka可以配置缓冲区大小,消息可以暂存到缓冲区中,等到时间或消息大小达到指定值时发送),同步提交会阻塞当前线程,等待kafka返回确认信号,并更具生产者配置选择是否重试。而异步提交则不会阻塞代码,Kafka返回确认信号后执行指定的回调函数。命令行配置重试机制,也可以在Kafka返回失败的信号时,触发重新提交。通过下文可以详细了解在JavaApi中如何控制生产者和消费者的消息确认

Kafka快速入门:Kafka驱动JavaApi的使用icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141356436?spm=1001.2014.3001.5501

而Kafka何时或者说何种条件下才会返回确认信息,则是由acks配置项控制。

生产者消息确认策略是由acks配置项控制具体如下:

  • acks=0:生产者发送消息后不会等待任何确认。这种模式下,消息的丢失不可避免,适用于对消息丢失容忍的场景。
  • acks=1:生产者发送消息后,会等待 Leader 副本的确认。Leader 确认后,生产者会认为消息已成功写入。这种模式下,消息丢失的风险较低,但如果 Leader 宕机,消息可能会丢失。
  • acks=allacks=-1:生产者发送消息后,会等待所有 ISR(In-Sync Replicas)副本的确认。只有当所有 ISR 副本都确认了消息,生产者才会认为消息写入成功。这种模式下,消息的可靠性最高,但写入延迟可能增加。

实际生产中并非所有情况都不允许消息丢失,在视频相关的功能中,丢失几帧数据并不影响视频整体流畅度,反而是服务的响应速度对流畅度的影响更大,在这种情况下可以完全不在乎是否丢包,将ack设为0,以达到极致的性能。

其中,0和1都比较好理解,当设置为all或-1时所有的ISR副本是什么呢?

ISR是集群元数据的一部分,其中记录了各个Leader分区以及和其日志偏移量差距不差,上次同步数据时间据当前时间不长的Follower副本,和其日志偏移量和上次同步数据的时间的映射关系。

在上文中我们讲述了主题副本分区中的Leader和Follower的角色和相关功能,其中Leader 副本负责处理所有的生产请求,并将数据写入自己的日志中。Follower 副本则从 Leader 副本同步数据,将其写入到自己的日志中。当Follower在同步Ledaer的数据时,Leader会讲这次同步的Follower,其同步数据的偏移量以及同步的时间的映射关系通过controlelr节点,再由controller节点存入元数据中的ISR数据列表。当Leader将新的偏移量和时间存入ISR中时,ISR不仅会更新数据,还会检查当前内部保存的Follower的同步数据偏移量和时间是否和Leader日志中的偏移量和当前时间相差太多(配置中的replica.lag.max.messages和replica.lag.time.max.ms),如果超过预期值则会将该Follwer从ISR踢出。

踢出ISR并不会影响改分区副本的正常功能,不过当选举新的Leader会从ISR中选取,并且当设置acks为all时,生产消息的确认,也是通过当前生产消息的偏移量与ISR中的值进行比对,当ISR中记录的所有的Follwer偏移量都超过了这个消息对应的偏移量,则认为所有分区都已经成功,返回客户端成功响应,不会确认未再ISR中的分区副本。整体流程如下图

​消费者

消费者的确认机制是依赖于提交偏移量的。不同于生产者生产消息,如果生产成功时不返回任何确认信号,客户端则无法知道自己是否生产成功。而消费者消费成功会获取最终数据,所以只要获得了数据,就是消费成功,不需要什么确认信息。不过消费者在消费成功时需要向__consumer-offsets主题中提交一个自己消费分区当前的偏移量,所以只有当成功向向__consumer-offsets主题提交了偏移量后,才叫消费成功,如果没有成功提交偏移量,后续仍然会重复消费。

消费者提交偏移量分为手动提交和自动提交,手动提交时会马上提交偏移量,即使没有消费成功,这种模式下,当消费者由于某些意外消费失败,偏移量也会加一,就会造成消息丢失。

自动提交会在消费成功时自动提交,在这种模式下,当消费者由于某些意外消费成功,但意外宕机导致没有提交偏移量,则会造成消息重复消费。

2. Kafka消息日志的存储和回收

1. 日志存储机制

Kafka 中的每个主题包含若干分区,每个分区在物理上对应一个日志文件夹,该文件夹包含一系列的日志段文件。

日志段文件
  • 日志段:每个分区的日志被分成多个日志段文件。日志段是一个物理文件,存储在 Kafka 的存储目录下。每个日志段文件以偏移量范围命名。
  • 日志文件命名:日志段文件的命名格式通常是 log-start-offset.log,其中 log-start-offset 是该日志段中第一条消息的偏移量。
  • 索引文件:除了日志段文件外,每个分区还包含两个索引文件:时间索引文件和偏移量索引文件,用于快速查找消息。
写入机制
  • 顺序写入:Kafka 中的消息是顺序写入日志段的,每条消息都有一个唯一的偏移量,表示该消息在分区中的位置。
  • 追加模式:消息总是被追加到当前的日志段文件中。当日志段的大小达到配置的阈值时,Kafka 会滚动创建新的日志段。

2. 日志存储配置

Kafka 提供了一系列配置参数,控制日志的存储行为:

  • log.segment.bytes:每个日志段文件的最大大小,默认为 1GB。当日志段大小达到这个限制时,Kafka 会创建一个新的日志段。

    log.segment.bytes=1073741824 # 每个日志段大小限制为1GB
  • log.roll.ms:日志段滚动的时间限制。即使日志段未达到 log.segment.bytes 限制,Kafka 也会根据时间限制滚动日志段,默认为 7 天。

    log.roll.ms=604800000 # 每7天滚动一次日志段

3. 日志回收机制

Kafka 的日志回收是通过删除策略和压缩策略实现的,用于管理存储空间并保持系统的高效运行。

删除策略(Log Cleanup Policy)

Kafka 提供两种主要的日志回收策略:

  • 基于时间的保留策略

    • log.retention.ms:定义了消息在日志中的最长保留时间。超过这个时间的消息会被标记为可删除。

    • log.retention.minuteslog.retention.hours:这是 log.retention.ms 的简写形式,以分钟和小时为单位配置。

      log.retention.hours=168 # 保留7天的消息
  • 基于大小的保留策略

    • log.retention.bytes:定义每个分区日志的最大存储大小。当日志文件的大小超过这个值时,最旧的消息会被删除。

      log.retention.bytes=1073741824 # 每个分区的日志大小限制为1GB
日志清理策略(Log Cleanup Policy)

Kafka 允许用户为每个主题配置日志清理策略:

  • delete 策略:这是默认策略。当消息超过保留时间或日志大小限制时,Kafka 会自动删除旧消息,以释放存储空间。

    log.cleanup.policy=delete
  • compact 策略:此策略不删除旧消息,而是保留每个键的最新版本。

    log.cleanup.policy=compact

compact策略下,通过生产者生产消息时给该条消息设置一个key,当多条消息都有一个key值时,compact会删除旧的key对应的消息,保存最新的key消息。

compact可以和delete结合使用,也就是同时配置两种策略,在这种策略下会对有key的消息保存最新版本,而对于没有key的消息依据删除配置删除。

Kafka 使用后台线程定期检查日志段,并根据删除策略和保留策略执行清理操作。这些线程负责删除旧的日志段或压缩日志段,以保证 Kafka 的存储空间得到合理管理。

3. 生产者消息分区

当没有使用自定义分区器时,会使用默认分区器,默认分区器会根据消息是否具有key值来进行不同的分区判断。

  • 有key值时:有key值时会通过类似于Hash槽的模式来对消息进行分区,通过对key值取hash值,然后在通过hash值取余分区总数,的到最终的值就是分区位置。
  • 无key值时:无key值时则通过轮询算法来进行消息分区

客户端也可以根据具体要求来自定义分区策略,比如某些服务器更强,我们可以向其中分区投入更多消息,具体的自定义分区实现,下文中有详细讲解

Kafka快速入门:Kafka驱动JavaApi的使用icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141356436?spm=1001.2014.3001.5501​​​​​​​

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Postman接口自动化测试:从入门到实践!
  • 物联网(IoT)设备渗透文章二:智能家居中控系统的渗透与利用
  • C++ 设计模式——观察者模式
  • 【CAN总线测试】——CAN数据链路层测试
  • RK平台一个系统固件兼容多款屏幕
  • 虚幻5|AI行为树,跟随task(非行为树AI)
  • .NET应用UI框架DevExpress XAF v24.1 - 可用性进一步增强
  • 内存管理篇-03物理内存管理-32位
  • MySQL 的子查询(Subquery)
  • 单例模式 详解
  • 计算机毕业设计opencv+pytorch疲劳驾驶检测系统 自动驾驶 面部多信息特征融合的疲劳驾驶检测系统 驾驶员疲劳驾驶风险检测 深度学习 机器学习 大数据
  • Educational Codeforces Round 169 (Rated for Div. 2)
  • Java语言程序设计——篇十七(1)
  • verilog中两个常数相除
  • 三、LogicFlow 基础配置介绍及实现一个基础 Demo
  • 【译】JS基础算法脚本:字符串结尾
  • 2019年如何成为全栈工程师?
  • Apache Spark Streaming 使用实例
  • Date型的使用
  • JavaScript函数式编程(一)
  • Java比较器对数组,集合排序
  • JSONP原理
  • Laravel 中的一个后期静态绑定
  • LeetCode算法系列_0891_子序列宽度之和
  • nodejs实现webservice问题总结
  • SSH 免密登录
  • vue 个人积累(使用工具,组件)
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 分类模型——Logistics Regression
  • 复习Javascript专题(四):js中的深浅拷贝
  • 将 Measurements 和 Units 应用到物理学
  • 前端路由实现-history
  • 如何利用MongoDB打造TOP榜小程序
  • 学习HTTP相关知识笔记
  • ​​​​​​​ubuntu16.04 fastreid训练过程
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • #如何使用 Qt 5.6 在 Android 上启用 NFC
  • #中的引用型是什么意识_Java中四种引用有什么区别以及应用场景
  • $.ajax()参数及用法
  • $nextTick的使用场景介绍
  • (04)odoo视图操作
  • (2022 CVPR) Unbiased Teacher v2
  • (2022版)一套教程搞定k8s安装到实战 | RBAC
  • (done) 两个矩阵 “相似” 是什么意思?
  • (Java企业 / 公司项目)点赞业务系统设计-批量查询点赞状态(二)
  • (MTK)java文件添加简单接口并配置相应的SELinux avc 权限笔记2
  • (ros//EnvironmentVariables)ros环境变量
  • (附源码)springboot炼糖厂地磅全自动控制系统 毕业设计 341357
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (三)SvelteKit教程:layout 文件
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • (转)ABI是什么
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在