当前位置: 首页 > news >正文

Spark源码分析 -- TaskScheduler

Spark在设计上将DAGScheduler和TaskScheduler完全解耦合, 所以在资源管理和task调度上可以有更多的方案

现在支持, LocalSheduler, ClusterScheduler, MesosScheduler, YarnClusterScheduler

先分析ClusterScheduler, 即standalone的Spark集群上, 因为比较单纯不涉及其他的系统, 看看Spark的任务是如何被执行的

 

  private var taskScheduler: TaskScheduler = {
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new ClusterScheduler(this)  // 创建ClusterScheduler
        val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) // 创建SparkDeploySchedulerBackend
        scheduler.initialize(backend)
        scheduler
  } 


TaskScheduler接口, 注释写的非常清楚

/**
 * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
 * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
 * and are responsible for sending the tasks to the cluster, running them, retrying if there
 * are failures, and mitigating stragglers. They return events to the DAGScheduler through
 * the TaskSchedulerListener interface.
 */
private[spark] trait TaskScheduler {
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def start(): Unit // 启动
  def postStartHook() { }
  def stop(): Unit
  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit // 核心, 提交taskset的接口
  // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
  def setListener(listener: TaskSchedulerListener): Unit // TaskScheduler会使用这个listener来汇报当前task的运行状况,会注册DAGScheduler
  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int
}

 

ClusterScheduler

对于集群的TaskScheduler实现, 相对于LocalScheduler
主要就是创建和管理schedulable tree, 参考Spark源码分析 – SchedulableBuilder
当然最终和cluster的executor通信还是需要依赖SparkDeploySchedulerBackend, 参考Spark源码分析 – SchedulerBackend

 

对于submitTasks,
首先将tasksetmanager放入schedulable tree等待schedule (delay schedule, 不一定会马上被调度到)
然后给SchedulerBackend发送reviveOffers event, 请求分配资源并launch tasks (launch的并一定是刚提交的tasks)
SchedulerBackend会向cluster申请workOffers(对于standalonebackend, 这步省略了), 然后再调用ClusterScheduler.resourceOffers来根据可用的workOffers分配tasks
最终给executors发送LaunchTask, 启动tasks

 

resourceOffers是核心函数, 当得到可用的workerOffer后, 用于从schedulable tree中schedule合适的被执行的tasks
resourceOffers的逻辑有点小复杂
1. 首先依次遍历sortedTaskSets, 并对于每个Taskset, 遍历TaskLocality
2. 越local越优先, 找不到(launchedTask为false)才会到下个locality级别
3. 在多次遍历offer list, 因为一次taskSet.resourceOffer只会占用一个core, 而不是一次用光所有的core, 这样有助于一个taskset中的task比较均匀的分布在workers上
4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时, 才跳到下个locality级别

 

private[spark] class ClusterScheduler(val sc: SparkContext) extends TaskScheduler with Logging
{
  var listener: TaskSchedulerListener = null
  var backend: SchedulerBackend = null
  val mapOutputTracker = SparkEnv.get.mapOutputTracker
  var schedulableBuilder: SchedulableBuilder = null
  var rootPool: Pool = null
  // default scheduler is FIFO
  val schedulingMode: SchedulingMode = SchedulingMode.withName(
    System.getProperty("spark.scheduler.mode", "FIFO"))
  def initialize(context: SchedulerBackend) {
    backend = context    // 初始化SchedulerBackend 
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0) // 创建Schedulable tree的root pool
    schedulableBuilder = { // 用schedulableBuilder初始化Schedulable tree
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool)
      }
    }
    schedulableBuilder.buildPools()
  }

  override def start() {
    backend.start() // 启动SchedulerBackend 
  }

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = new ClusterTaskSetManager(this, taskSet)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 将TaskSetManager加到Schedulable tree等待被调度执行
      taskSetTaskIds(taskSet.id) = new util.HashSet[Long]()
      backend.reviveOffers() // 调用SchedulerBackend的reviveOffers, 其实就是往DriverActor发送reviveOffers事件
  }
 
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  // 根据当前可用的worker offers, 分配tasks
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    SparkEnv.set(sc.env)

    // Build a list of tasks to assign to each worker
    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 每个core可以分配一个task,所以对每个offer生成length为cores数目的ArrayBuffer
    val availableCpus = offers.map(o => o.cores).toArray  // 每个work可用的core数目的array
    val sortedTaskSets = rootPool.getSortedTaskSetQueue() // 得到根据schedule算法排序后的TaskSetManager列表
    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { // 嵌套, 遍历sortedTaskSets, 并对每个taskSet遍历所有TaskLocality
      do {
        launchedTask = false
        for (i <- 0 until offers.size) { // 遍历每个offer, 试图在当前的taskset和当前的locality上找到合适的task
          val execId = offers(i).executorId
          val host = offers(i).host
          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) { // 每次只会返回最多一个task
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetId(tid) = taskSet.taskSet.id
            taskSetTaskIds(taskSet.taskSet.id) += tid
            taskIdToExecutorId(tid) = execId
            activeExecutorIds += execId
            executorsByHost(host) += execId
            availableCpus(i) –= 1 // 分配一个task, 所以availableCpus - 1
            launchedTask = true
          }
        }
      } while (launchedTask) // 找到,就继续在这个locality上找task, 否则放宽到下个locality,或下个taskset
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
}

转载于:https://www.cnblogs.com/fxjwind/p/3504159.html

相关文章:

  • BZOJ 1974 [Sdoi2010] auction 代码拍卖会(数位dp)
  • Java中单例设计模式之最佳实践举例
  • Redkale 入门教程 01 -- Hello Word!
  • iOS sqlite 使用事务操作数据库
  • 【队列】【P2827】【NOIP2016D2T3】蚯蚓
  • java中Xml、json之间的相互转换
  • 新概念书店无非内容电商线下变体,西西弗终难逃被资本吞并命运?
  • android应用activity中调出输入法后界面调整问题的解决
  • watch深度监测
  • PHP-学习大规模高并发Web系统架构及开发推荐书籍
  • [caffe(二)]Python加载训练caffe模型并进行测试1
  • 【转】ini载入保存类,操作INI配置文件方便的很
  • PostgreSQL 连接的问题
  • 珍爱之礼 美妙感受
  • Python Flask-Mail环境变量配置
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • canvas实际项目操作,包含:线条,圆形,扇形,图片绘制,图片圆角遮罩,矩形,弧形文字...
  • C语言笔记(第一章:C语言编程)
  • flutter的key在widget list的作用以及必要性
  • Nodejs和JavaWeb协助开发
  • Python 反序列化安全问题(二)
  • React-生命周期杂记
  • Ruby 2.x 源代码分析:扩展 概述
  • socket.io+express实现聊天室的思考(三)
  • ucore操作系统实验笔记 - 重新理解中断
  • 初探 Vue 生命周期和钩子函数
  • 面试题:给你个id,去拿到name,多叉树遍历
  • 扑朔迷离的属性和特性【彻底弄清】
  • 使用agvtool更改app version/build
  • 腾讯优测优分享 | Android碎片化问题小结——关于闪光灯的那些事儿
  • 找一份好的前端工作,起点很重要
  • AI又要和人类“对打”,Deepmind宣布《星战Ⅱ》即将开始 ...
  • ​直流电和交流电有什么区别为什么这个时候又要变成直流电呢?交流转换到直流(整流器)直流变交流(逆变器)​
  • # C++之functional库用法整理
  • $ git push -u origin master 推送到远程库出错
  • ${ }的特别功能
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (第27天)Oracle 数据泵转换分区表
  • (翻译)Entity Framework技巧系列之七 - Tip 26 – 28
  • (附源码)springboot建达集团公司平台 毕业设计 141538
  • (三) prometheus + grafana + alertmanager 配置Redis监控
  • (三)elasticsearch 源码之启动流程分析
  • (四)库存超卖案例实战——优化redis分布式锁
  • (转载)CentOS查看系统信息|CentOS查看命令
  • ***详解账号泄露:全球约1亿用户已泄露
  • .describe() python_Python-Win32com-Excel
  • .h头文件 .lib动态链接库文件 .dll 动态链接库
  • .md即markdown文件的基本常用编写语法
  • .NET C# 使用 SetWindowsHookEx 监听鼠标或键盘消息以及此方法的坑
  • .NET/C# 使用反射注册事件
  • .NET/C# 项目如何优雅地设置条件编译符号?
  • .net实现客户区延伸至至非客户区
  • .net之微信企业号开发(一) 所使用的环境与工具以及准备工作
  • .sdf和.msp文件读取