当前位置: 首页 > news >正文

yarn集群NodeManager日志聚合慢问题解决方案

背景

           正常情况作业提交到 Yarn 集群时,作业完成或者失败后,每个 NM 节点都会对每个 app 作业进行日志聚合操作,存储到hdfs指定的目录下,但是最近发现越来越多的任务通过yarn logs命令无法查询,经过排查发现很多任务的日志聚合变慢了,需要半小时甚至更多时间才能聚合完成。通过阅读源码才发现需要调大yarn.nodemanager.logaggregation.threadpool-size-max这个参数,默认是100,如果节点任务超过100,超过的任务日志聚合就会进行排队,因此导致聚合缓慢,可以增加该参数到500左右。

<property>
<name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
<value>500</value>
</property>

Yarn日志聚合源码分析

                  为了彻底弄明白聚合日志如何工作的,就需要了解 Yarn 中处理聚合日志的服务在哪里创建的,根据 ApplicationMaster启动及资源申请源码分析 文章分析,了解到Yarn 的第一个 Container 启动是用于 AppAttmpt 角色,也就是我们通常在 Yarn UI 界面看到的 ApplicationMaster 服务。所以我们来看看一个作业的第一个 Container 是如何启动以及如何创建日志记录组件 LogHandler 的。ApplicationMaster 通过调用 RPC 函数ContainerManagementProtocol#startContainers() 开始启动 Container,即 startContainerInternal() 方法,这部分逻辑做了两件事:

  • 发送 ApplicationEventType.INIT_APPLICATION 事件,对应用程序资源的初始化,主要是初始化各类必需的服务组件(如日志记录组件 LogHandler、资源状态追踪组件 LocalResourcesTrackerImpl等),供后续 Container 启动,通常来自 ApplicationMaster 的第一个 Container 完成,这里的 if 逻辑针对一个 NM 节点上运行作业的所有 Containers 只调用一次,后续的 Container 跳过这段 Application 初始化过程。
  • 发送 ApplicationEventType.INIT_CONTAINER 事件,对 Container 进行初始化操作。(这部分事件留在 Container 启动环节介绍)
//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
      ContainerTokenIdentifier containerTokenIdentifier,
      StartContainerRequest request) throws YarnException, IOException {
 
    // 省略Token认证及ContainerLaunchContext上下文初始化
 
    this.readLock.lock();
    try {
      if (!serviceStopped) {
        // Create the application
        Application application =
            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
         
        // 应用程序的初始化,供后续Container使用,这个逻辑只调用一次,通常由来自ApplicationMaster的第一个Container完成
        if (null == context.getApplications().putIfAbsent(applicationID,
          application)) {
          LOG.info("Creating a new application reference for app " + applicationID);
          LogAggregationContext logAggregationContext =
              containerTokenIdentifier.getLogAggregationContext();
          Map<ApplicationAccessType, String> appAcls =
              container.getLaunchContext().getApplicationACLs();
          context.getNMStateStore().storeApplication(applicationID,
              buildAppProto(applicationID, user, credentials, appAcls,
                logAggregationContext));
 
 
          // 1.向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件
          dispatcher.getEventHandler().handle(
            new ApplicationInitEvent(applicationID, appAcls,
              logAggregationContext));
        }
 
        // 2.向 ApplicationImpl 发送 ApplicationEventType.INIT_CONTAINER 事件
        this.context.getNMStateStore().storeContainer(containerId, request);
        dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));
 
        this.context.getContainerTokenSecretManager().startContainerSuccessful(
          containerTokenIdentifier);
        NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
          "ContainerManageImpl", applicationID, containerId);
        // TODO launchedContainer misplaced -> doesn't necessarily mean a container
        // launch. A finished Application will not launch containers.
        metrics.launchedContainer();
        metrics.allocateContainer(containerTokenIdentifier.getResource());
      } else {
        throw new YarnException(
            "Container start failed as the NodeManager is " +
            "in the process of shutting down");
      }
    } finally {
      this.readLock.unlock();
    }
  }

这里主要看看第1件事情,即向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件,事件对应的状态机为 AppInitTransition 状态机。

//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
// Transitions from NEW state
           .addTransition(ApplicationState.NEW, ApplicationState.INITING,
               ApplicationEventType.INIT_APPLICATION, new AppInitTransition())

AppInitTransition 状态机会对日志聚合组件服务进行初始化,关键行动是向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件。

//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  /**
   * Notify services of new application.
   * 
   * In particular, this initializes the {@link LogAggregationService}
   */
  @SuppressWarnings("unchecked")
  static class AppInitTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
      app.applicationACLs = initEvent.getApplicationACLs();
      app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);

      // 初始化日志聚合组件服务
      // Inform the logAggregator
      app.logAggregationContext = initEvent.getLogAggregationContext();
      // 向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppStartedEvent(app.appId, app.user,
              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
              app.applicationACLs, app.logAggregationContext)); 
    }
  }

想要弄清楚 LogHandlerEventType.APPLICATION_STARTED 事件做了什么,就要知道 LogHandlerEventType 类注册的事件处理器是什么以及事件处理器做了什么事情。这里的 register 方法对 LogHandlerEventType 类进行了注册,对应的 logHandler 事件处理器为 LogAggregationService 服务。

//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    // 定义日志处理器
    LogHandler logHandler =
      createLogHandler(conf, this.context, this.deletionService);
    addIfService(logHandler);
    // 注册 LogHandlerEventType 事件,logHandler 为对应的处理器
    dispatcher.register(LogHandlerEventType.class, logHandler);
    
    waitForContainersOnShutdownMillis =
        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
        SHUTDOWN_CLEANUP_SLOP_MS;

    super.serviceInit(conf);
    recover();
  }

具体创建 logHandler 对象的调用,由于集群开启了日志聚合功能(由参数 yarn.log-aggregation-enable 控制),这里返回 LogAggregationService 服务。

//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    protected LogHandler createLogHandler(Configuration conf, Context context,
      DeletionService deletionService) {
    if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
      // 判断是否启用了日志聚合,由于集群开启了日志聚合,这里初始化 LogAggregationService 服务
      return new LogAggregationService(this.dispatcher, context,
          deletionService, dirsHandler);
    } else {
      return new NonAggregatingLogHandler(this.dispatcher, deletionService,
                                          dirsHandler,
                                          context.getNMStateStore());
    }
  }

弄清楚了 LogHandlerEventType 类注册的服务是 LogAggregationService,我们就进入 LogAggregationService 类的 handle() 方法,看看上面的 LogHandlerEventType.APPLICATION_STARTED 事件做了什么事。

//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  @Override
  public void handle(LogHandlerEvent event) {
    switch (event.getType()) {
      // APPLICATION_STARTED 事件处理流程
      case APPLICATION_STARTED:
        LogHandlerAppStartedEvent appStartEvent =
            (LogHandlerAppStartedEvent) event;
        initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
            appStartEvent.getCredentials(),
            appStartEvent.getLogRetentionPolicy(),
            appStartEvent.getApplicationAcls(),
            appStartEvent.getLogAggregationContext());
        break;
      case CONTAINER_FINISHED:
        // 省略
      case APPLICATION_FINISHED:
        //省略
      default:
        ; // Ignore
    }
  }

LogHandlerEventType.APPLICATION_STARTED 事件的关键逻辑在 initApp() 方法的调用。这段逻辑主要做了三件事:

  1. 判断 HDFS 上日志聚合的根目录是否存在,即 /tmp/logs/ 目录(具体为 hdfs://nameservice/tmp/logs),由参数 yarn.nodemanager.remote-app-log-dir 控制。(注意:这里的请求会阻塞读 HDFS)
  2. 创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程。(重点,这里会有请求阻塞写 HDFS,并且通过有限大小的线程池异步创建日志聚合线程去做日志的聚合)
  3. 根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成。
//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  private void initApp(final ApplicationId appId, String user,
      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
      Map<ApplicationAccessType, String> appAcls,
      LogAggregationContext logAggregationContext) {
    ApplicationEvent eventResponse;
    try {
      // 1、 判断 HDFS 上日志聚合的根目录是否存在,即 `/tmp/logs/` 目录(具体为 `hdfs://nameservice/tmp/logs`),由参数 `yarn.nodemanager.remote-app-log-dir` 控制
      verifyAndCreateRemoteLogDir(getConfig());
      // 重点:2、创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程
      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
          logAggregationContext);
      // 构建 ApplicationEvent 事件
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
    } catch (YarnRuntimeException e) {
      LOG.warn("Application failed to init aggregation", e);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
    }
    // 3、根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成
    this.dispatcher.getEventHandler().handle(eventResponse);
  }

第1件事比较简单,主要是是判断 HDFS 聚合日志的根目录是否存在,由于目录一般都存在,这一块主要是读 HDFS 请求。我们主要来看看 initApp() 方法做的第2件事,可以看到第3件事是发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 表示日志聚合服务初始化完成,包括创建作业在 HDFS 的日志聚合目录和启动日志聚合线程。所以基本可以知道第2件事的 initAppAggregator() 是会创建作业日志聚合目录,并启动日志聚合线程,具体的我们来看代码。

这段代码其实主要做了两件事:

  1. 调用 createAppDir() 方法执行 HDFS 写请求为作业创建日志聚合的目录,即 hdfs://nameservice/tmp/logs/<user>/logs/ 目录,这里的写逻辑如果成功则只调用一次,一般是由第一个 Container 创建(即作业的 ApplicationMaster Container),其他 Container 只执行 HDFS 读请求判断该目录是否存在即可。
  2. 通过 threadPool 线程池创建每个作业在 NM 节点的日志聚合线程,异步处理本地日志的上传,该线程池大小由参数 yar、n.nodemanager.logaggregation.threadpool-size-max 控制,默认大小为 100.
//位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  protected void initAppAggregator(final ApplicationId appId, String user,
      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
      Map<ApplicationAccessType, String> appAcls,
      LogAggregationContext logAggregationContext) {

    // Get user's FileSystem credentials
    final UserGroupInformation userUgi =
        UserGroupInformation.createRemoteUser(user);
    if (credentials != null) {
      userUgi.addCredentials(credentials);
    }

    // New application
    final AppLogAggregator appLogAggregator =
        new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
            appAcls, logAggregationContext, this.context,
            getLocalFileContext(getConfig()));
    if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
      throw new YarnRuntimeException("Duplicate initApp for " + appId);
    }
    // wait until check for existing aggregator to create dirs
    YarnRuntimeException appDirException = null;
    try {
      // 创建作业日志聚合目录,即 hdfs://nameservice/tmp/logs/<user>/logs/ 目录
      // Create the app dir
      createAppDir(user, appId, userUgi);
    } catch (Exception e) {
      appLogAggregator.disableLogAggregation();
      if (!(e instanceof YarnRuntimeException)) {
        appDirException = new YarnRuntimeException(e);
      } else {
        appDirException = (YarnRuntimeException)e;
      }
      appLogAggregators.remove(appId);
      closeFileSystems(userUgi);
      throw appDirException;
    }

    // 创建作业的日志聚合线程,并通过线程池启动日志聚合线程,异步上传 NM 节点的日志
    // Schedule the aggregator.
    Runnable aggregatorWrapper = new Runnable() {
      public void run() {
        try {
          appLogAggregator.run();
        } finally {
          appLogAggregators.remove(appId);
          closeFileSystems(userUgi);
        }
      }
    };
    this.threadPool.execute(aggregatorWrapper);
  }

至此,从日志聚合服务组件的创建,到为作业初始化 HDFS 聚合日志目录,到启动日志聚合线程,整个日志聚合的调用逻辑已介绍完毕

相关文章:

  • Vue--》Vue中实现数据代理
  • 【Python学习笔记】第二章循环:while循环,for循环,break和continue语句,死循环,循环的嵌套
  • 计算机网络【IP协议与以太网】
  • iVX低代码平台系列详解 --界面功能(一)
  • 硬件科普系列之硬盘——总线、协议、接口和固态硬盘篇
  • 目标检测算法——遥感影像数据集资源汇总(附下载链接)
  • 第一章 时间复杂度和空间复杂度
  • 【论文阅读】SimGNN:A Neural Network Approach to Fast Graph Similarity Computation
  • Spring源码分析之AOP
  • 【0136】【libpq】startup packet应用机制及构建过程(6)
  • 如今Android 工作难找,面试也难~ 这是在劝退吗?
  • WebShell后门检测与WebShell箱子反杀
  • Java毕业设计选题推荐 SpringBoot毕设项目分享
  • 【Linux kernel/cpufreq】framework ----cpufreq core(1)
  • 一文2000字手把手教你自动化测试平台建设分享
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • Brief introduction of how to 'Call, Apply and Bind'
  • Bytom交易说明(账户管理模式)
  • Docker下部署自己的LNMP工作环境
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • ES6系统学习----从Apollo Client看解构赋值
  • exports和module.exports
  • gitlab-ci配置详解(一)
  • golang中接口赋值与方法集
  • HashMap ConcurrentHashMap
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • mongo索引构建
  • python大佬养成计划----difflib模块
  • Python语法速览与机器学习开发环境搭建
  • rabbitmq延迟消息示例
  • swift基础之_对象 实例方法 对象方法。
  • Transformer-XL: Unleashing the Potential of Attention Models
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 从地狱到天堂,Node 回调向 async/await 转变
  • 解决iview多表头动态更改列元素发生的错误
  • 聚簇索引和非聚簇索引
  • 前端 CSS : 5# 纯 CSS 实现24小时超市
  • 前端临床手札——文件上传
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 微信小程序填坑清单
  • 一、python与pycharm的安装
  • 怎么将电脑中的声音录制成WAV格式
  • $(function(){})与(function($){....})(jQuery)的区别
  • %@ page import=%的用法
  • (C11) 泛型表达式
  • (env: Windows,mp,1.06.2308310; lib: 3.2.4) uniapp微信小程序
  • (js)循环条件满足时终止循环
  • (二)springcloud实战之config配置中心
  • (附源码)计算机毕业设计ssm本地美食推荐平台
  • (企业 / 公司项目)前端使用pingyin-pro将汉字转成拼音
  • (三维重建学习)已有位姿放入colmap和3D Gaussian Splatting训练
  • .describe() python_Python-Win32com-Excel
  • .net core webapi Startup 注入ConfigurePrimaryHttpMessageHandler