当前位置: 首页 > news >正文

Kafka-服务端-网络层-源码流程

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {def startup() {socketServer = new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors = false)/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)}}

SocketServer

  def startup(startupProcessors: Boolean = true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {...endpoints.foreach { endpoint =>...val acceptor = new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {currentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket and selector.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送processNewResponses()// selector轮询各种事件,读取请求或者发送响应poll()// 封装selector.completedReceives中的请求,放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)processCompletedSends()processDisconnected()} catch {...}}} finally {...}}

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req = requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest =>shutdownComplete.countDown()returncase request: RequestChannel.Request =>try {request.requestDequeueTimeNanos = endTimeapis.handle(request)} catch {case e: FatalExitError =>shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)} finally {request.releaseBuffer()}case null => // continue}}shutdownComplete.countDown()}}

相关文章:

  • Flink ProcessFunction不同流异同及应用场景
  • SeekBar设置自定义thumb图片的时候显示不全问题
  • 马斯克宣布xAI将在8月份推出Grok-2大模型 预计年底推出Grok-3
  • Spring Boot 创建定时任务
  • 带安全启动—Ubuntu系统—手动安装Nvidia驱动
  • 三菱PLC 6行程序实现8电机顺序启动逆序停止
  • 亚信安全:《2024云安全技术发展白皮书》
  • 【Axure高保真原型】中继器表格——移入显示详情卡片案例
  • yolo-seg模型后处理
  • 常用的Linux系统命令
  • Java中的编码规范与代码审查实践
  • 大数据面试题之Spark(5)
  • CEPH 系统盘挂了,如何使用数据盘恢复
  • Vue 详情实战涉及从项目初始化到功能实现、测试及部署的整个过程
  • 2024年07月03日 Redis部署方式和持久化
  • [译]Python中的类属性与实例属性的区别
  • 【css3】浏览器内核及其兼容性
  • Angular 响应式表单 基础例子
  • Hibernate最全面试题
  • Java,console输出实时的转向GUI textbox
  • JavaScript 基础知识 - 入门篇(一)
  • Logstash 参考指南(目录)
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Python socket服务器端、客户端传送信息
  • React的组件模式
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • 警报:线上事故之CountDownLatch的威力
  • 利用DataURL技术在网页上显示图片
  • 前端自动化解决方案
  • 实现菜单下拉伸展折叠效果demo
  • 责任链模式的两种实现
  • scrapy中间件源码分析及常用中间件大全
  • Spring第一个helloWorld
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • ​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​
  • #{}和${}的区别是什么 -- java面试
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (39)STM32——FLASH闪存
  • (LeetCode C++)盛最多水的容器
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (windows2012共享文件夹和防火墙设置
  • (动手学习深度学习)第13章 计算机视觉---图像增广与微调
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (图)IntelliTrace Tools 跟踪云端程序
  • (一)SvelteKit教程:hello world
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转) 深度模型优化性能 调参
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .NET IoC 容器(三)Autofac
  • .NET 指南:抽象化实现的基类
  • .net反编译工具
  • .net经典笔试题
  • @Controller和@RestController的区别?
  • @RequestBody与@ResponseBody的使用
  • []error LNK2001: unresolved external symbol _m