当前位置: 首页 > news >正文

kafka消费过程中失败,kafka重试补偿

    今天遇到一个kafka的问题,在生产者发送消息之后,消费者会消费多次。在网上查询了很久,最终是在这个博客的引导下发现了问题:http://www.dalbll.com/Group/Topic/JAVA/5162,里面提到了kafka中的配置enable.auto.commit 是 true,这个会自动提交,然后是当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。正是这个导致消息丢失或者重复消费现象。

    在一些情况下,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。

    通过代码说明一下:

@StreamListener(MySink.INPUTB)
	public void messageListen(JSONObject message) {
		//下面是具体的消息消费事件
        int result = doSoming();
		if (result<0){
			System.out.println("service消息内容:解析失败");
		}else {
			System.out.println("service消息内容:解析成功");
		}
	}

如果是在doSoming()的过程中程序抛出异常,而又没有实现异常捕获的时候,消费者就又以为消息没有消费,会重新去再走一遍这个消费方法,即又会重新执行doSoming();这样就可能造成消息的重复消费。

最后的解决方式有两种:

1、保证自己在doSoming()中对所有异常都能捕捉,并做相应的处理。

2、在这个监听的方法开始设置一个唯一键字段,消费时根据唯一键查询这条消息,判断是否消费过。也可以通过redis缓存来实现类似的机制。

相关文章:

  • 从0到1搭建属于自己的服务器
  • PostgreSQL实战(2)数据结构
  • 金蝶kis记账王初始化过程中如何设置科目
  • SpringBoot项目的jar包在启动时选择的多环境配置以及加载顺序
  • PostgreSQL中date数据类型
  • springmvc带参数链接跳转,实现单一样式容器
  • Spring Boot 打包分为 war 格式,放到Tomcat下报错的解决方案
  • 窗体的事件
  • PostgreSQL序列的创建和使用
  • PostgreSQL的数据备份与恢复(windows版本)
  • 表单提交相关
  • @Transactional注解下,循环取序列的值,但得到的值都相同的问题
  • https网站跳转到http网站时,referrer获取不到的问题
  • Centos 6.5 使用命令ssh localhost一直需要密码的问题
  • 练习:WinForm 登录界面
  • 【刷算法】从上往下打印二叉树
  • AHK 中 = 和 == 等比较运算符的用法
  • CNN 在图像分割中的简史:从 R-CNN 到 Mask R-CNN
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • gitlab-ci配置详解(一)
  • JavaScript实现分页效果
  • PHP 7 修改了什么呢 -- 2
  • Python代码面试必读 - Data Structures and Algorithms in Python
  • React中的“虫洞”——Context
  • SQLServer之创建数据库快照
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 搞机器学习要哪些技能
  • 警报:线上事故之CountDownLatch的威力
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 前端性能优化--懒加载和预加载
  • 如何利用MongoDB打造TOP榜小程序
  • 使用Gradle第一次构建Java程序
  • 数据可视化之 Sankey 桑基图的实现
  • 系统认识JavaScript正则表达式
  • 职业生涯 一个六年开发经验的女程序员的心声。
  • Hibernate主键生成策略及选择
  • 资深实践篇 | 基于Kubernetes 1.61的Kubernetes Scheduler 调度详解 ...
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • ​一帧图像的Android之旅 :应用的首个绘制请求
  • # Apache SeaTunnel 究竟是什么?
  • #define与typedef区别
  • #预处理和函数的对比以及条件编译
  • (6)【Python/机器学习/深度学习】Machine-Learning模型与算法应用—使用Adaboost建模及工作环境下的数据分析整理
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (LeetCode) T14. Longest Common Prefix
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (windows2012共享文件夹和防火墙设置
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (二)hibernate配置管理
  • (分布式缓存)Redis哨兵
  • (汇总)os模块以及shutil模块对文件的操作
  • (三)终结任务
  • (转)socket Aio demo
  • (转载)Linux 多线程条件变量同步