当前位置: 首页 > news >正文

kafka知识点总结

一 、kafka结构

1.kafka 基础结构

kafka有两种消息队列的模式 即点对点 和主题模式;

为了方便扩展,并提高吞吐量,一个topic被切分成多个pertition
一个主机对应一个broker,每个break里面又被分成topic;

(1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。

(2)Consumer:消息消费者,向Kafka broker 取消息的客户端。

(3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消
    费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
    影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

(4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。

(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。

(6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个                
     partition,每个partition 是一个有序的队列。

(7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个Follower。

(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。

(9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 
      Follower 会成为新的Leader。

常见题目

2、常用配置

配置文件为config包下的service.property 文件

broker 的 全局唯一编号,不能重复 ,只能是数字 。
 **broker.id=0** 

处理网络请求的线程数量
num.network.threads=3
用来处理磁盘IO 的线程数量
num.io.threads=8
发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
请求套接字的缓冲区大小
socket.request.max.bytes=104857600
kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建 ,可以
配置多个磁盘路径,路径与路径之间可以用分隔
log.dirs=/opt/module/kafka/ datas
topic 在当前 broker 上的分区个数
num. partitions=1
用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
配置连接 Zookeeper 集群 地址 (在 zk 根目录下创建 kaf ka ,方便管理
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 /kafka

集群部署kafka的shell脚本

#! /bin/bash
case $1 in
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 启动 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server start.sh
daemon /opt/module/kafka/config/server.properties"
done
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 停止 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server stop.sh "
done
esac

二、kafka生产者

在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。

buffer.memory  RecordAccumulator 缓冲区总大小, 默认 32 m ;
reques是批量应答的时候用到,broker最多缓存几个requst;

1、三种生产者发送消息的方式

发送消息的三种方式

<dependencies>
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.0.0</version>
  </dependency>
</dependencies>

1、普通异步发送的生产者代码,不关注返回结果

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
     public static void main(String[] args) throws InterruptedException {

         // 1. 创建 kafka 生产者的配置对象
         Properties properties = new Properties();

         // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
         // key,value 序列化(必须):key.serializer,value.serializer
         
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       
        // 3. 创建 kafka 生产者对象
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
              kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));


            // 异步发送 默认
            // kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));

             // 同步发送
             kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
            
            //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
               Future<RecordMetadata> sendResult = producer.send(record);
               RecordMetadata recordMetadata = sendResult.get();

         }
        // 5. 关闭资源
         kafkaProducer.close();
    }
}

2、带回调函数的异步发送,通过回调函数获取返回结果,且回调函数是异步的

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {
     public static void main(String[] args) throws InterruptedException {
        
         // 1. 创建 kafka 生产者的配置对象
         Properties properties = new Properties();
         // 2. 给 kafka 配置对象添加配置信息
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
         
         // key,value 序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // 3. 创建 kafka 生产者对象
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
         // 4. 调用 send 方法,发送消息
         for (int i = 0; i < 5; i++) {
             // 添加回调
             kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
              
              // 该方法在 Producer 收到 ack 时调用,为异步调用
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception == null) {
             
                // 没有异常,输出信息到控制台
                 System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
             } else {
                 // 出现异常打印
                exception.printStackTrace();
             }
             }
           });
         // 延迟一会会看到数据发往不同分区
         Thread.sleep(2);
         }
         // 5. 关闭资源
         kafkaProducer.close();
     }
}

3、同步发送,关注返回结果,同步的会被阻塞

public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
        
        //ProducerRecord的三个参数,topic,发送的key,发送的value
        ProducerRecord<String,String> record = new ProducerRecord<>("user-info-topic","name","路飞");
        
        //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
        Future<RecordMetadata> sendResult = producer.send(record);
        RecordMetadata recordMetadata = sendResult.get();
       
       //打印下发送消息的topic,partition,offset
        System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset()));

        producer.close();
    }

通过topic里面的pertition分区来提高消息处理的效率

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在IDEA中全局查找(ctrl +n)ProducerRecord类,在类中可以看到构造方法:

  1. // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)

kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i)); 第一个参数是topic,第二个参数是pertition,第三个是消息messenger

  1. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值

// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余, 分别发往 1、2、0

kafkaProducer.send(new ProducerRecord<>("first", "a","atguigu " + i), new Callback() { });

2、自定义分区器

定义类实现 Partitioner 接口来重写分区方法;

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {

 /**
* 返回信息对应的分区
 * @param topic 主题
 * @param key 消息的 key
 * @param keyBytes 消息的 key 序列化后的字节数组
 * @param value 消息的 value
 * @param valueBytes 消息的 value 序列化后的字节数组
 * @param cluster 集群元数据可以查看分区信息
 * @return
 */
 @Override
 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     // 获取消息
     String msgValue = value.toString();
     
    // 创建 partition
     int partition;
 
    // 判断消息是否包含 atguigu
     if (msgValue.contains("atguigu")){
         partition = 0;
     }else {
         partition = 1;
     }
     
    // 返回分区号
        return partition;
     }
     
    // 关闭资源
     @Override
     public void close() {
     }
     
    // 配置方法
     @Override
     public void configure(Map<String, ?> configs) {
     }
}

使用分区器的方法,在生产者的配置中添加分区器参数。其实就是再properties里面引入分区的配置类

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallbackPartitions {
 public static void main(String[] args) throws InterruptedException {
    
    Properties properties = new Properties();
 
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
    
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
   properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
   // 添加自定义分区器
  properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
     KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

 for (int i = 0; i < 5; i++) {
 
     kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
     @Override
     public void onCompletion(RecordMetadata metadata, Exception e) {
     if (e == null){
     System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition()
     );
 

    }else {
         e.printStackTrace();
     }
     }
 });

 }
 kafkaProducer.close();
 }
}

为了提高生成者的效率,还可以通过配置以下内容

// batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

三、消息可靠性问题

1、可靠性和重复性

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

同意是通过在properties里面配置ack的机制

// 设置 acks
 properties.put(ProducerConfig.ACKS_CONFIG, "all");
 // 重试次数 retries,默认是 int 最大值,2147483647
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);

数据重复性问题

• 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
• 最多一次(At Most Once)= ACK级别设置为0

• 总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

2、幂等性和事务

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

开启幂等性参数 :enable.idempotence 默认为 true,false 关闭。

事务:

开启事务前必须开启幂等性

Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)

void abortTransaction() throws ProducerFencedException;
单个 Producer,使用事务保证消息的仅一次发送

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducerTransactions {

     public static void main(String[] args) throws InterruptedException {
            // 1. 创建 kafka 生产者的配置对象
         Properties properties = new Properties();
 
        // 2. 给 kafka 配置对象添加配置信息
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
        // key,value 序列化
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // 设置事务 id(必须),事务 id 任意起名
         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
 
        // 3. 创建 kafka 生产者对象
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

         // 初始化事务
         kafkaProducer.initTransactions();
         // 开启事务
         kafkaProducer.beginTransaction();
 
        try {
             // 4. 调用 send 方法,发送消息
         for (int i = 0; i < 5; i++) {
             // 发送消息
             kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
         }
        
        // int i = 1 / 0;
         // 提交事务
         kafkaProducer.commitTransaction();
 
    } catch (Exception e) {
     // 终止事务
         kafkaProducer.abortTransaction();
     } finally {
 
    // 5. 关闭资源
     kafkaProducer.close();
     }
  }
}

3、生产者消息的有序性

主要是开启幂等性后会通过其序号来落盘,如果失败,则会缓存起来知道正序的到来才落盘

四、broker工作流程

本章主要介绍kafka如何存储数据的

在zookeeper的服务端存储的Kafka相关信息:

1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器

2)/kafka/brokers/topics/first/partitions/0/state {"leader":1 ,"isr":[1,0,2] } 记录谁是Leader,有哪些服务器可用
    Zookeeper中存储的Kafka 信息

3)/kafka/controller  {“brokerid”:0}  辅助选举Leader

ISR 是lead 跟follow里面通讯正常的节点

1、节点的服役和退役

2、kafka的副本

(1 Kafka 副本作用:提高数据可靠性 。
(2 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加 磁盘存储空间,增加网络上数据传输,
    降低效率。
(3 Kafka 中副本分为: Leader 和 Follower 。 Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 
    进行同步数据。
(4 Kafka 分区中的所有副本统称为 AR Assigned Repllicas )。AR =ISR + OSR
  ISR表示 和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,
  则该Follower 将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader 发生故障之后,
  就会从ISR 中选举新的Leader。OSR,表示Follower 与Leader 副本同步时,延迟过多的副本。

3、lead选举流程

Kafka 集群中有一个broker 的Controller 会被选举为Controller Leader,负责管理集群
broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。
Controller 的信息同步工作是依赖于Zookeeper 的。

4. lead 和follow故障处理

1)Follower故障

(1) Follower发生故障后会被临时踢出ISR (2) 这个期间Leader和Follower继续接收数据 (3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。 (4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

2)Leader故障

(1) Leader发生故障之后,会从ISR中选出一个新的Leader (2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

5. Leader Partition 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,
其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

6. 生产经验——增加副本因子

 在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

7.topic 里面文件存储的机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

说明:日志存储参数配置

参数描述 log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。

log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

8.文件清理策略

(1)检查是否过期的配置

     Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
      log.retention.hours,最低优先级小时,默认 7 天。
      log.retention.minutes,分钟。
      log.retention.ms,最高优先级毫秒。
      log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。


(2)过期清楚策略

    Kafka 中提供的日志清理策略有 delete 和 compact 两种。

    delete 日志删除:将过期数据删除
     log.cleanup.policy = delete 所有数据启用删除策略
    (1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
    (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
        log.retention.bytes,默认等于-1,表示无穷大。
        思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?


    compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

        压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大
        的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
        这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
        集里就保存了所有用户最新的资料。
        log.cleanup.policy = compact 所有数据启用压缩策

读数据采用稀疏索引,可以快速定位要消费的数据,写入的时候是通过顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高。

PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存

五、kafka的消费者

消费者组 :由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

1、基础消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
     public static void main(String[] args) {
 
            // 1.创建消费者的配置对象
             Properties properties = new Properties();
            // 2.给消费者配置对象添加参数
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
            // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

           // 配置消费者组(组名任意起名) 必须
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 
          // 创建消费者对象
             KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
 
          // 注册要消费的主题(可以消费多个主题)
            ArrayList<String> topics = new ArrayList<>();
            topics.add("first");
            kafkaConsumer.subscribe(topics);
 
         // 拉取数据打印
         while (true) {
             // 设置 1s 中消费一批数据
             ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
             // 打印消费到的数据
             for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                     System.out.println(consumerRecord);
                 }
         }
     }
}

如果是消费某个topic里面特定的partitions,需要在配置里面注明

// 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
 topicPartitions.add(new TopicPartition("first", 0));// 消费first主题下的0分区
 kafkaConsumer.assign(topicPartitions);

复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者,代码与上面的基础消费者完全一样,消费者组名还是test 这样就能在test消费者组里面启动两个消费者

2、分区的分配以及再平衡

(1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

(2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,
 修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

常见参数的配置:

heartbeat.interval.ms : Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。

session.timeout.ms : Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

3、四种分区分配策略

1)Range 分区策略原理

  1. RoundRobin 分区策略原理

    RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

0 号消费者宕机后,0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
 分别由 1 号消费者或者 2 号消费者消费。

3)Sticky 以及再平衡

  粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,
  尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,
  首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

4、offset的位移

消费完一批数据后,需要提交offset,可以设置自动提交和手动提交;默认是自动提交

设置自动自交offset和提交时间;这种方式时间不好控制

// 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 // 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

同步提交offset和异步提交:

  虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因
  此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)
  和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,
  同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
  而异步提交则没有失败重试机制,故有可能提交失败。

      • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
      • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class CustomConsumerByHandSync {
     public static void main(String[] args) {

         // 1. 创建 kafka 消费者配置类
         Properties properties = new Properties();
         // 2. 添加配置参数
         // 添加连接
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
         
        // 配置序列化
            
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                                        "org.apache.kafka.common.serialization.StringDeserializer");
 
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                                        "org.apache.kafka.common.serialization.StringDeserializer");
 
         // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

         // 是否自动提交 offset
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
         //3. 创建 kafka 消费者
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        //4. 设置消费主题 形参是列表
         consumer.subscribe(Arrays.asList("first"));
         //5. 消费数据
         while (true){
         // 读取消息
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
         
        // 输出消息
         for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                 System.out.println(consumerRecord.value());
         }
         
          // 同步提交 offset
         consumer.commitSync();
        
         // 异步提交 offset
         consumer.commitAsync();
     }
   }
}

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

5、指定offser消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(4)任意指定 offset 位移开始消费
// 2 订阅一个主题
 ArrayList<String> topics = new ArrayList<>();
 topics.add("first");
 kafkaConsumer.subscribe(topics);

 Set<TopicPartition> assignment= new HashSet<>();
 while (assignment.size() == 0) {
     kafkaConsumer.poll(Duration.ofSeconds(1));
     // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
     assignment = kafkaConsumer.assignment();
 }

 // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
 for (TopicPartition tp: assignment) {
     kafkaConsumer.seek(tp, 1700);
 }

 // 3 消费该主题数据
 while (true) {
     ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
     for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
     System.out.println(consumerRecord);
     }
 }

(5)指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class CustomConsumerForTime {
     public static void main(String[] args) {
 
    // 0 配置信息

    Properties properties = new Properties();
     // 连接
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
    // key value 反序列化
 
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
    
     // 1 创建一个消费者
     KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
     // 2 订阅一个主题
     ArrayList<String> topics = new ArrayList<>();
     topics.add("first");
     kafkaConsumer.subscribe(topics);
    
     Set<TopicPartition> assignment = new HashSet<>();

     while (assignment.size() == 0) {
         kafkaConsumer.poll(Duration.ofSeconds(1));
         // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
         assignment = kafkaConsumer.assignment();
     }

     HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

     // 封装集合存储,每个分区对应一天前的数据
    for (TopicPartition topicPartition : assignment) {
         timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
     }

    // 获取从 1 天前开始消费的每个分区的 offset
    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
 
   // 遍历每个分区,对每个分区设置消费时间。
    for (TopicPartition topicPartition : assignment) {
          OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
         // 根据时间指定开始消费的位置
         if (offsetAndTimestamp != null){
             kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
         }
     }

    // 3 消费该主题数据
     while (true) {
         ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
     
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                 System.out.println(consumerRecord);
     }
     }
 }
}

6、重复消费和漏消费问题

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。

7. 数据积压问题

六、kafka集成其他框架

1.kafka与flink集成

kafka作为生产者发送消息到flink,也可以kafka作为消费者从flink里面消费数据``

org.apache.flink flink-java 1.13.1

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

</dependencies>
生产者
public class MyFlinkKafkaProducer1 {

public static void main(String[] args) throws Exception {
    // 准备环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    // 准备数据源
    ArrayList<String> strings = new ArrayList<>();
    strings.add("test1");
    strings.add("test2");
    DataStreamSource<String> stream = env.fromCollection(strings);
    //创建kafka生产者 -- 未完善
    Properties properties = new Properties();

    FlinkKafkaProducer<String> first = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties);
    // 添加数据源
    stream.addSink((SinkFunction<String>) first);
    // 执行
    env.execute();

}
}
消费者
public class MyFlinkKafkaConsumer {

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    Properties properties = new Properties();

    FlinkKafkaConsumer<String> first =
                new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);

}
}

 2、springboot与kafka的集成

org.springframework.kafka spring-kafka 

#配置连接的集群
spring.kafka.bootstrap-servers=ip:socket,ip1:socket
# key value 的序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

生产者

@RestController
@RequestMapping("api/v1/kafka")
public class ProducerController {

    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/producedata")
    public String data(String msg){
        kafkaTemplate.send("first",msg);
        return "true";
    }
}

消费者

# 反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#消费者组id
spring.kafka.consumer.group-id=testcon
@Configuration
public class ConsumerController {

    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @KafkaListener(topics = "first")
    public void getData(String msg){
        System.out.println(msg);
    }
}`

相关文章:

  • 【vue3】06. 跟着官网学习vue3
  • 任务十一 BERT
  • MyBatis实现多层级collection嵌套查询
  • Containerd【轻量级容器管理工具】
  • 计算机毕业设计ssm+vue基本微信小程序的图书馆座位管理系统
  • 腾讯核心高级架构师汇总Java全栈知识点笔记,“吃透”后成功上岸!
  • 169.多数元素
  • webpack拓展篇(六十七):webpack5 新特性解析
  • CF515E Drazil and Park【思维+线段树】
  • CodeForces 1717E【线性筛】
  • Java程序猿搬砖笔记(九)
  • ROS1云课→16机器人模型从urdf到xacro
  • 花好月圆│以代码寄相思,绘嫦娥之奔月
  • WiFi基础学习到实战(一)
  • Java 在Word文档中添加艺术字
  • [NodeJS] 关于Buffer
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • 2017-09-12 前端日报
  • C++入门教程(10):for 语句
  • extjs4学习之配置
  • JavaScript-Array类型
  • Laravel Telescope:优雅的应用调试工具
  • mysql innodb 索引使用指南
  • MySQL QA
  • vue-cli在webpack的配置文件探究
  • vue-router的history模式发布配置
  • 搭建gitbook 和 访问权限认证
  • 检测对象或数组
  • 简单基于spring的redis配置(单机和集群模式)
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 前端代码风格自动化系列(二)之Commitlint
  • 算法-图和图算法
  • 原生Ajax
  • 【干货分享】dos命令大全
  • 国内唯一,阿里云入选全球区块链云服务报告,领先AWS、Google ...
  • ​ 轻量应用服务器:亚马逊云科技打造全球领先的云计算解决方案
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • ​sqlite3 --- SQLite 数据库 DB-API 2.0 接口模块​
  • #100天计划# 2013年9月29日
  • #HarmonyOS:Web组件的使用
  • #我与Java虚拟机的故事#连载18:JAVA成长之路
  • (4)(4.6) Triducer
  • (c语言)strcpy函数用法
  • (C语言)球球大作战
  • (Oracle)SQL优化技巧(一):分页查询
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (solr系列:一)使用tomcat部署solr服务
  • (编译到47%失败)to be deleted
  • (一)appium-desktop定位元素原理
  • (一)Thymeleaf用法——Thymeleaf简介
  • (转)memcache、redis缓存
  • (转)VC++中ondraw在什么时候调用的
  • (转)人的集合论——移山之道
  • (转)真正的中国天气api接口xml,json(求加精) ...
  • (转载)Linux网络编程入门