当前位置: 首页 > news >正文

Kafka 消费者启动后与服务器的交互流程

Kafka 消费者启动后与服务器的交互流程涉及多个关键步骤,主要包括初始化、查找组协调器、加入消费者组、分区分配、心跳维持、拉取数据和提交偏移量等。以下是详细的流程说明:

1. 初始化消费者

  • 创建消费者实例:应用程序通过调用KafkaConsumer的构造函数,传入配置参数创建消费者实例。
  • 配置参数:包括bootstrap.servers(Kafka集群地址)、group.id(消费者组ID)、key.deserializervalue.deserializer等。

2. 订阅主题

  • 调用subscribe方法:消费者通过调用subscribe方法订阅一个或多个主题,也可以使用正则表达式来匹配多个主题。

3. 查找组协调器

  • 消费者发送FindCoordinator请求:消费者向Kafka集群中的任意Broker发送FindCoordinator请求,请求中包含消费者组ID。
  • Broker服务器接收请求:Broker根据消费者组ID计算出组协调器所在的Broker节点,并返回该节点的地址信息。
    - 计算组协调器算法:
/*** 表示 内部主题 __consumer_offsets 的分区数量,默认初始化值是50(顺带一提__consumer_offsets 副本因子默认值是3)* 初始值为 -1,表示尚未设置。* 使用 volatile 关键字确保在多线程环境中对该变量的修改是可见的。*/private volatile int numPartitions = -1;/*** 内部主题 __consumer_offsets 的各个分区分布在各个Broker服务器上,算出当前消费者组的协调器在哪个服务器* 消费者组协调器所在brokerId = 消费者组id的哈希值 % 50 */coordinator_broker_id = Utils.abs(groupId.hashCode()) % numPartitionspublic static int abs(int n) {return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);}
  • 消费者连接组协调器
    • 消费者根据FindCoordinator响应中的地址信息,连接到组协调器。

4. 加入消费者组

  • Kafka消费者加入消费者组的过程主要涉及JoinGroup和SyncGroup两个关键步骤。这个过程确保消费者能够以协调的方式加入消费者组,并且分区能够被合理地分配给消费者组内的消费者。以下是JoinGroup和SyncGroup的具体流程:
  • 阶段一:JoinGroup阶段
    • (1)发送JoinGroup请求:当消费者启动并调用poll方法时,如果它尚未加入消费者组,或者需要重新加入(例如,因为再平衡),它会向组协调器发送JoinGroup请求。这个请求包含消费者的group.id、订阅的主题列表以及消费者使用的分区分配策略。

    • (2)等待响应:组协调器收到JoinGroup请求后,会等待一段时间,以允许其他消费者也发送他们的JoinGroup请求。这个等待时间是为了收集同一消费者组内所有消费者的信息。

    • (3)选择Leader:对于同一个消费者组的第一次JoinGroup请求,协调器会选择第一个消费者作为Leader。Leader负责为组内的所有消费者分配分区。Leader的选择基于消费者的JoinGroup请求顺序。

    • (4)分区分配策略:Leader消费者收到协调器的响应后,会根据提供的分区分配策略(如RangeRoundRobin等)和所有消费者的订阅信息来决定分区的分配方案。

  • 阶段二:SyncGroup阶段
    • (1)发送SyncGroup请求:Leader消费者将分区分配方案通过SyncGroup请求发送给组协调器。随后,组内的其他消费者也发送SyncGroup请求,但不包含分区分配方案。

    • (2)协调器广播分区分配方案:组协调器接收到SyncGroup请求后,将leader消费者的分区分配方案广播给消费者组内的所有消费者。

5. 开始消费

  • 消费者接收分区分配:每个消费者接收到SyncGroup响应后,会知道自己被分配到了哪些分区。
  • 初始化分区消费:消费者根据分配到的分区,初始化分区消费的相关资源,如设置分区的偏移量。
  • 拉取数据并消费:消费者开始从分配给它的分区拉取数据并进行消费。

6. 心跳维持和再平衡

  • 发送心跳:消费者会定期向组协调器发送心跳,以表明它仍然活跃。
  • 处理再平衡:如果有新的消费者加入或现有消费者离开消费者组,协调器会触发再平衡过程,重新分配分区。

7. 拉取数据

  • 发送Fetch请求:消费者向分配给它的分区的Leader Broker发送Fetch请求,请求包含拉取数据的偏移量。
  • 接收数据:Broker返回包含消息的响应,消费者处理这些消息。

8. 提交偏移量

  • 自动提交:如果启用了自动提交(enable.auto.commit=true),消费者会定期自动提交消费的偏移量。
  • 手动提交:如果使用手动提交,消费者需要调用commitSynccommitAsync方法提交偏移量。

9. 处理再平衡

  • 再平衡触发:当消费者组成员发生变化,协调器会触发再平衡。
      1. 消费者加入消费者组
        当新的消费者加入现有的消费者组时,会触发再平衡。新消费者可能是新启动的实例,或者是之前失败后重新加入的实例。
      1. 消费者离开消费者组
      • 主动离开:消费者调用close方法或者主动离开消费者组时,会触发再平衡。
      • 被动离开:如果消费者因为网络问题、崩溃或者长时间未发送心跳而被组协调器认为已经离开,也会触发再平衡。
      1. 订阅主题的分区数变化
        如果消费者组订阅的主题新增了分区,那么为了将新增的分区分配给消费者,也会触发再平衡。
      1. 消费者订阅模式变化
        如果消费者组内的任何消费者更改了其订阅模式(例如,通过subscribe方法订阅了新的主题或者取消订阅了某些主题),这也会触发再平衡。
      1. 组协调器变更
        如果负责管理消费者组的组协调器(Group Coordinator)发生变化(例如,因为原协调器所在的Broker宕机),新的协调器在接管消费者组管理职责时,会触发再平衡。
      1. 主题元数据变化
        消费者定期从Broker获取订阅主题的元数据(如分区信息)。如果检测到元数据变化,可能会触发再平衡,尽管这种情况较少见。
  • 暂停拉取:在再平衡期间,消费者会暂停拉取数据。
  • 重新分配分区:协调器重新分配分区,并通知消费者新的分区分配情况。
  • 恢复拉取:再平衡完成后,消费者恢复拉取数据。

示例代码

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题// 消费者加入消费者组并开始消费的过程是在第一次调用poll方法时触发的
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}// 提交偏移量
consumer.commitSync();

在上述代码中,消费者通过调用subscribe方法订阅了主题test-topic,然后通过调用poll方法触发了加入消费者组的完整流程,包括查找组协调器、加入消费者组、分区分配、拉取数据和提交偏移量等步骤。

总结

Kafka消费者启动后与服务器的交互流程是一个复杂的过程,涉及与组协调器的多次交互。这个流程确保了消费者能够正确地加入消费者组,分区能够被合理地分配给消费者组内的消费者,并且在消费者组成员变化时能够进行适当的再平衡,同时保证了消费者能够从正确的位置继续消费数据。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • python实现提取视频帧的图片
  • vue3 主页面 跳转到子页面后 ,再次切换到主页面后 主页面及其它的所有页面 竟然不显示了的解决。
  • 企业做数据治理的意义是什么
  • 计算机网络必会面经
  • springboot业务层service开发全过程(以mybatis-plus为例)
  • EF访问PostgreSql,如何判断jsonb类型的数组是否包含某个数值
  • k8s学习--k8s集群部署kubesphere的详细过程
  • 2024.8.1(前端服务器的配置以及tomcat环境的配置)
  • 对象转化成base64-再转回对象
  • 人数管控系统助力图书馆实现精准客流统计分析
  • uniapp微信小程序按钮分享定制动态传参
  • git回退未commit、回退已commit、回退已push、合并某一次commit到另一个分支
  • 下载安装docker并解决拉去镜像的connect:connection refused问题(2024.7.31亲测有效)
  • 【Linux】文件描述符 fd
  • uniapp手写滚动选择器
  • Android Volley源码解析
  • Angularjs之国际化
  • Apache Spark Streaming 使用实例
  • Java比较器对数组,集合排序
  • Java小白进阶笔记(3)-初级面向对象
  • js继承的实现方法
  • Linux后台研发超实用命令总结
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • React 快速上手 - 07 前端路由 react-router
  • Spring Cloud(3) - 服务治理: Spring Cloud Eureka
  • swift基础之_对象 实例方法 对象方法。
  • Yeoman_Bower_Grunt
  • 电商搜索引擎的架构设计和性能优化
  • 力扣(LeetCode)22
  • 前端设计模式
  • 如何使用 JavaScript 解析 URL
  • 使用docker-compose进行多节点部署
  • 我与Jetbrains的这些年
  • 物联网链路协议
  • 消息队列系列二(IOT中消息队列的应用)
  • 源码安装memcached和php memcache扩展
  • 格斗健身潮牌24KiCK获近千万Pre-A轮融资,用户留存高达9个月 ...
  • ​ArcGIS Pro 如何批量删除字段
  • !$boo在php中什么意思,php前戏
  • #Datawhale AI夏令营第4期#AIGC文生图方向复盘
  • #HarmonyOS:软件安装window和mac预览Hello World
  • $分析了六十多年间100万字的政府工作报告,我看到了这样的变迁
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (3)医疗图像处理:MRI磁共振成像-快速采集--(杨正汉)
  • (python)数据结构---字典
  • (二)pulsar安装在独立的docker中,python测试
  • (附源码)spring boot校园拼车微信小程序 毕业设计 091617
  • (附源码)ssm高校实验室 毕业设计 800008
  • (附源码)计算机毕业设计ssm基于Internet快递柜管理系统
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (六)Flink 窗口计算
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (论文阅读笔记)Network planning with deep reinforcement learning
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (求助)用傲游上csdn博客时标签栏和网址栏一直显示袁萌 的头像