当前位置: 首页 > news >正文

Kafka Producer发送消息流程之分区器和数据收集器

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数if (record.partition() != null)return record.partition();// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据if (partitioner != null) {int customPartition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition < 0) {throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey != null && !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {@Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key == null || !(key instanceof String)) {throw new IllegalArgumentException("Key must be a non-null String");}// Parse the key as an integerint keyInt;try {keyInt = Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException("Key must be a numeric string", e);}// Determine the partition based on the key's odd/even natureif (keyInt % 2 == 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test1","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

相关文章:

  • 电阻有哪些参数呢
  • 13 个最受欢迎的技术写作工具
  • Proteus + Keil单片机仿真教程(六)多位LED数码管的动态显示
  • 镜像与容器
  • PostgreSQL 慢 SQL 排查
  • 【MySQL篇】Percona XtraBackup工具备份指南:常用备份命令详解与实践(第二篇,总共五篇)
  • Elasticsearch 批量更新
  • 阿里云国际站:海外视频安全的DRM加密
  • 防溺水预警系统引领水域安全新篇章
  • apache Kylin系列介绍及配置
  • Logback格式简记
  • 026-GeoGebra中级篇-曲线(2)_极坐标曲线、参数化曲面、分段函数曲线、分形曲线、复数平面上的曲线、随机曲线、非线性动力系统的轨迹
  • SpringBoot增加网关服务
  • Linux发行版CentOS 8 利用Docker安装应用
  • 初谈Linux信号-=-信号的产生
  • 分享一款快速APP功能测试工具
  • 【翻译】babel对TC39装饰器草案的实现
  • CODING 缺陷管理功能正式开始公测
  • echarts的各种常用效果展示
  • IOS评论框不贴底(ios12新bug)
  • node学习系列之简单文件上传
  • npx命令介绍
  • python 学习笔记 - Queue Pipes,进程间通讯
  • Vue 2.3、2.4 知识点小结
  • Vue.js-Day01
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 前端面试之CSS3新特性
  • 微信小程序--------语音识别(前端自己也能玩)
  • 写给高年级小学生看的《Bash 指南》
  • Salesforce和SAP Netweaver里数据库表的元数据设计
  • SAP CRM里Lead通过工作流自动创建Opportunity的原理讲解 ...
  • ​七周四次课(5月9日)iptables filter表案例、iptables nat表应用
  • # windows 安装 mysql 显示 no packages found 解决方法
  • $.ajax中的eval及dataType
  • ${ }的特别功能
  • (3)选择元素——(17)练习(Exercises)
  • (el-Date-Picker)操作(不使用 ts):Element-plus 中 DatePicker 组件的使用及输出想要日期格式需求的解决过程
  • (八)c52学习之旅-中断实验
  • (二)windows配置JDK环境
  • (附源码)计算机毕业设计ssm基于Internet快递柜管理系统
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (接上一篇)前端弄一个变量实现点击次数在前端页面实时更新
  • (理论篇)httpmoudle和httphandler一览
  • (面试必看!)锁策略
  • (亲测有效)解决windows11无法使用1500000波特率的问题
  • (四)stm32之通信协议
  • (转)visual stdio 书签功能介绍
  • (转载)Google Chrome调试JS
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .NET Framework与.NET Framework SDK有什么不同?
  • .Net 执行Linux下多行shell命令方法
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .net6 webapi log4net完整配置使用流程