当前位置: 首页 > news >正文

Kafka知识总结(事务+数据存储+请求模型+常见场景)

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

在这里插入图片描述

事务

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。

开启enable.idempotence = true

设置Producer端参数transctional.id

数据的发送需要放在beginTransaction和commitTransaction之间。

Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。

producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。

数据存储

Kafka 消息以 Partition 作为存储单元,每个 Topic 的消息被一个或者多个 Partition 进行管理。

  • Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。
  • 一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。

Partition 又划分成多个 Segment 来组织数据。

Segment 在它的下面还有两个组成部分:

  • 索引文件:以 .index 后缀结尾,存储当前数据文件的索引。
  • 数据文件:以 .log 后缀结尾,存储当前索引文件名对应的数据文件。

在这里插入图片描述

请求模型

在这里插入图片描述

请求到Broker后,也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上,Kafka中被称为网络线程池,一般默认每个Broker上为3个工作线程,可以通过参数 num.network.threads 进行配置。

并且采用轮询的策略,可以很均匀的将请求分发到不同的网络线程中进行处理。

但是实际的处理请求并不是由网络线程池进行处理的,而是会交给后续的IO线程池,当网络线程接受到请求的时候,会将请求写入到共享的请求队列中,而IO线程池会进行异步的处理,默认情况下是8个,可以通过 num.io.threads 进行配置。

常见场景

重复消费

consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。

例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。

下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。

消费者消费时间过长。

max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。

因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。

提高消费能力,提高单条消息的处理速度;根据实际场景max.poll.interval.ms值设置大一点,避免不必要的rebalance;

可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

消息丢失

消费者程序丢失数据

Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移

假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。

最佳配置:

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

设置 acks = all:

  • 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。

设置 retries 为一个较大的值。

  • 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

设置 unclean.leader.election.enable = false

设置 replication.factor >= 3

  • 防止消息丢失的主要机制就是冗余。

设置 min.insync.replicas > 1

  • 控制的是消息至少要被写入到多少个副本才算是 已提交 。
  • 设置成大于 1 可以提升消息持久性。
  • 在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas

  • 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。

确保消息消费完成再提交。

  • Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

消息顺序

乱序场景一

因为一个topic可以有多个partition,kafka只能保证partition内部有序。

1、可以设置topic 有且只有一个partition。

2、根据业务需要,需要顺序的指定为同一个partition。

乱序场景二

对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。

消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 《Java初阶数据结构》----6.<优先级队列之PriorityQueue底层:堆>
  • USB3.0的等长要求到底是多少?
  • Unity 物理动画:利用物理引擎创造逼真动作
  • Python面试整理-常用标准库
  • PHP反序列化漏洞
  • 将手机作为服务器运行docker服务
  • 了解ChatGPT API
  • leetcode面试题17.最大子矩阵
  • Windows系统安全加固方案:快速上手系统加固指南 (下)
  • c#自动关闭 MessageBox 弹出的窗口
  • 【Pytorch】当num_workers > 0时,程序卡住
  • Vue2.
  • MySQL查询优化:提升数据库性能的策略
  • 如何查找下载安装安卓APK历史版本?
  • CUDA编程之grid和block详解
  • 《Java编程思想》读书笔记-对象导论
  • 【前端学习】-粗谈选择器
  • Apache的80端口被占用以及访问时报错403
  • axios 和 cookie 的那些事
  • CentOS6 编译安装 redis-3.2.3
  • co.js - 让异步代码同步化
  • CODING 缺陷管理功能正式开始公测
  • CSS 专业技巧
  • es6要点
  • javascript数组去重/查找/插入/删除
  • JavaScript新鲜事·第5期
  • Java教程_软件开发基础
  • Mysql5.6主从复制
  • Redis 中的布隆过滤器
  • Spring Boot MyBatis配置多种数据库
  • Spring Cloud Feign的两种使用姿势
  • 排序算法学习笔记
  • 配置 PM2 实现代码自动发布
  • 深度学习在携程攻略社区的应用
  • 一个6年java程序员的工作感悟,写给还在迷茫的你
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 用jquery写贪吃蛇
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • ionic异常记录
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • zabbix3.2监控linux磁盘IO
  • ​520就是要宠粉,你的心头书我买单
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (Java)【深基9.例1】选举学生会
  • (JSP)EL——优化登录界面,获取对象,获取数据
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (ZT)一个美国文科博士的YardLife
  • (笔记)Kotlin——Android封装ViewBinding之二 优化
  • (二)c52学习之旅-简单了解单片机
  • (二)延时任务篇——通过redis的key监听,实现延迟任务实战
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (接口自动化)Python3操作MySQL数据库
  • (五)网络优化与超参数选择--九五小庞