当前位置: 首页 > news >正文

kafka 消息位移提交几种方式:消息重复消息、消息丢失的关键

消费位移

Kafka 中的位移(offset)是用来记录消息在分区中的位置的标志,简单说就是记录消费者的消费进度,每次消息消费后需要更新消费进度,也就是位移提交

由此可见一旦位移提交发生异常,会导致消费进度不正确,就必然发生消息丢失或者重复消费

消息位移存储内部主题__consumer_offsets消息消费后需要执行位移的提交

消息位移提交几种方式

自动提交

enable.auto.commit 配置为true 默认每5s 提交一次 (auto.commit.interval.ms)拉取消息之前也会检查 是否可以进行位移提交

消息重复消费例子

消费者拉取了一批消息,消费后,消费位移自动提交前应用崩溃了,下次应用恢复,又从上次位移提交的地方消费

通过减小位移提交的时间间隔,能减少消息重复消费的可能,但会使消息位移提交频繁

消息丢失例子

消费者拉取到消息,此时消息位移刚好自动提交,但消息还没来及处理,然后应用崩溃了,下次应该恢复了,由于位移已经提交, 未处理的几条消息,就丢失了。

除了极端情况下消息可能存在丢失或重复消费,重复消息业务可以通过幂等性保证, 但消息丢失是可怕的,我们甚至都不知道

手动提交

对于业务来说消息拉取后,正确处理完才算消费了,自动提交可以更加灵活精准控制消息位移的提交

使用方式 设置enable.auto.commit 配置为false

同步提交

它会阻塞当前线程,直到提交成功或发生错误。同步提交确保位移提交的可靠性,但会增加延迟。

ConsumerRecords<String,String> records =    kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record:records){
// 消息处理逻辑
}kafkaConsumer.commitSync();

异步提交

它不会阻塞当前线程,提交过程在后台进行。异步提交提高了性能,但需要处理可能的提交失败情况。

kafkaConsumer.commitASync()

KafkaConsumer API 还为手动提交提供了带参数的方法

commitSync(Map<TopicPartition, OffsetAndMetadata>;commitAsync(Map<TopicPartition, OffsetAndMetadata>)

总结

一般情况我们消息位移自动提交就可以满足我们大部分场景,当然也有场景需要控制消息位移提交,需要我们在可靠性与性能之间做取舍,自动位移提交代码稍微复杂点,需要处理好位移提交失败的情况。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • C++ | Leetcode C++题解之第415题字符串相加
  • Go-知识-定时器
  • KTH5762系列 低功耗、高精度 3D 霍尔角度传感器 电子手表旋钮应用
  • 【对比学习串烧】 SimSiam MoCov3 DINO
  • nacos和eureka的区别
  • java(3)数组的定义与使用
  • 数值实验作业(第一章)
  • 鸿蒙OpenHarmony【轻量系统内核扩展组件(动态加载)】子系统开发
  • Python青少年简明教程目录
  • 混合开发应用侧-JSBridge,在加载的网页中调用原生能力
  • ARM驱动学习之7 驱动模块传参数
  • CentOS入门宝典:从零到一构建你的Linux服务器帝国
  • 64. 求 1+2+…+n
  • MOS工作的三种状态及其分析——亚阈值区(截至区),深三极管区(又叫深线性区)和饱和区
  • 基于SpringBoot+定时任务实现地图上绘制车辆实时运动轨迹图
  • @jsonView过滤属性
  • CentOS从零开始部署Nodejs项目
  • Golang-长连接-状态推送
  • HTML-表单
  • iOS筛选菜单、分段选择器、导航栏、悬浮窗、转场动画、启动视频等源码
  • XForms - 更强大的Form
  • yii2权限控制rbac之rule详细讲解
  • 爱情 北京女病人
  • 聊聊hikari连接池的leakDetectionThreshold
  • 三栏布局总结
  • 算法-图和图算法
  • 学习笔记DL002:AI、机器学习、表示学习、深度学习,第一次大衰退
  • 一个完整Java Web项目背后的密码
  • Hibernate主键生成策略及选择
  • Spring Batch JSON 支持
  • 阿里云ACE认证之理解CDN技术
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • # 数据结构
  • #includecmath
  • (1)(1.11) SiK Radio v2(一)
  • (13)[Xamarin.Android] 不同分辨率下的图片使用概论
  • (arch)linux 转换文件编码格式
  • (k8s)Kubernetes本地存储接入
  • (TipsTricks)用客户端模板精简JavaScript代码
  • (黑马点评)二、短信登录功能实现
  • (实测可用)(3)Git的使用——RT Thread Stdio添加的软件包,github与gitee冲突造成无法上传文件到gitee
  • (算法)Game
  • (转)我也是一只IT小小鸟
  • .halo勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .Mobi域名介绍
  • .Net MVC + EF搭建学生管理系统
  • .NET Reactor简单使用教程
  • .net 生成二级域名
  • .Net6支持的操作系统版本(.net8已来,你还在用.netframework4.5吗)
  • .NET中winform传递参数至Url并获得返回值或文件
  • @ 代码随想录算法训练营第8周(C语言)|Day57(动态规划)
  • @PostConstruct 注解的方法用于资源的初始化
  • @RequestBody与@ModelAttribute
  • [ACP云计算]易混淆知识点(考题总结)
  • [C++随笔录] 红黑树