当前位置: 首页 > news >正文

Kafka 请求处理揭秘:从入门到精通

Kafka 请求处理揭秘:从入门到精通

今天我们来聊聊 Kafka 的请求处理流程。无论是 Kafka 客户端还是 Broker,它们的交互都是通过“请求/响应”的方式完成的。比如,客户端会发送生产消息的请求给 Broker,Broker 处理完后再响应客户端。

Kafka 定义了一组自己的请求协议,涵盖了各种操作,比如 PRODUCE 请求用于生产消息,FETCH 请求用于消费消息,METADATA 请求用于获取集群信息。截止到 Kafka 2.3 版本,已经有多达 45 种请求格式。所有请求都是通过 TCP 网络以 Socket 的方式进行通讯的。

今天,我们就来详细探讨一下 Kafka Broker 是如何处理这些请求的。

请求处理的两种简单方案

我们可以先设想两个简单的请求处理方案:

  1. 顺序处理请求

    while (true) {Request request = accept(connection);handle(request);
    }
    

    这种方式实现简单,但吞吐量太差。每个请求都必须等待前一个请求处理完毕才能得到处理。

  2. 每个请求使用单独线程处理

    while (true) {Request request = accept(connection);Thread thread = new Thread(() -> handle(request));thread.start();
    }
    

    这种方式完全异步,每个请求都有单独的线程处理,但开销极大,可能会压垮系统。

既然这两种方案都不好,那么 Kafka 是如何处理请求的呢?答案是:Reactor 模式

什么是 Reactor 模式?

Reactor 模式是事件驱动架构的一种实现方式,特别适合处理多个客户端并发请求的场景。我们看看它的架构图:22)

多个客户端发送请求给 Reactor,Reactor 的 Dispatcher 线程将请求分发到多个工作线程中处理。Dispatcher 线程不涉及具体逻辑处理,非常轻量级,因此有很高的吞吐量表现。

Kafka 的请求处理模型

Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher。它有 Acceptor 线程和一个网络线程池(网络线程池的大小由 num.network.threads 参数控制,默认值是 3)。Acceptor 线程将入站请求公平地分发到所有网络线程中。

网络线程拿到请求后,不是自己处理,而是将请求放入一个共享的请求队列中。Broker 端还有一个 IO 线程池,从该队列中取出请求并执行真正的处理。IO 线程池的大小由 num.io.threads 参数控制,默认值是 8。

当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。

延时请求与 Purgatory

Kafka 中有个“炼狱”组件 Purgatory,用来缓存延时请求(Delayed Request)。比如设置了 acks=all 的 PRODUCE 请求,需要等待 ISR 中所有副本都接收了消息后才能返回。如果请求不能立刻处理,它就会暂存在 Purgatory 中,稍后满足条件时再继续处理。

控制类请求的优先处理

除了数据类请求(如 PRODUCE 和 FETCH 请求),Kafka 还有控制类请求(如 LeaderAndIsr 和 StopReplica 请求)。控制类请求可以直接影响数据类请求的处理。

为了解决控制类请求的优先处理问题,Kafka 在 2.3 版本引入了数据类请求和控制类请求的分离。Broker 启动后,会分别创建网络线程池和 IO 线程池,处理不同类型的请求。你需要在配置中指定不同的端口用于处理不同类型的请求。

小结

Kafka 请求处理流程的解析到此为止。了解请求处理过程是优化 Kafka 性能的前提条件。如果你能从请求的维度去思考 Kafka 的工作原理,你会发现优化 Kafka 并不是一件困难的事情。

希望这篇文章能帮助你更好地理解 Kafka 的请求处理机制。如果你有任何问题或建议,欢迎在评论区留言。谢谢阅读!


相关文章:

  • 小程序vant DropdownMenu 下拉菜单无法关闭
  • 【Linux】文件
  • 探究 Cosmos Hub 作为国家行为者的可能性
  • Python使用动态代理的多元应用
  • Qt 控件提升
  • HOT100与剑指Offer
  • Oracle中TAF与SCANIP全面解析
  • Usage - hackthebox
  • PyQt5创建与MySQL数据库集成的应用程序
  • 利用ssh远程安装显卡驱动
  • 铁塔基站用能监控能效解决方案
  • 链表(2)反转链表
  • 字符串匹配算法(三)Trie树算法
  • 长难句打卡5.31
  • 闽盾杯 2021 DNS协议分析
  • 【Under-the-hood-ReactJS-Part0】React源码解读
  • 4. 路由到控制器 - Laravel从零开始教程
  • C# 免费离线人脸识别 2.0 Demo
  • Go 语言编译器的 //go: 详解
  • Golang-长连接-状态推送
  • iOS | NSProxy
  • JavaScript对象详解
  • Java新版本的开发已正式进入轨道,版本号18.3
  • Laravel深入学习6 - 应用体系结构:解耦事件处理器
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • Meteor的表单提交:Form
  • 搭建gitbook 和 访问权限认证
  • 前端每日实战:61# 视频演示如何用纯 CSS 创作一只咖啡壶
  • 硬币翻转问题,区间操作
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • #HarmonyOS:基础语法
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (0)Nginx 功能特性
  • (1综述)从零开始的嵌入式图像图像处理(PI+QT+OpenCV)实战演练
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (4)logging(日志模块)
  • (7)摄像机和云台
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (PySpark)RDD实验实战——取最大数出现的次数
  • (转)socket Aio demo
  • (轉)JSON.stringify 语法实例讲解
  • ******IT公司面试题汇总+优秀技术博客汇总
  • .chm格式文件如何阅读
  • .net core 管理用户机密
  • .NET CORE使用Redis分布式锁续命(续期)问题
  • .Net Core与存储过程(一)
  • .net framework profiles /.net framework 配置
  • .net framwork4.6操作MySQL报错Character set ‘utf8mb3‘ is not supported 解决方法
  • .net生成的类,跨工程调用显示注释
  • .NET中 MVC 工厂模式浅析
  • .Net中wcf服务生成及调用
  • .net中我喜欢的两种验证码
  • .vue文件怎么使用_我在项目中是这样配置Vue的
  • @PreAuthorize注解
  • [ NOI 2001 ] 食物链