当前位置: 首页 > news >正文

Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

目录

    • 生产者ack机制
    • 消费者ack模式
    • 手动提交ACK

生产者ack机制

Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。

Kafka 提供了三种 ACK 机制的配置选项,分别是:

  1. acks=0:生产者在成功将消息发送到网络缓冲区后即视为消息已被提交,不等待任何服务器响应。这种配置下,可能会出现消息丢失的情况。

  2. acks=1:生产者在成功将消息发送到主题的分区 leader 后即视为消息已被提交。这种配置下,生产者会收到分区 leader
    的确认,但仍有可能出现消息丢失的情况,例如当 leader 出现故障,而消息尚未复制到其他副本时。

  3. acks=all 或acks=-1:生产者需要等待所有分区副本都成功写入消息后才视为消息已被提交。这种配置下,生产者会等待所有分区副本的确认,确保消息被复制到足够数量的副本后才返回提交确认。这是最安全的确认方式,但也会导致较长的等待时间。

在实际使用中,根据对消息可靠性和延迟的要求,可以选择不同的 ACKs 级别。一般来说,如果对消息的可靠性要求较高,可以选择较高的 ACKs 级别,但需要考虑相应的延迟成本。

我们可以通过spring.kafka.producer.acks来配置ack机制

spring.kafka.producer.acks=1

消费者ack模式

kafka支持的消费模式,在AbstractMessageListenerContainer.AckMode的枚举中,下面就介绍下各个模式的区别

public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer* immediately processes the commit.*/MANUAL_IMMEDIATE,}

AckMode模式

RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
当使用 RECORD 确认模式时,消息监听容器会在每个消息被单独处理后进行确认。这意味着,如果一条消息被成功处理,它将作为单独的记录进行确认;如果处理失败,也会针对该消息进行错误记录。这种确认模式适用于需要精确处理每个消息的应用场景,例如确保每个消息都被正确处理。

BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
当使用 BATCH 确认模式时,消息监听容器会在批量处理一组消息后进行确认。这意味着,消息监听容器会将多个消息合并为批次,并将它们作为一组进行处理。只有在整个批次都被成功处理后,该批次的所有消息才会被确认。这种确认模式适用于需要提高处理效率的场景,例如批量处理大量消息以减少网络传输和系统调用的开销。

TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

COUNT_TIME:TIME或COUNT 有一个条件满足时提交

MANUAL:这是手动确认模式,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息。只有当消费者调用 acknowledge() 方法后,才会向 Kafka 服务器发送确认消息。这种模式可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

MANUAL_IMMEDIATE:这是立即手动确认模式,与 MANUAL 模式类似,但消费者在调用 acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险。

MANUAL和MANUAL_IMMEDIATE的区别

MANUAL 和 MANUAL_IMMEDIATE 都是 Kafka 消费者的手动确认模式,它们的区别在于确认的时机不同。

MANUAL 模式下,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息,在调用该方法之后,消息才会被标记为已消费,并且确认消息会在下次 poll() 时发送到 Kafka 服务器。这种模式的优点是可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

相比之下,MANUAL_IMMEDIATE 模式下,在消费者调用 Acknowledgment.acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险,因为如果消息处理失败,Kafka 不会再次发送该消息,而是认为该消息已经被成功消费了。

在实际使用中,应根据业务需求和性能要求来选择合适的确认模式。如果要求消息的可靠性和顺序性比较高,可以选择 MANUAL 模式;如果要求处理速度比较高,可以选择 MANUAL_IMMEDIATE 模式。

AckMode 可以通过配置文件或代码进行设置。例如,在 Spring Boot 应用中,可以使用以下配置方式指定确认模式:

spring.kafka.listener.ack-mode=manual_immediate

手动提交ACK

kafka默认是自动提交ack的,很多时候,我们都需要手动提交,这就要进行以下配置

1、设置enable-auto-commit=false,禁止自动提交
2、设置ack-mode为manual_immediate

在配置文件进行如下配置

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate

3、监听方法的入参加入Acknowledgment ack 参数,并在消费完成之后调用acknowledge方法,如下所示

	@KafkaListener(topics = "my-topic2",groupId = "myGroup")public void  receiveMessage2(String message, Acknowledgment ack){log.info("消费消息:"+message);//ack确认ack.acknowledge();}

相关文章:

  • SAP UI5 walkthrough step2 Bootstrap
  • Html转PDF,前端JS实现Html页面导出PDF(html2canvas+jspdf)
  • PTA:哈夫曼编码
  • class067 二维动态规划【算法】
  • 自然语言处理:电脑如何理解我们的语言?
  • Spring Cloud + Vue前后端分离-第3章 SpringBoot项目技术整合
  • spring security面经-字节飞书生产力工具后端一面
  • Google Bard vs. ChatGPT 4.0:文献检索、文献推荐功能对比
  • Linux AMH服务器管理面板本地安装与远程访问
  • 30 剑指offer-动态规划求正则表达式匹配
  • 第九天:信息打点-CDN绕过篇amp;漏洞回链amp;接口探针amp;全网扫描amp;反向邮件
  • 数据结构和算法专题---2、算法思想
  • 把 Windows 11 装进移动硬盘:Windows 11 To Go
  • UDP协议实现群聊
  • C++ 多态性(Polymorphism)和 虚函数(Virtual Functions)
  • (ckeditor+ckfinder用法)Jquery,js获取ckeditor值
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 【407天】跃迁之路——程序员高效学习方法论探索系列(实验阶段164-2018.03.19)...
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • canvas 五子棋游戏
  • exports和module.exports
  • export和import的用法总结
  • fetch 从初识到应用
  • Java 23种设计模式 之单例模式 7种实现方式
  • JavaScript对象详解
  • JS创建对象模式及其对象原型链探究(一):Object模式
  • leetcode388. Longest Absolute File Path
  • python 装饰器(一)
  • rabbitmq延迟消息示例
  • React Transition Group -- Transition 组件
  • 浮动相关
  • 批量截取pdf文件
  • 《天龙八部3D》Unity技术方案揭秘
  • 【运维趟坑回忆录】vpc迁移 - 吃螃蟹之路
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ‌前端列表展示1000条大量数据时,后端通常需要进行一定的处理。‌
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #!/usr/bin/python与#!/usr/bin/env python的区别
  • #NOIP 2014#day.2 T1 无限网络发射器选址
  • #QT(QCharts绘制曲线)
  • (2024最新)CentOS 7上在线安装MySQL 5.7|喂饭级教程
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (poj1.2.1)1970(筛选法模拟)
  • (rabbitmq的高级特性)消息可靠性
  • (二十九)STL map容器(映射)与STL pair容器(值对)
  • (利用IDEA+Maven)定制属于自己的jar包
  • (一)RocketMQ初步认识
  • (转)ObjectiveC 深浅拷贝学习
  • .net 程序 换成 java,NET程序员如何转行为J2EE之java基础上(9)
  • @JoinTable会自动删除关联表的数据
  • []串口通信 零星笔记
  • [100天算法】-实现 strStr()(day 52)
  • [20181219]script使用小技巧.txt
  • [2019.3.5]BZOJ1934 [Shoi2007]Vote 善意的投票
  • [C#]winform部署官方yolov10目标检测的onnx模型