当前位置: 首页 > news >正文

Kafka系列之如何提高消费者消费速度

前言

在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。

实现方案

为了提高消费者的消费速度,我们可以采取以下措施:

  • 将主题的分区数量增大,如 20,通过concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。
  • 将每次批量拉取消息的数量max.poll.records增大到 500,提高单次处理消息的数量。
  • 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步进行,提高并发处理消息的速度。
  • 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来执行。否则在消息量特别大的情况下,很可能会因为线程池任务队列满了而丢失数据。
  • 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,减少在队列中的等待时间。
  • 在数据上报的时候进行幂等性验证,防止重复上报数据。
@Component
public class OrderConsumer {@Resource(name = "execThreadPool")private ThreadPoolTaskExecutor execThreadPool;@KafkaListener(id = "record_consumer",topics = "record",groupId = "g_record_consumer",concurrency = "10",properties = {"max.poll.interval.ms:300000", "max.poll.records:500"})public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {execThreadPool.submit(()-> {// 业务逻辑});ack.acknowledge();}}

ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。

ThreadPoolTaskExecutor主要特点:

  • 线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。

  • 线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。

  • 线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。

  • 队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。

  • 拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
    RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
    在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
    AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
    CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
    DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
    DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。

    除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。

@Configuration
public class ThreadPoolConfig {@Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(50);  // 核心线程数pool.setMaxPoolSize(10000);  // 最大线程数pool.setQueueCapacity(0);  // 等待队列sizepool.setKeepAliveSeconds(60);  // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60);  // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy());  // 拒绝策略return pool;}
}

通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Java高频面试题分享
  • git 学习总结
  • pytest结合allure-pytest插件生成测试报告
  • vue3项目报错集合
  • go语言map底层及扩容机制原理详解(下)
  • Cocos Creator 小游戏案例
  • flask 开始
  • Docker(十)-Docker运行elasticsearch7.4.2容器实例以及分词器相关的配置
  • linux系统iptable防火墙开放指定ip及端口
  • 香橙派orangepi系统没有apt,也没有apt-get,也没有yum命令,找不到apt、apt-get、yum的Linux系统
  • CNCKAD激光切割软件
  • 【Ant Design Pro】快速上手
  • JavaWeb学习——请求响应、分层解耦
  • 昇思25天学习打卡营第22天|CycleGAN图像风格迁移互换
  • 代码随想录 day 25 回溯
  • @angular/forms 源码解析之双向绑定
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • angular2 简述
  • angular学习第一篇-----环境搭建
  • java概述
  • Java基本数据类型之Number
  • Java知识点总结(JavaIO-打印流)
  • LeetCode18.四数之和 JavaScript
  • PAT A1120
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • python 装饰器(一)
  • 从 Android Sample ApiDemos 中学习 android.animation API 的用法
  • 全栈开发——Linux
  • 译自由幺半群
  • 怎么把视频里的音乐提取出来
  • ​补​充​经​纬​恒​润​一​面​
  • ​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​
  • !!Dom4j 学习笔记
  • ![CDATA[ ]] 是什么东东
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #NOIP 2014#day.2 T1 无限网络发射器选址
  • (Java数据结构)ArrayList
  • (二十三)Flask之高频面试点
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (每日一问)设计模式:设计模式的原则与分类——如何提升代码质量?
  • (三)elasticsearch 源码之启动流程分析
  • (十) 初识 Docker file
  • (数据大屏)(Hadoop)基于SSM框架的学院校友管理系统的设计与实现+文档
  • (转)nsfocus-绿盟科技笔试题目
  • .bat批处理(九):替换带有等号=的字符串的子串
  • .htaccess 强制https 单独排除某个目录
  • .Net FrameWork总结
  • .net mvc 获取url中controller和action
  • .net开发日常笔记(持续更新)
  • .NET中分布式服务
  • .xml 下拉列表_RecyclerView嵌套recyclerview实现二级下拉列表,包含自定义IOS对话框...
  • ::前边啥也没有
  • @Bean, @Component, @Configuration简析
  • @modelattribute注解用postman测试怎么传参_接口测试之问题挖掘
  • [ CTF ] WriteUp-2022年春秋杯网络安全联赛-冬季赛