当前位置: 首页 > news >正文

# Kafka_深入探秘者(2):kafka 生产者

Kafka_深入探秘者(2):kafka 生产者

一、kafka 消息发送流程解析

1、kafka :java 客户端 数据生产流程解析

在这里插入图片描述

二、kafka 发送类型

1、kafka 发送类型–发送即忘记:producer.send(record) 同步发送


//通过 send() 发送完消息后返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 kafka 响应
//如果 kafka 正常响应,返回一个 Recordetadata 对象,该对象存储消息的偏移量
//如果 kafka 发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理 producer.send(record).get();

2、在 kafka_learn 工程中,修改 生产者 ProducerFastStart.java 类 ,获取发送类型。


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java**  2024-6-21 创建 生产者 ProducerFastStart.java 类*/
package djh.it.kafka.learn.chapter1;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.Future;public class ProducerFastStart {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";public static void main( String[] args ) {Properties properties = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2)设置重试次数 -- 优化代码properties.put(ProducerConfig.RETRIES_CONFIG, 10);//3)设置值序列化器 -- 优化代码//properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//4)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "kafka发送类型(同步发送)-2024-6-21-kafka-test!");try{
//            producer.send(record);//发送类型--同步发送Future<RecordMetadata> send = producer.send(record);RecordMetadata recordMetadata = send.get();System.out.println("topic: " + recordMetadata.topic());System.out.println("partition: " + recordMetadata.partition());System.out.println("offset: " + recordMetadata.offset());}catch (Exception e){e.printStackTrace();}producer.close();}
}

在这里插入图片描述

3、kafka 发送类型–异步发送,相当于重新启动一个线程发送消息。


//发送类型--异步发送
producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("topic: " + metadata.topic());System.out.println("partition: " + metadata.partition());System.out.println("offset: " + metadata.offset());}}
});

4、在 kafka_learn 工程中,修改 生产者 ProducerFastStart.java 类 ,进行异步发送。


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java**  2024-6-21 创建 生产者 ProducerFastStart.java 类*/
package djh.it.kafka.learn.chapter1;import org.apache.kafka.clients.producer.*;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.Future;public class ProducerFastStart {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";public static void main( String[] args ) {Properties properties = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2)设置重试次数 -- 优化代码properties.put(ProducerConfig.RETRIES_CONFIG, 10);//3)设置值序列化器 -- 优化代码//properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//4)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "kafka异步发送-2024-6-21-kafka-test!");try{
//            producer.send(record);
//            //发送类型--同步发送
//            Future<RecordMetadata> send = producer.send(record);
//            RecordMetadata recordMetadata = send.get();
//            System.out.println("topic: " + recordMetadata.topic());
//            System.out.println("partition: " + recordMetadata.partition());
//            System.out.println("offset: " + recordMetadata.offset());//发送类型--异步发送producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("topic: " + metadata.topic());System.out.println("partition: " + metadata.partition());System.out.println("offset: " + metadata.offset());}}});}catch (Exception e){e.printStackTrace();}producer.close();}
}

在这里插入图片描述

三、kafka 序列化器

1、序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。

Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),
还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,

这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)
基本上能够满足大部分场景的需求。

2、在 kafka_learn 工程中,创建 Company.java 对象类


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\Company.java**  2024-6-21 创建 Company.java 对象类*/
package djh.it.kafka.learn.chapter2;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {private String name;private String address;public String getName() {return name;}public void setName( String name ) {this.name = name;}public String getAddress(){return address;}public void setAddress( String address ) {this.address = address;}
}

3、自定义序列化器

见代码库: com.heima.kafka.chapter2.companySerializer

在 kafka_learn 工程中,自定义序列化器 companySerializer.java


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\companySerializer.java**  2024-6-21 自定义序列化器 companySerializer.java*  主要处理针对 自定义类*/
package djh.it.kafka.learn.chapter2;import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class companySerializer implements Serializer<Company> {@Overridepublic void configure( Map<String, ?> configs, boolean isKey ) {}@Overridepublic byte[] serialize( String topic, Company data ) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else{name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else {address = new byte[0];}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);buffer.putInt(name.length);buffer.put(name);buffer.putInt(address.length);buffer.put(address);return buffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}

四、kafka 分区器

1、kafka 分区器

本身 kafka 有自己的分区策略的,如果未指定,就会使用默认的分区策略 Kafka 根据传递消息的 key 来进行分区的分配,即 hash(key)%numPartitions。如果 Key 相同的话,那么就会分配到统一分区。

2、源代码 org.apache.kafka.clients.producer.internals.DefaultPartitioner.java 分析


/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.kafka.clients.producer.internals;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;import java.util.Map;/*** The default partitioning strategy:* <ul>* <li>If a partition is specified in the record, use it* <li>If no partition is specified but a key is present choose a partition based on a hash of the key* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.* * See KIP-480 for details about sticky partitioning.*/
public class DefaultPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();public void configure(Map<String, ?> configs) {}/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}/*** Compute the partition for the given record.** @param topic The topic name* @param numPartitions The number of partitions of the given {@code topic}* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}public void close() {}/*** If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one.*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}

3、也可以自定义分区器,在 kafka_learn 工程中,创建 自定义分区器 DefinePartitioner.java


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\DefinePartitioner.java**  2024-6-21 创建 自定义分区器 DefinePartitioner.java*/
package djh.it.kafka.learn.chapter2;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;public class DefinePartitioner implements Partitioner {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic int partition( String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster ) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (null == keyBytes){return counter.getAndIncrement() % numPartitions;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void onNewBatch( String topic, Cluster cluster, int prevPartition ) {}@Overridepublic void configure( Map<String, ?> map ) {}
}

五、kafka 拦截器

1、kafka 拦截器

Producer 拦截器 (interceptor) 是个相当新的功能,它和 consumer 端 interceptor 是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

2、生产者拦截器可以用在消息发送前做一些准备工作。

使用场景

  • 1、按照某个规则过滤掉不符合要求的消,息
  • 2、修改消息的内容
  • 3、统计类需求

3、源码 kafka 拦截器接口类 ProducerInterceptor.java


package org.apache.kafka.clients.producer;import org.apache.kafka.common.Configurable;public interface ProducerInterceptor<K, V> extends Configurable {public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}

4、在 kafka_learn 工程中,创建 自定义拦截器 ProducerinterceptorPrefix.java


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\ProducerInterceptorPrefix.java**  2024-6-21 自定义拦截器 ProducerInterceptorPrefix.java*/
package djh.it.kafka.learn.chapter2;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0;@Overridepublic ProducerRecord<String, String> onSend( ProducerRecord<String, String> record ) {String modifiedValue = "prefix1-" + record.value();return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(),record.key(), modifiedValue, record.headers());}@Overridepublic void onAcknowledgement( RecordMetadata metadata, Exception e ) {if(e == null){sendSuccess++;} else {sendFailure++;}}@Overridepublic void close() {double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");}@Overridepublic void configure( Map<String, ?> configs ) {}
}

六、kafka 发送原理剖析总结

1、kafka 发送原理剖析

kafka 发送原理剖析.png

消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成 ProducerRecord 对象,之后调用 send0 方法将消息放入 RecordAccumulator (消息收集器,也,可以理解为主线程与 Sender 线程直接的缓冲区)中暂存,Sender 线程负青将消息信息构成请求,并最终执行网络 I/0 的线程,它从 RecordAccumulator 中取出消息并批量发送出去,需要注意的是,KafkaProducer 是线程安全的,多个线程间可以共享使用同一个 KafkaProducer 对象。

2、其他生产者参数:

2.1 acks

这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功的。acks 是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。

  • ack=0,生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • ack=1,默认值为 1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到 follower 节点,首领节点崩溃,就会导致数据丢失。

  • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。

注意: acks 参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出以下异常。

2.2 retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

2.3 batch.size

当有多个消,息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消,息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消,息的批次也可能被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。

2.4 max.request.size

该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。broker 对可接收的消息最大值也有自己的限制 (message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

3、在 kafka_learn 工程中,创建 KafkaProducerAnalysis.java 类,添加 自定义分区器、自定义拦截器 进行发送测试。


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\KafkaProducerAnalysis.java**  2024-6-21 创建 生产者 KafkaProducerAnalysis.java 类**  自定义分区器、自定义拦截器 分析,进行发送测试*/
package djh.it.kafka.learn.chapter2;//注意导包,一定要导成 kafka 的序列化包import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class KafkaProducerAnalysis {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";public static void main( String[] args ) throws InterruptedException{Properties props = initNevConfig();KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-001", "kafka-自定义-分区DefinePartitioner");ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-001", "kafka-自定义拦截器ProducerInterceptorPrefix使用");try{//1、发送消息producer.send(record);}catch (Exception e){e.printStackTrace();}producer.close();}private static Properties initNevConfig() {Properties properties = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2)设置重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 10);//3)设置值序列化器 -- 优化代码//properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//4)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");//自定义分区器使用
//        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());//自定义拦截器使用properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());//其他参数:acks 使用:
//        properties.put(ProducerConfig.ACKS_CONFIG, 0);  //error, 必须是字符串类型
//        properties.put(ProducerConfig.ACKS_CONFIG, "0");  //ok, 必须是字符串类型return properties;}
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

上一节关联链接请点击
# Kafka_深入探秘者(1):初识 kafka

相关文章:

  • 如何让视频有高级感 高级感视频制作方法 高级感视频怎么剪 会声会影视频剪辑制作教程 会声会影中文免费下载
  • [Linux] 文件/目录命令
  • MySQL学习笔记-进阶篇-锁
  • 反射机制详解
  • 6 DSL_03地理信息查询相关性算分
  • std::function和std::bind函数
  • MyBatis-Plus入门教程(一)
  • Day11 —— 大数据技术之Spark
  • C#的膨胀之路:创新还是灭亡
  • npm i 报错 gifsicle pre-build test failed
  • 零撸广告创业项目:撸包小游戏对接广告联盟app开发
  • sh脚本模块笔记
  • C语言入门4-函数和程序结构
  • mysql 库存表 累计 sql语句 第一方法
  • Linux_软硬链接
  • [deviceone开发]-do_Webview的基本示例
  • Angularjs之国际化
  • Bootstrap JS插件Alert源码分析
  • hadoop集群管理系统搭建规划说明
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • java8-模拟hadoop
  • Java深入 - 深入理解Java集合
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Node 版本管理
  • springMvc学习笔记(2)
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 阿里云购买磁盘后挂载
  • 力扣(LeetCode)357
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 前端自动化解决方案
  • 深度学习入门:10门免费线上课程推荐
  • 实战|智能家居行业移动应用性能分析
  • 使用agvtool更改app version/build
  • 使用parted解决大于2T的磁盘分区
  • 微信开源mars源码分析1—上层samples分析
  • 写给高年级小学生看的《Bash 指南》
  • 原创:新手布局福音!微信小程序使用flex的一些基础样式属性(一)
  • 7行Python代码的人脸识别
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 组复制官方翻译九、Group Replication Technical Details
  • ​flutter 代码混淆
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • #Linux(帮助手册)
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • $L^p$ 调和函数恒为零
  • (1)bark-ml
  • (10)ATF MMU转换表
  • (2024,Vision-LSTM,ViL,xLSTM,ViT,ViM,双向扫描)xLSTM 作为通用视觉骨干
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (TOJ2804)Even? Odd?
  • (顶刊)一个基于分类代理模型的超多目标优化算法
  • (二)WCF的Binding模型
  • (附源码)springboot猪场管理系统 毕业设计 160901