当前位置: 首页 > news >正文

Spark 中 任务集 TaskSet 详解

        在 Apache Spark 中,TaskSet 是任务调度系统的核心对象之一。它代表一组可以并行执行的任务,并通过 TaskScheduler 负责将这些任务分配到不同的执行器(Executor)上执行。每个 TaskSet 通常对应于一个 Stage 的所有任务。

为了全面了解 TaskSet 对象及其工作原理,我们需要从 Spark 的底层架构、任务调度流程以及源代码的角度进行深入探讨。本文将详细解析 TaskSet 对象的构造、调度机制、与其他组件的交互关系,并解释其在 Spark 任务执行中的关键作用。

1. TaskSet 的核心概念

        TaskSet 是 Spark 调度系统中的一个基本单位,它封装了一组可以并行执行的任务(Task),并由 TaskScheduler 调度执行。这些任务一般对应于某个 Stage 的所有分区数据的计算。

        每当 Spark 将一个 Stage 提交执行时,会为该 Stage 生成一个 TaskSet 对象。TaskSet 里的每个 Task 对应于一个分区的计算。因此,如果某个 RDD 有 100 个分区,那么对应的 TaskSet 将包含 100 个 Task,每个 Task 负责处理一个分区的数据。

2. TaskSet 的类定义

        在 Spark 源代码中,TaskSet 类主要定义在 TaskSet.scala 文件中。其主要作用是将一组任务(Task)封装成一个集合,并向调度器提供这些任务以进行调度。以下是 TaskSet 类的简化结构:

// TaskSet.scala
private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val stageAttemptId: Int,val priority: Int,val properties: Properties)

TaskSet 的构造函数参数:
  • tasks: 任务数组,包含 Task 对象,每个 Task 对应于一个分区的计算。
  • stageId: 当前任务集所对应的 Stage 的 ID。
  • stageAttemptId: 当前 Stage 的尝试次数(用于处理失败重试)。
  • priority: 任务的优先级,优先级较高的任务将优先调度执行。
  • properties: 任务集的配置信息。

3. TaskSetManager:任务集的管理器

        TaskSet 并不是直接调度的,而是通过 TaskSetManager 进行管理和调度。TaskSetManager 是 TaskSet 的管理类,负责对 TaskSet 中的任务进行调度、重试、监控和容错处理。

        每次当 TaskSet 提交给 TaskScheduler 时,都会创建一个对应的 TaskSetManager 实例。TaskSetManager 负责在执行期间跟踪 Task 的状态、处理任务失败并执行重试逻辑。每个 TaskSet 可能会经历多次尝试,因此 TaskSetManager 也用于处理任务重试(stage attempt)的相关逻辑。

// TaskSetManager.scala
private[scheduler] class TaskSetManager(sched: TaskSchedulerImpl,val taskSet: TaskSet,val maxTaskFailures: Int) extends Schedulable {// 当前 TaskSet 的任务状态private[scheduler] val taskInfos = Array.fill[TaskInfo](numTasks)(null)def resourceOffer(execId: String, host: String, maxLocality: TaskLocality): Option[TaskDescription] = {// 寻找一个适合执行的任务val task = findTaskToRun(execId, host, maxLocality)if (task.isDefined) {Some(new TaskDescription(execId, task.get))} else {None}}
}

TaskSetManager 的主要职责:
  • 任务调度:当集群中的某个节点可用时,TaskSetManager 负责为该节点选择合适的任务,并将任务交给执行器执行。
  • 任务状态管理TaskSetManager 会跟踪每个任务的状态,包括运行状态、完成状态、失败次数等。
  • 本地性调度TaskSetManager 实现了任务的本地性调度(data locality),尽可能将任务调度到数据所在的节点上执行,以提高性能。
  • 任务重试:如果某个任务执行失败,TaskSetManager 会负责对该任务进行重试,直到超过最大失败次数为止。

4. TaskScheduler 和 TaskSet 的交互

        TaskSet 是通过 TaskScheduler 调度的。在 Spark 的调度架构中,TaskScheduler 负责接收来自 DAGScheduler 的 TaskSet,并将其调度到集群中的执行器上执行。TaskScheduler 会根据集群资源的状态,选择合适的任务进行调度。

TaskScheduler 的主要职责是:

  1. 提交任务集:当 DAGScheduler 将 Stage 分解为 TaskSet 后,会将该 TaskSet 提交给 TaskScheduler
  2. 调度任务TaskScheduler 会调用 TaskSetManager 的方法,根据集群资源的可用情况调度任务到执行器。
  3. 监控任务执行TaskScheduler 负责监控每个任务的执行情况,并处理失败的任务。
// TaskSchedulerImpl.scala
class TaskSchedulerImpl(sc: SparkContext,...) extends TaskScheduler {override def submitTasks(taskSet: TaskSet): Unit = {val manager = new TaskSetManager(this, taskSet, maxTaskFailures)taskSetsByStageIdAndAttempt(taskSet.stageId)(taskSet.stageAttemptId) = managerbackend.reviveOffers()}def resourceOffer(execId: String, host: String, maxLocality: TaskLocality): Option[TaskDescription] = {taskSetsByStageIdAndAttempt.foreach { case (_, managers) =>managers.foreach { manager =>val taskOption = manager.resourceOffer(execId, host, maxLocality)if (taskOption.isDefined) {return taskOption}}}None}
}

任务提交流程:
  • DAGScheduler 生成 TaskSet,并通过 submitTasks 提交给 TaskSchedulerImpl
  • TaskSchedulerImpl 创建对应的 TaskSetManager,并启动任务调度过程。
  • 每当集群资源(执行器)可用时,TaskSchedulerImpl 会调用 TaskSetManager 的 resourceOffer 方法,将任务分配到可用的执行器上。

5. 本地性调度 (Task Locality)

        Spark 在调度任务时,会尽量遵循数据本地性(Data Locality)原则。所谓数据本地性,就是将任务调度到存储其输入数据的节点上执行,以减少数据传输的开销。

TaskSetManager 的 resourceOffer 方法会根据任务的本地性需求,选择合适的任务调度到相应的执行器上。Spark 定义了几种不同的任务本地性级别(TaskLocality):

  • PROCESS_LOCAL:任务可以在同一进程中运行,最佳本地性。
  • NODE_LOCAL:任务可以在同一节点上运行,但可能需要在本地磁盘之间传输数据。
  • RACK_LOCAL:任务可以在同一机架中的节点上运行。
  • ANY:任务可以在集群中的任何节点上运行,这是最低的本地性级别。

        当 TaskScheduler 接收到资源调度请求时,它会根据这些本地性级别尝试分配任务。如果没有合适的任务,系统会在经过一段时间后放宽本地性要求,允许任务在其他节点上执行。

// TaskLocality.scala
object TaskLocality extends Enumeration {val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

        在 TaskSetManager 的 resourceOffer 中,会根据可用的执行器位置和任务的本地性要求,决定是否分配该任务到该执行器上。

6. 任务失败与重试机制

        任务失败在分布式系统中是常见的情况。Spark 通过 TaskSetManager 实现了任务的重试机制。当某个任务执行失败时,TaskSetManager 会负责对该任务进行重试,直到任务成功或超过最大失败次数为止。

  • 最大失败次数:每个 TaskSet 都有一个最大失败次数(maxTaskFailures),默认为 4 次。如果某个任务失败次数超过这个阈值,则整个 Stage 失败。
  • 失败重试:如果任务失败,TaskSetManager 会选择其他节点进行重试。失败可能是由于节点故障、内存不足等原因。
// TaskSetManager.scala
private[scheduler] class TaskSetManager(sched: TaskSchedulerImpl,val taskSet: TaskSet,val maxTaskFailures: Int) {def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason): Unit = {taskInfos(tid).failedAttempts += 1if (taskInfos(tid).failedAttempts >= maxTaskFailures) {abort(s"Task $tid failed ${taskInfos(tid).failedAttempts} times")} else {// 重试任务addPendingTask(tid)}}
}

        在重试时,TaskSetManager 会尝试将任务分配到不同的节点,以避免再次失败。

7. TaskSet 的执行流程总结

整个 TaskSet 的执行流程可以总结如下:

  1. 用户提交的 Action 操作触发 Spark 的 DAGScheduler,并将计算分解为多个 Stage
  2. 每个 Stage 会生成一个 TaskSet 对象,包含所有任务(分区)并提交给 TaskScheduler
  3. TaskScheduler 创建 TaskSetManager 并负责调度 TaskSet 中的任务。
  4. TaskSetManager 根据可用的资源和数据本地性,将任务分配给合适的执行器。
  5. 执行器执行任务并返回结果。如果任务失败,TaskSetManager 会处理重试逻辑。
  6. 当所有任务成功执行后,TaskSet 执行完成,Stage 也标记为完成。

8. 结论

        TaskSet 是 Apache Spark 中任务调度的重要组成部分,代表了可以并行执行的一组任务。TaskSetManager 负责管理这些任务的执行、调度和重试机制。TaskSet 与 TaskScheduler 紧密协作,通过数据本地性优化、任务失败重试等机制确保高效的任务调度和执行。

        从底层原理和源代码的角度来看,TaskSet 是 Spark 提高并行计算效率、容错性和任务调度性能的重要机制之一。

相关文章:

  • 青动CRM V3.2.1
  • 第十四届蓝桥杯真题Python c组F.棋盘(持续更新)
  • Linux 配置与管理 SWAP(虚拟内存)
  • 9月28日
  • 【Fastapi】参数获取,json和query
  • STM32 RTC实时时钟学习总结
  • React Native 在 build iOS 的时候如果出现关于 `metro` 的错误
  • 【音视频】ffmpeg其他常用过滤器filter实现(6-4)
  • uniapp踩坑 tabbar页面数据刷新了但视图没有更新
  • 基于python的爱心代码游戏实现 面试最常见问题(源码+内容介绍)
  • Git Stash: 管理临时更改的利器
  • 使用jdframe进行数据处理
  • 【C++笔记】初始模版和STL简介
  • 2024年云南省职业院校技能大赛-云计算应用
  • 隧道多并发IP代理:解锁网络新体验
  • [译]如何构建服务器端web组件,为何要构建?
  • classpath对获取配置文件的影响
  • JS笔记四:作用域、变量(函数)提升
  • mongo索引构建
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • Sequelize 中文文档 v4 - Getting started - 入门
  • 关于Java中分层中遇到的一些问题
  • 前嗅ForeSpider采集配置界面介绍
  • 通过几道题目学习二叉搜索树
  • scrapy中间件源码分析及常用中间件大全
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • ​草莓熊python turtle绘图代码(玫瑰花版)附源代码
  • (09)Hive——CTE 公共表达式
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (zt)基于Facebook和Flash平台的应用架构解析
  • (翻译)Quartz官方教程——第一课:Quartz入门
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (每日持续更新)jdk api之StringBufferInputStream基础、应用、实战
  • (四)activit5.23.0修复跟踪高亮显示BUG
  • (贪心 + 双指针) LeetCode 455. 分发饼干
  • (原)Matlab的svmtrain和svmclassify
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .net 逐行读取大文本文件_如何使用 Java 灵活读取 Excel 内容 ?
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地定义和使用弱事件
  • .NET/C#⾯试题汇总系列:⾯向对象
  • .NET程序员迈向卓越的必由之路
  • .NET建议使用的大小写命名原则
  • [ C++ ] template 模板进阶 (特化,分离编译)
  • [100天算法】-不同路径 III(day 73)
  • [2010-8-30]
  • [Flink]三、Flink1.13
  • [go] 策略模式
  • [IE编程] WebBrowser控件的多页面浏览(Tabbed Browsing)开发接口
  • [Java]深入剖析常见排序
  • [LaTex]arXiv投稿攻略——jpg/png转pdf
  • [mmucache]-ARMV8-aarch64的虚拟内存(mmutlbcache)介绍-概念扫盲
  • [mvc] 简单的forms认证
  • [NISACTF 2022]easyssrf
  • [POJ2728] Desert King
  • [python 刷题] 2866 Beautiful Towers II