当前位置: 首页 > news >正文

Kafka消息积压的典型场景及解决方案

Kafka消息积压的典型场景:

1.实时/消费任务挂掉

比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。
那么在我们重新启动这个实时应用进行消费之前,这段时间的消息就会被滞后处理,如果数据量很大,可就不是简单重启应用直接消费就能解决的。

>>>>  解决方案1: 

方案简述: 任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。

>>>>   具体操作1 :  消费者从最新的地方进行消费

  • 1、偏移量设置  默认位置是在 zookeeper 中设置的 
  • 2、设置消费者参数  auto.offset.reset  = latest 从最新消息开始消费
  • 3、给启动的消费组设置 动态的组ID=> 每次启动消费者重新生成一个随机的组ID保证从最新消息开始消费
  • 4、任务重启手动设置偏移量到最新的位置  。这可以通过查询Kafka的消费者API来完成。


>>>>  具体操作2 :   针对滞后的数据如何查漏补缺


Apache Kafka Connect:通过Kafka Connect可以快速实现大量数据进出Kafka,从而与其他源数据源或目标数据源进行交互。


Kafka ETL:这是一个开源的ETL工具,专门为处理Kafka数据而设计。它提供了一个简单易用的界面,使数据抽取、转换和加载变得更加容易。


Talend:这是一款功能强大的ETL工具,支持处理Kafka数据。它提供了可视化的界面,使您可以轻松地设计、部署和管理ETL作业。


Apache NiFi:这是一个用于数据集成和处理的开源软件框架,它提供了一个可视化的界面来设计ETL流程,并支持Kafka作为数据源和目标。


Logstash:这是一个开源的数据收集引擎,可以用来处理和转换Kafka数据。它提供了强大的过滤器和转换器,使您可以轻松地实现数据的抽取、转换和加载。

>>>>  解决方案2:(常用) 


方案简述: 创建新的topic并配置更多数量的分区,将积压消息的topic消费者逻辑改为直接把消息打入新的topic,将消费逻辑写在新的topic的消费者中。

-- 第一步 迁移topic数据  --> 历史数据迁移到新的Topic上
-- 第二部 使用新的消费者消费新的Topic 数据  

此方案不会缺失数据(新的消费者消费能力可以与原消费者保持一致)前提是允许延时处理一段时间才可以应用此方案


1. 创建新topic:首先,您需要创建一个新的topic。您可以使用Kafka提供的命令行工具或Kafka管理界面来创建新topic。确保新topic的partition数量与旧topic相同或更多。
2. 调整消费者配置:创建一个新的消费者,用于消费旧topic的数据。您需要设置消费者的配置,以便将数据从旧topic消费到新topic。这可以通过设置auto.offset.reset和topic.config参数来实现。
o auto.offset.reset参数用于指定消费者从哪个offset开始消费数据。您可以将其设置为earliest,以便从旧topic的开始处消费数据。
o topic.config参数可以用于设置新topic的配置。例如,您可以设置retention.ms参数来控制新topic的日志保留时间。
3. 复制数据:使用新消费者消费旧topic的数据,并将其写入新topic。确保在写入新topic时,消息的key和value与旧topic中的消息一致。这样,消息的顺序和偏移量将保持不变。
4. 验证数据:完成数据转移后,验证新topic中的数据是否与旧topic中的数据一致。您可以使用Kafka提供的工具或编写自定义脚本进行验证。
5. 删除旧topic:一旦您验证了新topic中的数据,并且确认可以删除旧topic,请将其删除以释放存储空间。请注意,在删除旧topic之前,确保所有数据都已成功迁移到新topic中。

2.Kafka分区数设置的不合理(太少)和消费者"消费能力"不足

Kafka单分区生产消息的速度qps通常很高,如果消费者因为某些原因(比如受业务逻辑复杂度影响,消费时间会有所不同),就会出现消费滞后的情况。
此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。

>>>>   不同的情况分析及方案


1、kafka 消费能力不足: 
如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数,同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)
2、程序消费能力不足:
(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。则提高每批次拉取的数量。
如果数据量很大,合理的增加Kafka分区数是关键。如果利用的是Spark流和Kafka direct approach方式,也可以对KafkaRDD进行repartition重分区,增加并行度处理。
3.Kafka消息的key不均匀,导致分区间数据不均衡

在使用Kafka producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。可以在Kafka producer处,给key加随机后缀,使其均衡。

4、kafka数据有过期时间,一些数据就丢失了,主要是消费不及时 (消费不及时)

产生消息堆积,消费不及时,kafka数据有过期时间,一些数据就丢失了,主要是消费不及时
 
>>> 实战经验总结

  • 1、消费kafka消息时,应该尽量减少每次消费时间,可通过减少调用三方接口、读库等操作, 从而减少消息堆积的可能性。
  • 2、如果消息来不及消费,可以先存在数据库中,然后逐条消费  (还可以保存消费记录,方便定位问题)
  • 3、每次接受kafka消息时,先打印出日志,包括消息产生的时间戳。
  • 4、kafka消息保留时间(修改kafka配置文件, 默认一周)
  • 5、任务启动从上次提交offset处开始消费处理


 

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • python办公自动化:使用`Python-PPTX` 嵌入媒体文件
  • 智谱发布新一代基座模型
  • es、kibana及分词器的安装
  • 冲刺蓝桥杯第三章字符串
  • C语言通用函数 - 判断ip是否合法
  • 简述Glide的源码
  • 自动化脚本到LabVIEW转换
  • Django+Vue花卉商城系统的设计与实现
  • day04-面向对象-常用API时间Arrays
  • Django+Vue音乐推荐系统的设计与实现
  • 如何在C语言中使用pthread库实现多线程编程
  • kafak集群搭建-基于kRaft方式
  • 【MySQL-24】万字全面解析<索引>——【介绍&语法&性能分析&使用规则】
  • USER_CLOCK_ROOT
  • 解构赋值的理解
  • 〔开发系列〕一次关于小程序开发的深度总结
  • Hibernate【inverse和cascade属性】知识要点
  • Java 23种设计模式 之单例模式 7种实现方式
  • JavaScript学习总结——原型
  • Laravel Mix运行时关于es2015报错解决方案
  • Linux CTF 逆向入门
  • python docx文档转html页面
  • SpiderData 2019年2月25日 DApp数据排行榜
  • SpringBoot几种定时任务的实现方式
  • 第2章 网络文档
  • 给初学者:JavaScript 中数组操作注意点
  • 模型微调
  • 算法系列——算法入门之递归分而治之思想的实现
  • 推荐一个React的管理后台框架
  • 最简单的无缝轮播
  • scrapy中间件源码分析及常用中间件大全
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • #mysql 8.0 踩坑日记
  • (2024)docker-compose实战 (8)部署LAMP项目(最终版)
  • (C11) 泛型表达式
  • (ibm)Java 语言的 XPath API
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (不用互三)AI绘画工具应该如何选择
  • (附源码)springboot 智能停车场系统 毕业设计065415
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)ssm旅游企业财务管理系统 毕业设计 102100
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (七)微服务分布式云架构spring cloud - common-service 项目构建过程
  • (一)项目实践-利用Appdesigner制作目标跟踪仿真软件
  • (游戏设计草稿) 《外卖员模拟器》 (3D 科幻 角色扮演 开放世界 AI VR)
  • (转) RFS+AutoItLibrary测试web对话框
  • *1 计算机基础和操作系统基础及几大协议
  • .java 9 找不到符号_java找不到符号
  • .L0CK3D来袭:如何保护您的数据免受致命攻击
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .net Signalr 使用笔记
  • .NET WPF 抖动动画
  • .net6 webapi log4net完整配置使用流程
  • .NET教程 - 字符串 编码 正则表达式(String Encoding Regular Express)