当前位置: 首页 > news >正文

Kafka 为什么这么快?

Kafka 是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。

1 批量发送

Kafka 收发消息都是批量进行处理的。我们看一下 Kafka 生产者发送消息的代码:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {//省略前面代码Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);//把消息追加到之前缓存的这一批消息上RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);//积累到设置的缓存大小,则发送出去if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch /**省略 catch 代码*/
}

从代码中可以看到,生产者调用 doSend 方法后,并不会直接把消息发送出去,而是把消息缓存起来,缓存消息量达到配置的批量大小后,才会发送出去。

注意:从上面 accumulator.append 代码可以看到,一批消息属于同一个 topic 下面的同一个 partition。

Broker 收到消息后,并不会把批量消息解析成单条消息后落盘,而是作为批量消息进行落盘,同时也会把批量消息直接同步给其他副本。

消费者拉取消息,也不会按照单条进行拉取,而是按照批量进行拉取,拉取到一批消息后,再解析成单条消息进行消费。

使用批量收发消息,减轻了客户端和 Broker 的交互次数,提升了 Broker 处理能力。

2 消息压缩

如果消息体比较大,Kafka 消息吞吐量要达到千万级别,网卡支持的网络传输带宽会是一个瓶颈。Kafka 的解决方案是消息压缩。发送消息时,如果增加参数 compression.type,就可以开启消息压缩:

public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//开启消息压缩props.put("compression.type", "gzip");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {logger.error("sending message error: ", e);} else {logger.info("sending message successful, Offset: ", metadata.offset());}}});producer.close();
}

如果 compression.type 的值设置为 none,则不开启压缩。那消息是在什么时候进行压缩呢?前面提到过,生产者缓存一批消息后才会发送,在发送这批消息之前就会进行压缩,代码如下:

public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// ...try {// ...buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {//...RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}//这批消息缓存已满,这里进行压缩MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}
}

上面的 recordsBuilder 方法最终调用了下面 MemoryRecordsBuilder 的构造方法。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,byte magic,CompressionType compressionType,TimestampType timestampType,long baseOffset,long logAppendTime,long producerId,short producerEpoch,int baseSequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int writeLimit) {//省略其他代码this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法会根据配置的压缩算法进行压缩或者选择不压缩。目前 Kafka 支持的压缩算法包括:gzip、snappy、lz4,从 2.1.0 版本开始,Kafka 支持 Zstandard 算法。

在 Broker 端,会解压 header 做一些校验,但不会解压消息体。消息体的解压是在消费端,消费者拉取到一批消息后,首先会进行解压,然后进行消息处理。

因为压缩和解压都是耗费 CPU 的操作,所以在开启消息压缩时,也要考虑生产者和消费者的 CPU 资源情况。

有了消息批量收集和压缩,kafka 生产者发送消息的过程如下图:

图片

3 磁盘顺序读写

顺序读写省去了寻址的时间,只要一次寻址,就可以连续读写。

在固态硬盘上,顺序读写的性能是随机读写的好几倍。而在机械硬盘上,寻址时需要移动磁头,这个机械运动会花费很多时间,因此机械硬盘的顺序读写性能是随机读写的几十倍。

Kafka 的 Broker 在写消息数据时,首先为每个 Partition 创建一个文件,然后把数据顺序地追加到该文件对应的磁盘空间中,如果这个文件写满了,就再创建一个新文件继续追加写。这样大大减少了寻址时间,提高了读写性能。

4 PageCache

在 Linux 系统中,所有文件 IO 操作都要通过 PageCache,PageCache 是磁盘文件在内存中建立的缓存。当应用程序读写文件时,并不会直接读写磁盘上的文件,而是操作 PageCache。

图片

应用程序写文件时,都先会把数据写入 PageCache,然后操作系统定期地将 PageCache 的数据写到磁盘上。如下图:

图片

而应用程序在读取文件数据时,首先会判断数据是否在 PageCache 中,如果在则直接读取,如果不在,则读取磁盘,并且将数据缓存到 PageCache。

图片

Kafka 充分利用了 PageCache 的优势,当生产者生产消息的速率和消费者消费消息的速率差不多时,Kafka 基本可以不用落盘就能完成消息的传输。

5 零拷贝

Kafka Broker 将消息发送给消费端时,即使命中了 PageCache,也需要将 PageCache 中的数据先复制到应用程序的内存空间,然后从应用程序的内存空间复制到 Socket 缓存区,将数据发送出去。如下图:

图片

Kafka 采用了零拷贝技术把数据直接从 PageCache 复制到 Socket 缓冲区中,这样数据不用复制到用户态的内存空间,同时 DMA 控制器直接完成数据复制,不需要 CPU 参与。如下图:

图片

Java 零拷贝技术采用 FileChannel.transferTo() 方法,底层调用了 sendfile 方法。

6 mmap

Kafka 的日志文件分为数据文件(.log)和索引文件(.index),Kafka 为了提高索引文件的读取性能,对索引文件采用了 mmap 内存映射,将索引文件映射到进程的内存空间,这样读取索引文件就不需要从磁盘进行读取。如下图:

图片

7 总结

本文介绍了 Kafka 实现高性能用到的关键技术,这些技术可以为我们学习和工作提供参考。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 黑客是如何绕过ssh登录目标站点
  • 【linux】4张卡,坏了1张,怎么办?
  • Vue使用代理方式解决跨域问题
  • 【Git】Git Commit Angular规范详解
  • win10下使用docker、k8s部署java应用
  • 【HTML元素居中】元素水平垂直居中的常用方法
  • ‌内网穿透技术‌总结
  • Python知识点:使用Cython进行Python性能优化
  • VSCode 离线安装中文语言包
  • 基于springboot+vue超市管理系统
  • Sharding-Jdbc+Mybatis+SpringBoot的分库分表实现
  • Vue学习记录之六(组件实战及BEM框架了解)
  • STM32巡回研讨会总结(2024)
  • [000-01-008].第05节:OpenFeign特性-重试机制
  • LeetCode题练习与总结: 数字 1 的个数--233
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • angular2 简述
  • AzureCon上微软宣布了哪些容器相关的重磅消息
  • Debian下无root权限使用Python访问Oracle
  • echarts花样作死的坑
  • es6要点
  • interface和setter,getter
  • iOS编译提示和导航提示
  • JavaScript 奇技淫巧
  • Java基本数据类型之Number
  • Nodejs和JavaWeb协助开发
  • PAT A1050
  • sessionStorage和localStorage
  • 二维平面内的碰撞检测【一】
  • 基于webpack 的 vue 多页架构
  • 近期前端发展计划
  • 利用DataURL技术在网页上显示图片
  • 前端临床手札——文件上传
  • 如何解决微信端直接跳WAP端
  • 实现简单的正则表达式引擎
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 微信支付JSAPI,实测!终极方案
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 我从编程教室毕业
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • ###51单片机学习(2)-----如何通过C语言运用延时函数设计LED流水灯
  • #Datawhale AI夏令营第4期#AIGC方向 文生图 Task2
  • $$$$GB2312-80区位编码表$$$$
  • $().each和$.each的区别
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (八)Spring源码解析:Spring MVC
  • (不用互三)AI绘画:科技赋能艺术的崭新时代
  • (黑客游戏)HackTheGame1.21 过关攻略
  • (教学思路 C#之类三)方法参数类型(ref、out、parmas)
  • (详细版)Vary: Scaling up the Vision Vocabulary for Large Vision-Language Models
  • (新)网络工程师考点串讲与真题详解
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)
  • (转)利用ant在Mac 下自动化打包签名Android程序
  • .jks文件(JAVA KeyStore)