当前位置: 首页 > news >正文

寻找协调器FindCoordinatorRequest请求流程

文章目录

    • 客户端发起请求
    • Broker处理请求
      • 简单校验
      • 获取分区号和元信息
      • 构建返回数据 createResponse
    • 问题

客户端发起请求

我们在分析消费者的时候, 有看到调用FindCoordinatorRequest的请求


    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
        // initiate the group metadata request
        log.debug("Sending FindCoordinator request to broker {}", node);
        FindCoordinatorRequest.Builder requestBuilder =
                new FindCoordinatorRequest.Builder(
                        new FindCoordinatorRequestData()
                            .setKeyType(CoordinatorType.GROUP.id())
                            .setKey(this.rebalanceConfig.groupId));
        return client.send(node, requestBuilder)
                .compose(new FindCoordinatorResponseHandler());
    }
    

Broker处理请求



def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
    val findCoordinatorRequest = request.body[FindCoordinatorRequest]
	
	// 根据协调器类型判断是否授权过
    if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
        !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
      sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
    else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
        !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
    else {
      // get metadata (and create the topic if necessary)
      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
        case CoordinatorType.GROUP =>
          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
          (partition, metadata)

        case CoordinatorType.TRANSACTION =>
          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
          (partition, metadata)

        case _ =>
          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
      }

      def createResponse(requestThrottleMs: Int): AbstractResponse = {
        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
          new FindCoordinatorResponse(
              new FindCoordinatorResponseData()
                .setErrorCode(error.code)
                .setErrorMessage(error.message)
                .setNodeId(node.id)
                .setHost(node.host)
                .setPort(node.port)
                .setThrottleTimeMs(requestThrottleMs))
        }
        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
        } else {
          val coordinatorEndpoint = topicMetadata.partitions.asScala
            .find(_.partitionIndex == partition)
            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
            .flatMap(_.getNode(request.context.listenerName))
            .filterNot(_.isEmpty)

          coordinatorEndpoint match {
            case Some(endpoint) =>
              createFindCoordinatorResponse(Errors.NONE, endpoint)
            case _ =>
              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
          }
        }
        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
          .format(responseBody, request.header.correlationId, request.header.clientId))
        responseBody
      }
      sendResponseMaybeThrottle(request, createResponse)
    }
  }



简单校验

根据协调器类型判断是否有被授权。协调器类型有 GROUP((byte) 0), TRANSACTION((byte) 1)两种

获取分区号和元信息

这里的接口分两种情况,一个是协调列席为GROUP 一个是 TRANSACTION
他们的处理逻辑都是一样的,只是处理的Topic不一样

GROUP 对应的Topic是 __consumer_offsets

TRANSACTION 对应的Topic是__transaction_state

这里我们主要分析一下 GROUP的情况

  1. 去zk获取/brokers/topic/__consumer_offsets 数据 找到消费者Topic的分区总数。默认是50. (由offsets.topic.num.partitions 控制)找到分区数之和后, 则计算 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount(groupID按分区数取模运算)获取到了分区号partition;

  2. 然后接着获取该Topic的元信息, 这里需要注意的是 去获取元信息应该走的是什么 监听协议(listenerName) 呢?这个主要是看当前处理请求的Broker是通过哪个入口来的。比如说该Broker有两个监听口,listeners = INTER://xxx.xx.xx.100:9091, OUTSIDE://xxx.xx.xx.101:9092 .如果客户端发起请求的时候是对xxx.xx.xx.101:9092发起的请求,那么这个对应的监听器就是 OUTSIDE . 那么Broker去获取__consumer_offsets元信息发起请求的时候也是会用的 OUTSIDE 协议。

  3. 如果发现没有这个Topic的元信息,则需要去创建__consumer_offsetsTopic 。
    注意:创建这个Topic的的几个特殊属性:

    属性描述
    cleanup.policycompact日志清理策略为 :紧缩
    segment.bytes10010241024一个日志段的大小
    compression.typeproducer压缩类型 为跟生产者保持一致

构建返回数据 createResponse

这里才是真正的找到协调器的主要逻辑, 这里的判断逻辑是

上面我们获取到的分区号是partition, 我们同样获取到了__consumer_offsets的元信息Metadata。

那我们就可以获取到这个分区号, 并且就能够找到该分区的LeaderId所属在哪个Broker上。

知道了哪个Broker, 那我们就能够获取到对应的EndPoint, 一个Broker可能同时有多个EndPoint(配置了多个监听器),那么我们应该使用哪个EndPoint呢?

这个的判断逻辑与上面说过的一样,客户端发起请求时候的监听器是哪个,那么这里就应该用哪个监听器。

在这里插入图片描述

注意:如果找到的分区Leader不存在 那么这个协调器就不存在

然后会返回异常:


The coordinator is not available

问题

  1. 如果客户端走的外网监听器访问的集群,那么在客户端发起请求之后到集群内部,触发内部调用链的请求,那么内部这个调用链是用什么监听器访问的呢?

从客户端 -> Broker -> 其他Broker. 这是一个调用链路,从最开始用的是什么监听器那么这条链路上都是用的这个监听器!具体请看:多网络情况下,Kafka客户端如何选择合适的网络发起请求
在这里插入图片描述

相关文章:

  • jsvmp-某乎 x-zes-96 算法还原
  • 迅速了解JDK线程池以及Spring线程池
  • 前缀和与查分(一维前缀和,二维前缀和(子矩阵的和)一维差分、二维差分(差分矩阵))
  • 2022年是SEO行业凋谢的一年
  • CDR插件开发之Addon插件006 - 初体验:通过C#代码用外挂方式操作CDR中的对象
  • 【2020.09.01】 新学期,新气象
  • 基于云计算与深度学习的常见作物害虫识别系统的设计与实现
  • Flask 学习-22.可插拨视图MethodView类
  • 微信公众号如何获取查题搜题功能接口
  • 聚醋酸乙烯酯接枝聚苯乙烯PVAc-g-PSt微球/接枝-聚甲基丙烯酸甲酯表面(PS-acyl-Cl)的研究
  • 百度网盘的音乐怎么分享到qq音乐里?
  • Q_PLUGIN_METADATA
  • 【Java初阶】面向对象三大特性之继承
  • 标签传播算法(LPA)
  • ElasticSearch(版本7.8.1)中类型Long精度缺失
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • Apache Pulsar 2.1 重磅发布
  • Docker: 容器互访的三种方式
  • download使用浅析
  • exports和module.exports
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • mysql中InnoDB引擎中页的概念
  • PHP那些事儿
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • XForms - 更强大的Form
  • 从0搭建SpringBoot的HelloWorld -- Java版本
  • 从零开始在ubuntu上搭建node开发环境
  • 更好理解的面向对象的Javascript 1 —— 动态类型和多态
  • 浅谈web中前端模板引擎的使用
  • 我有几个粽子,和一个故事
  • 通过调用文摘列表API获取文摘
  • ​Python 3 新特性:类型注解
  • ​第20课 在Android Native开发中加入新的C++类
  • ​马来语翻译中文去哪比较好?
  • #define、const、typedef的差别
  • (1)(1.13) SiK无线电高级配置(六)
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (vue)页面文件上传获取:action地址
  • (博弈 sg入门)kiki's game -- hdu -- 2147
  • (待修改)PyG安装步骤
  • (三)docker:Dockerfile构建容器运行jar包
  • (转)Android学习笔记 --- android任务栈和启动模式
  • (转)iOS字体
  • .bat文件调用java类的main方法
  • .NET6 开发一个检查某些状态持续多长时间的类
  • .net6解除文件上传限制。Multipart body length limit 16384 exceeded
  • .NET国产化改造探索(三)、银河麒麟安装.NET 8环境
  • .NET框架类在ASP.NET中的使用(2) ——QA
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)
  • @Bean注解详解
  • @require_PUTNameError: name ‘require_PUT‘ is not defined 解决方法
  • [ linux ] linux 命令英文全称及解释
  • [100天算法】-实现 strStr()(day 52)
  • [AIGC] MySQL存储引擎详解
  • [APIO2015]巴厘岛的雕塑