当前位置: 首页 > news >正文

Kafka【九】如何实现数据的幂等性操作

为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据

默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可。

配置项配置值说明
enable.idempotencetrue开启幂等性
max.in.flight.requests.per.connection小于等于5每个连接的在途请求数,不能大于5,取值范围为[1,5]
acksall(-1)确认应答,固定值,不能修改
retries>0重试次数,推荐使用Int最大值

【1】kafka实现幂等性的流程

① 数据增加唯一性标识

开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识。kafka中,这个标识采用的是连续的序列号数字sequencenum。但是不同的生产者Producer可能序列号是一样的,仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分。

Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。在发送数据前,我们就需要提前申请producerid以及序列号sequencenum

在这里插入图片描述

② 记录生产者的生产状态

Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。

在这里插入图片描述

③ 判重

判断Borker当前新的请求批次数据在缓存的5个旧的批次中是否存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。

在这里插入图片描述

④ 判断序列号是否连续

如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1:

  • 如果是,说明是连续的,顺序没乱,那么继续。
  • 如果不是,那么说明数据已经乱了,发生异常。

在这里插入图片描述

⑤ 重试

Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后再进行重试即可。

⑥ 更新数据

如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

在这里插入图片描述

⑦ 缺陷

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是这种幂等性还是有缺陷的:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性,也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。要想解决这个问题,可以采用后续的事务功能。

【2】跨会话的幂等性

对于幂等性的缺陷,kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator负责实施的。TransactionCoodinator 会将事务状态持久化到该主题中。

事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题 __transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。

其实这种设计是很巧妙的,因为kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。这样,使用事务处理的流程和咱们自己发送数据的流程是很像的。

接下来,我们就把这两个流程简单做一个对比。

① 普通数据发生流程

在这里插入图片描述

② 事务数据发送流程

在这里插入图片描述

通过两张图可以看到,基本的事务操作和数据操作是很像的。不过要注意,我们这里只是简单对比了数据发送的过程,其实它们的区别还在于数据发送后的提交过程。普通的数据操作,只要数据写入了日志,那么对于消费者来讲。数据就可以读取到了,但是事务操作中,如果数据写入了日志,但是没有提交的话,其实数据默认情况下也是不能被消费者看到的。只有提交后才能看见数据。

更为详细的可以参考下图:

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • SQL各子句的执行顺序
  • 【Jupyter Notebook】汉化
  • 简单的springboot log4j2日志配置
  • 如何在实际应用中优化AI大模型性能
  • 学习大模型最佳书籍推荐:NUS尤洋教授所著新书《实战AI大模型》,得到李开复、颜水成、周鸿祎大牛鼎力推荐
  • 如何通过商品id商品链接来获取淘宝商品主图详情图等数据?
  • 828华为云征文 | 基于Docker与Jenkins实现自动化部署
  • 大模型推理--KV Cache
  • SCL 常见问题
  • 异常整理(JAVA基础)
  • 【C++】STL容器详解【上】
  • Java——堆
  • 路灯集中控制器与智慧照明:塑造未来城市的智能光影
  • 亦菲喊你来学机器学习(20) --PCA数据降维
  • 江协科技stm32————11-5 硬件SPI读写W25Q64
  • (十五)java多线程之并发集合ArrayBlockingQueue
  • Android 控件背景颜色处理
  • CAP理论的例子讲解
  • Docker: 容器互访的三种方式
  • Fastjson的基本使用方法大全
  • iOS 系统授权开发
  • Lsb图片隐写
  • Object.assign方法不能实现深复制
  • Redis 懒删除(lazy free)简史
  • spring boot 整合mybatis 无法输出sql的问题
  • Vue.js-Day01
  • windows下如何用phpstorm同步测试服务器
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 动态规划入门(以爬楼梯为例)
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 老板让我十分钟上手nx-admin
  • 我这样减少了26.5M Java内存!
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • 一些css基础学习笔记
  • 走向全栈之MongoDB的使用
  • 【运维趟坑回忆录 开篇】初入初创, 一脸懵
  • 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • ​LeetCode解法汇总2696. 删除子串后的字符串最小长度
  • $$$$GB2312-80区位编码表$$$$
  • $LayoutParams cannot be cast to android.widget.RelativeLayout$LayoutParams
  • (2015)JS ES6 必知的十个 特性
  • (35)远程识别(又称无人机识别)(二)
  • (4) openssl rsa/pkey(查看私钥、从私钥中提取公钥、查看公钥)
  • (4)Elastix图像配准:3D图像
  • (bean配置类的注解开发)学习Spring的第十三天
  • (delphi11最新学习资料) Object Pascal 学习笔记---第7章第3节(封装和窗体)
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)计算机毕业设计ssm电影分享网站
  • (全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF
  • (算法)硬币问题
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • (转)visual stdio 书签功能介绍
  • (自用)仿写程序