当前位置: 首页 > news >正文

Kafka源码分析之Sender

        Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。

        首先,我们先看下它的成员变量:

    /* the state of each nodes connection */
    // 每个节点连接的状态KafkaClient实例client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // 批量记录的记录累加器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // 客户端元数据Metadata实例metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    // 试图发送到server端的最大请求大小maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // 从server端获得的请求发送的已确认数量acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // 一个失败请求在被放弃之前的重试次数retries
    private final int retries;

    /* the clock instance used for getting the time */
    // 获取时间的时钟Time实例time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender线程运行的标志位,为true表示Sender线程一直在运行
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // 强制关闭的标志位forceClose
    private volatile boolean forceClose;

    /* metrics */
    // 度量指标
    private final SenderMetrics sensors;

    /* param clientId of the client */
    // 客户端的clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    // 等到server端响应请求的超时时间requestTimeout
    private final int requestTimeout;
        既然是一个线程,我们看下它的主要运行逻辑run()方法,代码如下:

    /**
     * The main run loop for the sender thread
     * sender线程的主循环
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // 主循环,一直运行直到close被调用
        while (running) {// 标志位running为true,则一直循环
            try {
            	// 调用待参数的run()方法
                run(time.milliseconds());
            } catch (Exception e) {
            	
            	// 截获异常后记录err级别log信息,输出异常
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	// 调用调用待参数的run()方法继续处理
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        
        // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        
        // 关闭客户端
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }
        Sender线程的主循环在run()方法内,其主要处理逻辑为:

        1、首先进入一个while主循环,当标志位running为true时一直循环,直到close被调用:

              调用带参数的run(long now)方法,处理消息的发送;

        2、当close被调用时,running被设置为false,while主循环退出:

              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;

              2.2、如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求;

              2.3、关闭客户端。



相关文章:

  • 2016 Google hosts 持续更新【更新于:2016-04-10】
  • 学C#你应该熟练使用ILDasm和Reflector【带视频教程】
  • 《Linux内核设计与实现》第八周读书笔记——第四章 进程调度
  • 关于java写进mysql中文乱码问题
  • 从客户端(?)中检测到有潜在危险的 Request.Path 值 的解决方案
  • 最少换乘
  • rpm命令使用总结
  • 学习ios【1】Objective-C 基本语法
  • Mac使用大全
  • JSONArray转ListT
  • 遥感影像显示相关的技术总结
  • 2016.04.8-2016.04.14这周工作时间和内容
  • Kafka常用操作
  • Scalaz(34)- Free :算法-Interpretation
  • 思科4500系列与华为9300系列交换机介绍与选配
  • 03Go 类型总结
  • Brief introduction of how to 'Call, Apply and Bind'
  • ES6 学习笔记(一)let,const和解构赋值
  • Java精华积累:初学者都应该搞懂的问题
  • Java知识点总结(JavaIO-打印流)
  • leetcode98. Validate Binary Search Tree
  • Linux快速复制或删除大量小文件
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • passportjs 源码分析
  • spring cloud gateway 源码解析(4)跨域问题处理
  • Transformer-XL: Unleashing the Potential of Attention Models
  • vue中实现单选
  • 面试遇到的一些题
  • 前端每日实战:61# 视频演示如何用纯 CSS 创作一只咖啡壶
  • 微信小程序:实现悬浮返回和分享按钮
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​卜东波研究员:高观点下的少儿计算思维
  • ( )的作用是将计算机中的信息传送给用户,计算机应用基础 吉大15春学期《计算机应用基础》在线作业二及答案...
  • (09)Hive——CTE 公共表达式
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (机器学习的矩阵)(向量、矩阵与多元线性回归)
  • (论文阅读26/100)Weakly-supervised learning with convolutional neural networks
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)创业的注意事项
  • .net MVC中使用angularJs刷新页面数据列表
  • .net 生成二级域名
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • ??myeclipse+tomcat
  • @ModelAttribute 注解
  • [20160902]rm -rf的惨案.txt
  • [Android]一个简单使用Handler做Timer的例子
  • [bzoj1006]: [HNOI2008]神奇的国度(最大势算法)
  • [BZOJ3757] 苹果树
  • [cogs2652]秘术「天文密葬法」
  • [hdu 3652] B-number
  • [HNOI2006]鬼谷子的钱袋
  • [Interview]Java 面试宝典系列之 Java 多线程