当前位置: 首页 > news >正文

【云原生进阶之PaaS中间件】第三章Kafka-4.2-生产者工作原理剖析

1 kafka生产者工作模式

1.1 生产者消息发送流程

1.1.1 发送原理

        Producer首先调用send方法进行发送,首先会经过拦截器,可以对数据进行一些加工处理。随后会经过序列化,kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,但是Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。

        随后经过分区器(分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池,避免频繁的申请和释放内存),因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小32K。

        在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

        发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到五个请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。

        从图中的流程可以看出,生产者和kafka集群之间还有一个RecordAccumulator队列,默认大小是32M,topic分区的话,producer会对应有一个分区器,数据在进入中间队列前,已经被分区器进行了分区,sender()方法在发送数据时,就直接根据分区进行拉取了,拉取时有两个参数,也就是调优参数:

  1. batch.size :也就是批大小,只有数据累计到batch.size后,sender才会发送数据,默认16k ;
  2. linger.ms :也就是等待时间,如果数据未达到batch.size,sender等待linger.ms设置的时间就会发送数据,单位ms,默认值就是0ms,就是有了一条数据直接发(默认为0是因为kafka要接实时数仓,所以设置为0);

        kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1:

  • 0:生产者发送过来的数据,不需要等待数据落盘应答;
  • 1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓;
  • -1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。

        Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2),如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s,例如如果2超时,(leader:0,ISR:0,1)。

        在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景:

        kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值 :

        重复数据的评判标准:具有相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。

        所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务。

        使用幂等性:开启参数enable.idempotence默认为true,即默认为开启。

1.1.4 生产者重要参数列表

1.2 异步发送API

        生产者代码中有3必须,IP即连接地址、key和value的序列化器。

1.2.1 普通异步发送流程

创建maven项目

导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

代码编写

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}
}

1.2.2 带回调函数的异步发送

        回调函数是实现应答机制的函数.

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.*; 
import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

1.3 同步发送API

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

1.4 生产者分区

1.4.1 kafka分区的好处

        因为不同的分区分布在不同的节点上,所以便于合理使用资源,实现负载均衡。并且在不同节点上可以提高并行度。

1.4.2 生产者发送消息的分区策略

  1. 指定发送到哪一个分区,直接使用对应的分区号,不会走分区器;
  2. 不写分区号,需要走分区器,有key,按照key进行hash之后取模分区个数;
  3. 不写分区号,需要走分区器,没有key,粘性分区缓存机制;
    • 一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区;
    • 如果是异步发送,数据发送的比较快,10条数据被当作一批,每一次都是一个分区;
    • 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerCallBackPartition {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// (1)指定发送到哪一个分区  直接使用对应的分区号   不会走分区器// (2) 不写分区号  需要走分区器  有key     按照key进行hash之后取模分区个数// (3) 不写分区号  需要走分区器  没有key   粘性分区缓存机制//  一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区// 如果是异步发送,数据发送的比较快   10条数据被当作一批  每一次都是一个分区// 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);}else{exception.printStackTrace();}}}).get();}//Thread.sleep(20);// 5. 关闭资源kafkaProducer.close();}
}

1.4.3 自定义分区器

根据业务需求,可以自定义分区器。

假设需求:发送过来的数据中如果包含atguigu,就发往0号分区,不包含atguigu,就发往1号分区

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;// (1)实现分区器的接口
public class CustomPartitioner implements Partitioner {
/*
传参
topic:主题   key:key值   keyBytes:key序列化之后     value:value值   
valueBytes:value序列化之后      cluster:集群信息    return的是分区号
*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//String log = value.toString();if (log.contains("atguigu")) {return 0;}return 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

然后,调用

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 添加定义的分区器,需要自定义分区的全类名properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.partitioner.CustomPartitioner");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);}else{exception.printStackTrace();}}});}// 5. 关闭资源kafkaProducer.close();}
}

1.5 生产经验

1.5.1 生产者如何提高吞吐量

提高吞吐量,就是提高批次传输大小,还有就是效率问题.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//调优参数,还是需要根据业务需求来调整//batch.size 批次大小,默认是16k,将批次大小增大,进而提高吞吐量properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);//linger.ms 等待时长,默认是0ms,增加等待时长properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//双端队列大小,默认是32M,可以提高到64Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);//调整压缩格式,默认没有压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}
}

1.5.2 数据可靠性

        数据可靠性基于ack应答机制。数据完全可靠的条件:Acks级别设置为-1,分区副本大于等于2,ISR应答的最小副本数大于等于2。

副本介绍

(1)Kafka副本作用:提高数据可靠性。

(2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。

(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

AR = ISR + OSR

  • ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
  • OSR,表示Follower与Leader副本同步时,延迟过多的副本。

可靠性总结:

  1. acks=0,生产者数据发来,kafka集群内存接受到数据就返回ack
  2. acks=1,生产者数据发来,kafka集群中的leader落盘数据后返回ack
  3. acks=-1,生产者数据发来,kafka集群中的所有副本落盘数据后返回ack
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerAcks {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置应答机制acks,可以去3个值,0、1、all(相当与ask = -1)properties.put(ProducerConfig.ACKS_CONFIG, "all");//重试次数retries ,默认是int最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}
}

副本故障处理

1.5.3 数据去重

1.5.3.1 数据传递语义

1.5.3.2 幂等性

        开启参数enable.idempotence 默认为true,false关闭。

1.5.3.3 生产者事务

        0.11版本的Kafka同时引入了事务的特性,为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

        为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

        就是引入一个全局唯一且一致的id,然后将id和pid绑定,从而使producer重启后,kafka集群依然可以通过id获得原来的pid。

        注意:开启事务,必须开启幂等性。

        一定要手动指定事务id:

1.5.4 数据有序

        分区内有序,分区之间无序:

1.5.5 数据乱序

        生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序。

        将max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。

        开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。

参考链接

【精选】Kafka基本原理详解_昙花逐月的博客-CSDN博客

这是最详细的Kafka应用教程了 - 掘金

Kafka : Kafka入门教程和JAVA客户端使用-CSDN博客

简易教程 | Kafka从搭建到使用 - 知乎

【精选】kafka简介_唏噗的博客-CSDN博客

Kafka 架构及基本原理简析

kafka是什么

再过半小时,你就能明白kafka的工作原理了

Kafka 设计与原理详解

Kafka【入门】就这一篇! - 知乎

kafka简介_kafka_唏噗-华为云开发者联盟

kafka详解

Kafka 设计与原理详解-CSDN博客

kafka学习知识点总结(三)

kafka——生产者原理解析_小波同学的技术博客_51CTO博客

相关文章:

  • PCIE Order Set
  • 中小学信息学奥赛CSP-J认证 CCF非专业级别软件能力认证-入门组初赛模拟题第一套(完善程序题)
  • uni-app 经验分享,从入门到离职(年度实战总结:经验篇)——上传图片以及小程序隐私保护指引设置
  • django中查询优化
  • docker 2:安装
  • 数据分析基础之《pandas(7)—高级处理2》
  • 详解结构体内存对齐及结构体如何实现位段~
  • OSDI 2023: Conveyor One-Tool-Fits-All Continuous Software Deployment at Meta
  • Spring Boot 笔记 005 环境搭建
  • 三星4621NS加粉后清零方法
  • C#系列-C#EF框架返回单个值(23)
  • 单例模式 C++
  • 数据库切片大对决:ShardingSphere与Mycat技术解析
  • Java中抽象类和接口的区别
  • [ai笔记4] 将AI工具场景化,应用于生活和工作
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • android百种动画侧滑库、步骤视图、TextView效果、社交、搜房、K线图等源码
  • classpath对获取配置文件的影响
  • CSS实用技巧
  • Django 博客开发教程 8 - 博客文章详情页
  • Logstash 参考指南(目录)
  • miniui datagrid 的客户端分页解决方案 - CS结合
  • MySQL QA
  • MYSQL 的 IF 函数
  • node-glob通配符
  • node学习系列之简单文件上传
  • React-生命周期杂记
  • Redux系列x:源码分析
  • windows-nginx-https-本地配置
  • 记一次用 NodeJs 实现模拟登录的思路
  • 每天10道Java面试题,跟我走,offer有!
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 为什么要用IPython/Jupyter?
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • 我们雇佣了一只大猴子...
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​软考-高级-系统架构设计师教程(清华第2版)【第12章 信息系统架构设计理论与实践(P420~465)-思维导图】​
  • $$$$GB2312-80区位编码表$$$$
  • ()、[]、{}、(())、[[]]命令替换
  • (3)(3.5) 遥测无线电区域条例
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (二)Linux——Linux常用指令
  • (附源码)springboot宠物管理系统 毕业设计 121654
  • (附源码)springboot炼糖厂地磅全自动控制系统 毕业设计 341357
  • (一)kafka实战——kafka源码编译启动
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • .cn根服务器被攻击之后
  • .NET Core WebAPI中封装Swagger配置
  • .net mvc 获取url中controller和action
  • .NET/C# 将一个命令行参数字符串转换为命令行参数数组 args
  • .NetCore Flurl.Http 升级到4.0后 https 无法建立SSL连接
  • .NET中统一的存储过程调用方法(收藏)
  • @for /l %i in (1,1,10) do md %i 批处理自动建立目录
  • @RequestMapping 的作用是什么?
  • @vue/cli脚手架