当前位置: 首页 > news >正文

kafka源码阅读-ReplicaStateMachine(副本状态机)解析

概述

Kafka源码包含多个模块,每个模块负责不同的功能。以下是一些核心模块及其功能的概述:

  1. 服务端源码 :实现Kafka Broker的核心功能,包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。

  2. Java客户端源码 :实现了Producer和Consumer与Broker的交互机制,以及通用组件支撑代码。

  3. Connect源码 :用来构建异构数据双向流式同步服务。

  4. Stream源码 :用来实现实时流处理相关功能。

  5. Raft源码 :实现了Raft一致性协议。

  6. Admin模块 :Kafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。

  7. Api模块 :负责数据交互,客户端与服务端交互数据的编码与解码。

  8. Client模块 :包含Producer读取Kafka Broker元数据信息的类,如topic和分区,以及leader。

  9. Cluster模块 :包含Broker、Cluster、Partition、Replica等实体类。

  10. Common模块 :包含各种异常类以及错误验证。

  11. Consumer模块 :消费者处理模块,负责客户端消费者数据和逻辑处理。

  12. Controller模块 :负责中央控制器的选举,分区的Leader选举,Replica的分配或重新分配,分区和副本的扩容等。

  13. Coordinator模块 :负责管理部分consumer group和他们的offset。

  14. Javaapi模块 :提供Java语言的Producer和Consumer的API接口。

  15. Log模块 :负责Kafka文件存储,读写所有Topic消息数据。

  16. Message模块 :封装多条数据组成数据集或压缩数据集。

  17. Metrics模块 :负责内部状态监控。

  18. Network模块 :处理客户端连接,网络事件模块。

  19. Producer模块 :生产者细节实现,包括同步和异步消息发送。

  20. Security模块 :负责Kafka的安全验证和管理。

  21. Serializer模块 :序列化和反序列化消息内容。

  22. Server模块 :涉及Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader选举,Admin和Replica管理等。

  23. Tools模块 :包含多种工具,如导出consumer offset值,LogSegments信息,Topic的log位置信息,Zookeeper上的offset值等。

  24. Utils模块 :包含各种工具类,如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类等。

这些模块共同构成了Kafka的整体架构,使其能够提供高吞吐量、高可用性的消息队列服务。

kafka源码分支为1.0.2

分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。

kafkaController初始化时,会启动replicaStateMachine和partitionStateMachine:

    //在 KafkaController 中//有两个状态机:分区状态机和副本状态机;//一个管理器:Channel 管理器,负责管理所有的 Broker 通信;//相关缓存:Partition 信息、Topic 信息、broker id 信息等;//四种 leader 选举机制:分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发;//启动副本状态机,初始化所有 Replica 的状态信息,如果 Replica 所在节点是 alive 的,那么状态更新为 OnlineReplica, 否则更新为 ReplicaDeletionIneligible;replicaStateMachine.startup()//启动分区状态机,初始化所有 Partition 的状态信息,如果 leader 所在 broker 是 alive 的,那么状态更新为 OnlinePartition,否则更新为 OfflinePartitionpartitionStateMachine.startup()

ReplicaStateMachine类相关方法:

  /*** Invoked on successful controller election. First registers a broker change listener since that triggers all* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.* Then triggers the OnlineReplica state change for all replicas.*/def startup() {// //初始化所有副本的状态信息initializeReplicaState()//将online的replica状态转变为OnlineReplicahandleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)info("Started replica state machine with initial state -> " + replicaState.toString())}/*** Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions* in zookeeper*///初始化所有副本的状态信息// 这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState 中,并没有真正进行状态转移的操作。private def initializeReplicaState() {for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {val topic = topicPartition.topicval partition = topicPartition.partitionassignedReplicas.foreach { replicaId =>val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)//如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica//replicaId即brokerIdif (controllerContext.isReplicaOnline(replicaId, topicPartition))replicaState.put(partitionAndReplica, OnlineReplica)else {// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.// This is required during controller failover since during controller failover a broker can go down,// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.//否则设置为 ReplicaDeletionIneligible 状态replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)}}}}/*** This API is invoked by the broker change controller callbacks and the startup API of the state machine* @param replicas     The list of replicas (brokers) that need to be transitioned to the target state* @param targetState  The state that the replicas should be moved to* The controller's allLeaders cache should have been updated before this*///用于处理 Replica 状态的变化def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,callbacks: Callbacks = (new CallbackBuilder).build) {if (replicas.nonEmpty) {info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))try {brokerRequestBatch.newBatch()//状态转变replicas.foreach(r => handleStateChange(r, targetState, callbacks))//向 broker 发送相应请求brokerRequestBatch.sendRequestsToBrokers(controller.epoch)} catch {case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)}}}/*** This API exercises the replica's state machine. It ensures that every state transition happens from a legal* previous state to the target state. Valid state transitions are:* NonExistentReplica --> NewReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica -> OnlineReplica* --add the new replica to the assigned replica list if needed** OnlineReplica,OfflineReplica -> OnlineReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica* --send StopReplicaRequest to the replica (w/o deletion)* --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and*   UpdateMetadata request for the partition to every live broker.** OfflineReplica -> ReplicaDeletionStarted* --send StopReplicaRequest to the replica (with deletion)** ReplicaDeletionStarted -> ReplicaDeletionSuccessful* -- mark the state of the replica in the state machine** ReplicaDeletionStarted -> ReplicaDeletionIneligible* -- mark the state of the replica in the state machine** ReplicaDeletionSuccessful -> NonExistentReplica* -- remove the replica from the in memory partition replica assignment cache* @param partitionAndReplica The replica for which the state transition is invoked* @param targetState The end state that the replica should be moved to*/def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,callbacks: Callbacks) {val topic = partitionAndReplica.topicval partition = partitionAndReplica.partitionval replicaId = partitionAndReplica.replicaval topicAndPartition = TopicAndPartition(topic, partition)// Replica 不存在的话,状态初始化为 NonExistentReplicaval currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)try {def logStateChange(): Unit =stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState")val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)//校验状态转变是否符合要求assertValidTransition(partitionAndReplica, targetState)targetState match {case NewReplica => 其前置状态只能为 NonExistentReplica// start replica as a follower to the current leader for its partition//从 zk 获取 Partition 的 leaderAndIsr 信息val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)leaderIsrAndControllerEpochOpt match {case Some(leaderIsrAndControllerEpoch) =>//若是leader的replica状态不能变为NewReplicaif(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +s"be moved to NewReplica state as it is being requested to become leader")//向该 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),topic, partition, leaderIsrAndControllerEpoch,replicaAssignment, isNew = true)//对于新建的 Partition,处于这个状态时,该 Partition 是没有相应的 LeaderAndIsr 信息的case None => // new leader request will be sent to this replica when one gets elected}//将该 Replica 的状态转移成 NewReplica,然后结束流程。replicaState.put(partitionAndReplica, NewReplica)logStateChange()case ReplicaDeletionStarted => //其前置状态只能为 OfflineReplica//更新向该 Replica 的状态为 ReplicaDeletionStarted;replicaState.put(partitionAndReplica, ReplicaDeletionStarted)// send stop replica command//发送 StopReplica 请求给该副本,并设置 deletePartition=true//broker收到这请求后,会从物理存储上删除这个 Replica 的数据内容brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,callbacks.stopReplicaResponseCallback)logStateChange()case ReplicaDeletionIneligible => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionIneligible)logStateChange()case ReplicaDeletionSuccessful => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)logStateChange()case NonExistentReplica => //其前置状态只能为 ReplicaDeletionSuccessful。// NonExistentReplica 是副本完全删除、不存在这个副本的状态// remove this replica from the assigned replicas list for its partition//在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))//将这个 Topic 从缓存中删除。replicaState.remove(partitionAndReplica)logStateChange()case OnlineReplica =>//其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible//副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 followerreplicaState(partitionAndReplica) match {case NewReplica => //其前置状态如果为 NewReplica// add this replica to the assigned replicas list for its partition//从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 AR;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)//如果 Replica 不在 AR 中的话,那么就将其添加到 Partition 的 AR 中;if(!currentAssignedReplicas.contains(replicaId))controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)logStateChange()case _ => //其前置状态如果为:OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// check if the leader for this partition ever existed//如果该 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求//否则不做任何处理;controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(leaderIsrAndControllerEpoch) =>brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,replicaAssignment)replicaState.put(partitionAndReplica, OnlineReplica)logStateChange()case None => // that means the partition was never in OnlinePartition state, this means the broker never// started a log for that partition and does not have a high watermark value for this partition}}//最后将 Replica 的状态设置为 OnlineReplica 状态。replicaState.put(partitionAndReplica, OnlineReplica)case OfflineReplica => //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// send stop replica command to the replica so that it stops fetching from the leader//发送 StopReplica 请求给该副本,先停止副本同步 (deletePartition = false)brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)// As an optimization, the controller removes dead replicas from the ISRval leaderAndIsrIsEmpty: Boolean =controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(_) =>//将该 replica 从 Partition 的 isr 移除这个 replica(前提 isr 中还有其他有效副本)controller.removeReplicaFromIsr(topic, partition, replicaId) match {case Some(updatedLeaderIsrAndControllerEpoch) =>// send the shrunk ISR state change request to all the remaining alive replicas of the partition.val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {// 发送 LeaderAndIsr 请求给剩余的其他副本,因为 ISR 变动了brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)}//更新这个 Replica 的状态为 OfflineReplicareplicaState.put(partitionAndReplica, OfflineReplica)logStateChange()falsecase None =>true}case None =>true}if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))throw new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +s"and isr path in zookeeper is empty")}}catch {case t: Throwable =>stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState failed", t)}}

上面 Replica 各种转移的触发的条件:
在这里插入图片描述
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • JupyterNotebook快捷键 自用
  • springboot整合pgsql
  • 回调函数简易笔记
  • 计科录取75人!常州大学计算机考研考情分析!
  • 数字货币交易接口实现(含源代码)
  • 大模型深度神经网络(Deep Neural Network, DNN)
  • 【Python】如何在 Python 中操作 Excel
  • 【Emacs有什么优点,用Emacs写程序真的比IDE更方便吗?】
  • 网络芯片(又称为PHY网络芯片)
  • 【LeetCode】Top100 经典必刷题 06【6/20】
  • “论软件测试中缺陷管理及其应用”写作框架,软考高级论文,系统架构设计师论文
  • Oracle系统表空间的加解密
  • 基于springboot+vue+uniapp的养老院系统小程序
  • 2024最新Selenium面试题(附带答案),建议收藏备用
  • Flink入门(更新中)
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • 【前端学习】-粗谈选择器
  • Bytom交易说明(账户管理模式)
  • CentOS 7 修改主机名
  • echarts的各种常用效果展示
  • HTTP中GET与POST的区别 99%的错误认识
  • Java 网络编程(2):UDP 的使用
  • Making An Indicator With Pure CSS
  • mysql中InnoDB引擎中页的概念
  • React组件设计模式(一)
  • spring security oauth2 password授权模式
  • 规范化安全开发 KOA 手脚架
  • 基于webpack 的 vue 多页架构
  • 使用parted解决大于2T的磁盘分区
  • 世界上最简单的无等待算法(getAndIncrement)
  • 我的面试准备过程--容器(更新中)
  • 原生Ajax
  • ​决定德拉瓦州地区版图的关键历史事件
  • ​数据链路层——流量控制可靠传输机制 ​
  • ​探讨元宇宙和VR虚拟现实之间的区别​
  • # SpringBoot 如何让指定的Bean先加载
  • #HarmonyOS:软件安装window和mac预览Hello World
  • (+4)2.2UML建模图
  • (06)Hive——正则表达式
  • (day6) 319. 灯泡开关
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (二) Windows 下 Sublime Text 3 安装离线插件 Anaconda
  • (附源码)springboot宠物医疗服务网站 毕业设计688413
  • (附源码)springboot掌上博客系统 毕业设计063131
  • (三)Honghu Cloud云架构一定时调度平台
  • (三)uboot源码分析
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (十一)图像的罗伯特梯度锐化
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (一)插入排序
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • (转)Spring4.2.5+Hibernate4.3.11+Struts1.3.8集成方案一
  • (转载)Google Chrome调试JS
  • .NET CORE 第一节 创建基本的 asp.net core