kafka消费过程中失败,kafka重试补偿
今天遇到一个kafka的问题,在生产者发送消息之后,消费者会消费多次。在网上查询了很久,最终是在这个博客的引导下发现了问题:http://www.dalbll.com/Group/Topic/JAVA/5162,里面提到了kafka中的配置enable.auto.commit 是 true,这个会自动提交,然后是当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。正是这个导致消息丢失或者重复消费现象。
在一些情况下,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。
通过代码说明一下:
@StreamListener(MySink.INPUTB)
public void messageListen(JSONObject message) {
//下面是具体的消息消费事件
int result = doSoming();
if (result<0){
System.out.println("service消息内容:解析失败");
}else {
System.out.println("service消息内容:解析成功");
}
}
如果是在doSoming()的过程中程序抛出异常,而又没有实现异常捕获的时候,消费者就又以为消息没有消费,会重新去再走一遍这个消费方法,即又会重新执行doSoming();这样就可能造成消息的重复消费。
最后的解决方式有两种:
1、保证自己在doSoming()中对所有异常都能捕捉,并做相应的处理。
2、在这个监听的方法开始设置一个唯一键字段,消费时根据唯一键查询这条消息,判断是否消费过。也可以通过redis缓存来实现类似的机制。