当前位置: 首页 > news >正文

springboot kafka 提高拉取数量

文章目录

  • 背景
  • 问题复现
  • 解决问题
  • 原理分析
    • fetch.min.bytes
    • fetch.max.wait.ms
    • 源码分析
      • ReplicaManager#fetchMessages

背景

开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。

问题复现

  • kafka maven依赖
		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
  • 配置消费者
@Configuration
public class KafkaBlukConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.max-poll-records:30}")private Integer maxPollRecords;@Value("${spring.kafka.consumer.groupId:group1}")private String group;/*** 消费者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}/*** 消费者批量⼯程*/@Beanpublic KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);return factory;}
}
  • 消费端代码

@Component
public class KafkaBatchConsumer {private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")public void consume(List<ConsumerRecord<String, String>> record) throws Exception {log.info("KafkaBatchConsumer recode size : {} ", record.size());}}
  • 使用yml配置生产者
spring:kafka:bootstrap-servers: 192.168.56.112:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 使用生产者发送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定义的主题名称public static final String TOPIC_NAME = "topic2";private KafkaTemplate<String, String> kafkaTemplate;/*** http://localhost:8080/kafka/send?msg=a* @param msg*/@RequestMapping("/send")public String send(@RequestParam("msg") String msg) {log.info("准备发送消息为:{}", msg);// 1.发送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.发送失败的处理log.error("生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.发送成功的处理log.info("生产者 发送消息成功:" + stringObjectSendResult.toString());}});return "接口调用成功";}
}
  • 发送消息,观察消费者批量消费情况
http://localhost:9999/kafka/send?msg=a

多次调用发现如下:

在这里插入图片描述
发现拉取消息的大小始终为1

解决问题

  • 添加下面两行代码
@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);################ 添加下面两行 ###########props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);######################################props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}
  • 再次发送消息,观察消费情况

在这里插入图片描述
可以看到批量消费成功。

原理分析

fetch.min.bytes

消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

fetch.max.wait.ms

如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。

源码分析

ReplicaManager#fetchMessages

/*** 能够立即返回给客户端的4种情况* 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置* 2. fetch请求要拉取的分区为空* 3. 根据fetch.min.bytes的设置,有足够的数据返回* 4. 出现异常*/if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {// fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合val fetchPartitionData = logReadResults.map { case (tp, result) =>tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,result.lastStableOffset, result.info.abortedTransactions)}// 调用响应回调函数responseCallback(fetchPartitionData)}

相关文章:

  • Kamailio dialog timeout
  • 【设计模式】创建型-建造者模式
  • oracle mysql索引区别
  • 微信小程序-页面导航
  • 流量回放平台与传统测试工具的对比分析
  • MySQL复合查询操作【 函数接口集合 | 多表查询 | 子查询 | 表的内连外连】
  • 在Github找自己想要的的项目
  • 基于VGG16使用图像特征进行迁移学习的时装推荐系统
  • 安卓手机APP开发___广播概述
  • Java反射实战指南:反射机制的终极指南
  • NeuralForecast 推理 - 从csv文件里读取数据进行推理
  • Kafka 请求处理揭秘:从入门到精通
  • 小程序vant DropdownMenu 下拉菜单无法关闭
  • 【Linux】文件
  • 探究 Cosmos Hub 作为国家行为者的可能性
  • Angular2开发踩坑系列-生产环境编译
  • Computed property XXX was assigned to but it has no setter
  • CSS 提示工具(Tooltip)
  • Golang-长连接-状态推送
  • GraphQL学习过程应该是这样的
  • jquery ajax学习笔记
  • Promise面试题2实现异步串行执行
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • Swift 中的尾递归和蹦床
  • Vue.js源码(2):初探List Rendering
  • vue-router的history模式发布配置
  • 表单中readonly的input等标签,禁止光标进入(focus)的几种方式
  • 给自己的博客网站加上酷炫的初音未来音乐游戏?
  • 前端js -- this指向总结。
  • 时间复杂度与空间复杂度分析
  • 通过npm或yarn自动生成vue组件
  • 运行时添加log4j2的appender
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • 机器人开始自主学习,是人类福祉,还是定时炸弹? ...
  • 积累各种好的链接
  • 蚂蚁金服CTO程立:真正的技术革命才刚刚开始
  • #if #elif #endif
  • #Linux(权限管理)
  • #QT(一种朴素的计算器实现方法)
  • $var=htmlencode(“‘);alert(‘2“); 的个人理解
  • (10)工业界推荐系统-小红书推荐场景及内部实践【排序模型的特征】
  • (C#)一个最简单的链表类
  • (C++17) optional的使用
  • (el-Date-Picker)操作(不使用 ts):Element-plus 中 DatePicker 组件的使用及输出想要日期格式需求的解决过程
  • (LeetCode 49)Anagrams
  • (vue)el-tabs选中最后一项后更新数据后无法展开
  • (web自动化测试+python)1
  • (八十八)VFL语言初步 - 实现布局
  • (待修改)PyG安装步骤
  • (第30天)二叉树阶段总结
  • (二)换源+apt-get基础配置+搜狗拼音
  • (翻译)Entity Framework技巧系列之七 - Tip 26 – 28
  • (五)MySQL的备份及恢复