当前位置: 首页 > news >正文

Spark-SparkSubmit详细过程

一、概览

《Spark-环境启动》中讲了Spark环境的启动,以及Master和多个Worker之间时基于Endpoint之间的Netty通信,也被称为Spark的RpcEnv。在此基础上我们来看下spark-submit是如何将我们写的Spark程序调起的

二、启动脚本示例:

spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--driver-memory 100g \
--executor-memory 20g \
--num-executors 120 \
--executor-cores 3 \
--conf spark.shuffle.io.maxRetries=2 \
--conf spark.xx.xx.xx=xx \
--class com.xx.xxx  \
--files "/data/xxxx" \
/xxx/project/xxx/spark-1.1-SNAPSHOT.jar \
p1 p2 p3 …… pn \

三、spark-submit.sh

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

四、SparkSubmit

//启动Spark应用程序的主网关。
//此程序处理设置具有相关Spark依赖关系的类路径,并在Spark支持的不同集群管理器和部署模式上提供一个层。
private[spark] class SparkSubmit extends Logging {//Yarn模式下的下一个执行的主类private[deploy] val YARN_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.yarn.YarnClusterApplication"//STANDALONE 下的rest模式 或 Mesos 模式的下一个执行的主类private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()//STANDALONE 下的传统模式的下一个执行的主类  我们主要分析这一个private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()//K8s下的下一个执行的主类private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"override def main(args: Array[String]): Unit = {val submit = new SparkSubmit() {...}submit.doSubmit(args)}def doSubmit(args: Array[String]): Unit = {//解析命令行参数val appArgs = parseArguments(args)appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)case SparkSubmitAction.PRINT_VERSION => printVersion()}}private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {def doRunMain(): Unit = {runMain(args, uninitLog)}doRunMain()}//使用submit参数运行子类的main方法。//这分为两个步骤。首先,我们通过设置适当的类路径、系统属性和应用程序参数来准备启动环境,以便根据集群管理器和部署模式运行子主类。//请注意,如果我们运行的是集群部署模式或python应用程序,那么这个主类将不是用户提供的。  (需要跳几次才真正运行自己的main方法)private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)mainClass = Utils.classForName(childMainClass)val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]} else {new JavaMainApplication(mainClass)}app.start(childArgs.toArray, sparkConf)}//得到一个 4-tuple://  1、子进程的参数//  2、进入子类的路径列表//  3、系统环境映射//  4、子进程的主类private[deploy] def prepareSubmitEnvironment(//设置集群管理val clusterManager: Int = args.master match {case "yarn" => YARN  //1case m if m.startsWith("spark") => STANDALONE  // 2 case m if m.startsWith("mesos") => MESOS    // 4case m if m.startsWith("k8s") => KUBERNETES  // 16case m if m.startsWith("local") => LOCAL    // 8case _ =>error("Master must either be yarn or start with spark, mesos, k8s, or local")-1}    //设置部署模式;默认为客户端模式//在客户端模式下,需要下载远程文件//在YARN中运行时,对于某些具有scheme的远程资源://  1.Hadoop FileSystem不支持它们。//  2.我们使用“spark.yarn.dist.forceDownloadSchemes”明确绕过Hadoop文件系统。//在添加到YARN的分布式缓存之前,我们将把它们下载到本地磁盘。//对于yarn客户端模式,由于我们已经用上面的代码下载了它们,所以我们只需要找出本地路径并替换远程路径。//如果我们正在运行一个python应用程序,请将main类设置为我们特定的python运行器//非PySpark应用程序可能需要Python依赖关系。//在R应用程序的YARN模式下,将SparkR包存档和包含所有构建的R库的R包存档添加到存档中,以便它们可以与作业一起分发......省略......//在客户端模式下,直接启动应用程序主类//此外,将主应用程序jar和任何添加的jar(如果有的话)添加到类路径中if (deployMode == CLIENT) {childMainClass = args.mainClassif (localPrimaryResource != null && isUserJar(localPrimaryResource)) {childClasspath += localPrimaryResource}if (localJars != null) { childClasspath ++= localJars.split(",") }}//在独立集群模式下,使用REST客户端提交应用程序(Spark 1.3+)。所有Spark参数都应通过系统属性传递给客户端。if (args.isStandaloneCluster) {if (args.useRest) {childMainClass = REST_CLUSTER_SUBMIT_CLASS} else {//在传统的独立集群模式下,使用客户端作为用户类的包装器childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS}}//在yarn模式下,使用yarn.Client作为用户类的包装器if (isYarnCluster) {childMainClass = YARN_CLUSTER_SUBMIT_CLASS}}}

五、ClientApp

private[spark] class ClientApp extends SparkApplication {override def start(args: Array[String], conf: SparkConf): Unit = {val driverArgs = new ClientArguments(args)//rpc通信的ask消息超时时间默认为 10sif (!conf.contains(RPC_ASK_TIMEOUT)) {conf.set(RPC_ASK_TIMEOUT, "10s")}Logger.getRootLogger.setLevel(driverArgs.logLevel)//构建RpcEnv通信环境val rpcEnv =RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))//获取masterEndpoints val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))//设置ClientEndpoint 默认会调它的 onStart()rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))rpcEnv.awaitTermination()}
}

六、ClientEndpoint

//将消息中继到驱动程序的代理。
//如果提交失败,我们目前不支持重试。在HA模式下,客户端将向所有主机提交请求,看看哪个主机可以处理它。
private class ClientEndpoint(override val rpcEnv: RpcEnv,driverArgs: ClientArguments,masterEndpoints: Seq[RpcEndpointRef],conf: SparkConf)extends ThreadSafeRpcEndpoint with Logging {override def onStart(): Unit = {driverArgs.cmd match {case "launch" =>//Diver的包装类val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"val command = new Command(mainClass,Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,sys.env, classPathEntries, libraryPathEntries, javaOpts)val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,config.SPARK_DRIVER_PREFIX)//构建DriverDescription 用于启动Driver//里面包含jar的下载路径、所需要的内存和核数、启动命令等等val driverDescription = new DriverDescription(driverArgs.jarUrl,driverArgs.memory,driverArgs.cores,driverArgs.supervise,command,driverResourceReqs)//向Master发送 RequestSubmitDriver 消息asyncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription))case "kill" =>val driverId = driverArgs.driverIdsubmittedDriverID = driverIdasyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))}
}

七、Master处理Client发出的RequestSubmitDriver 

private[deploy] class Master(override val rpcEnv: RpcEnv,address: RpcAddress,webUiPort: Int,val securityMgr: SecurityManager,val conf: SparkConf)extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case RequestSubmitDriver(description) =>//如果Master不是ALIVE 直接返回 Client 失败的响应if (state != RecoveryState.ALIVE) {val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +"Can only accept driver submissions in ALIVE state."context.reply(SubmitDriverResponse(self, false, None, msg))} else {logInfo("Driver submitted " + description.command.mainClass)//创建一个Driverval driver = createDriver(description)//把新创建的Driver添加到Master对Derviers的维护中去persistenceEngine.addDriver(driver)waitingDrivers += driverdrivers.add(driver)//在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。schedule()//回复客户端消息context.reply(SubmitDriverResponse(self, true, Some(driver.id),s"Driver successfully submitted as ${driver.id}"))}//创建driverprivate def createDriver(desc: DriverDescription): DriverInfo = {val now = System.currentTimeMillis()val date = new Date(now)new DriverInfo(now, newDriverId(date), desc, date)}private def schedule(): Unit = {if (state != RecoveryState.ALIVE) {return}//将状态为ALIVE的Workers打散,保证多个Driver分散启动val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0迭代所有需要启动的driverfor (driver <- waitingDrivers.toList) {var launched = falsevar isClusterIdle = truevar numWorkersVisited = 0while (numWorkersVisited < numWorkersAlive && !launched) {val worker = shuffledAliveWorkers(curPos)isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmptynumWorkersVisited += 1校验该worker上剩余的内存和内核是否满足启动driver的要求if (canLaunchDriver(worker, driver.desc)) {val allocated = worker.acquireResources(driver.desc.resourceReqs)driver.withResources(allocated)//启动DriverlaunchDriver(worker, driver)waitingDrivers -= driverlaunched = true}curPos = (curPos + 1) % numWorkersAlive}}startExecutorsOnWorkers()}private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {logInfo("Launching driver " + driver.id + " on worker " + worker.id)worker.addDriver(driver)driver.worker = Some(worker)//向worker ENdpoint 发送 LaunchDriver 消息worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))//将driver的状态设置称RUNNINGdriver.state = DriverState.RUNNING}}

DriverInfo的信息如下:

private[deploy] class DriverInfo(val startTime: Long,val id: String,val desc: DriverDescription,val submitDate: Date)extends Serializable {//driver的初始化状态就是 SUBMITTED@transient var state: DriverState.Value = DriverState.SUBMITTED/* 如果我们在启动Driver时失败,异常将存储在此处 */@transient var exception: Option[Exception] = None/* 最近分配给 driver 的 worker */@transient var worker: Option[WorkerInfo] = None// 分配给此驱动程序的资源(例如:gpu/fpga)从资源名称映射到资源信息//CPU、GPU都属于冯·诺依曼结构,指令译码执行、共享内存。//FPGA是一种硬件可重构的体系结构中文名是现场可编程门阵列//FPGA 之所以比 CPU 甚至 GPU 能效高,本质上是无指令、无需共享内存的体系结构带来的福利//GPU不支持超流水技术 只能数据并行//FPGA支持超流水技术,可以支持指令并行private var _resources: Map[String, ResourceInformation] = Map.empty} 

八、Worker处理Master调度发起的LaunchDriver请求

private[deploy] class Worker(override val rpcEnv: RpcEnv,webUiPort: Int,cores: Int,memory: Int,masterRpcAddresses: Array[RpcAddress],endpointName: String,workDirPath: String = null,val conf: SparkConf,val securityMgr: SecurityManager,resourceFileOpt: Option[String] = None,externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)extends ThreadSafeRpcEndpoint with Logging {override def receive: PartialFunction[Any, Unit] = synchronized {case LaunchDriver(driverId, driverDesc, resources_) =>logInfo(s"Asked to launch driver $driverId")//管理一个driver的执行,包括在失败时自动重新启动driver。这目前仅在独立集群部署模式下使用。val driver = new DriverRunner(conf,driverId,workDir,sparkHome,driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),self,workerUri,workerWebUiUrl,securityMgr,resources_)//该worker中对不同driver的维护drivers(driverId) = driver//启动driverdriver.start()//重新计算该worker可用的资源coresUsed += driverDesc.coresmemoryUsed += driverDesc.memaddResourcesUsed(resources_)}
}

九、DriverWrapper启动

object DriverWrapper extends Logging {def main(args: Array[String]): Unit = {args.toList match {case workerUrl :: userJar :: mainClass :: extraArgs =>val conf = new SparkConf()val host: String = Utils.localHostName()val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toIntval rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))logInfo(s"Driver address: ${rpcEnv.address}")//创建一个workerWatcher Endpoint 意为看着这个worker进程,如果丢了自己也就over了//连接到worker进程,如果连接中断,则终止JVM。在工作进程及其关联的子进程之间提供命运共享。 它没有onStart()rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))//............//我们编写的Spark程序在这里就会通过反射被调起执行val clazz = Utils.classForName(mainClass)val mainMethod = clazz.getMethod("main", classOf[Array[String]])mainMethod.invoke(null, extraArgs.toArray[String])rpcEnv.shutdown()}}

十、总结

1、编写Spark程序

2、编写启动脚本使用spark-submit命令提交任务

3、调用org.apache.spark.deploy.SparkSubmit中main方法

4、判断提交参数中的模式,根据不同的模式设置不同的mainClass

如果采用Standalone模式那么主类为:ClientApp

5、执行ClientApp中的start方法

6、构建RpcEnv环境并设置ClientEndpoint

7、构建DriverDescription并向Master发起启动Driver的请求

8、Master会处理所有启动Driver的请求并随机挑一个资源充足的Worker用于启动该Driver

9、Worker上启动一个DriverWrapper进程

10、DriverWrapper中的main方法会调用我们自己写的Spark程序中的main方法

下图是整体的方法调用图,下载放大后就会很清晰哟

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 数据驱动的社交网络:分析Facebook的算法与用户体验
  • 归并排序、计数排序及排序大总结
  • 论文翻译:Benchmarking Large Language Models in Retrieval-Augmented Generation
  • Python中常见数据结构
  • Python酷库之旅-第三方库Pandas(093)
  • 【iOS】——响应者链和事件传递链
  • Redis7基础篇(七)
  • 【题解】【结构体排序】—— [NOIP2009 普及组] 分数线划定
  • JavaScript 手写仿freeze
  • HTML详解
  • Java面试题———MySql篇②
  • 【C++ 面试 - 面向对象】每日 3 题(七)
  • Linux驱动学习之点灯(六,利用平台设备总线)
  • Element-UI Table实现列表筛选数据及列表嵌套选择框
  • 从行为面试问题(behavioral questions)看中美程序员差异。
  • [笔记] php常见简单功能及函数
  • 【剑指offer】让抽象问题具体化
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • Linux后台研发超实用命令总结
  • MaxCompute访问TableStore(OTS) 数据
  • vue的全局变量和全局拦截请求器
  • vue总结
  • 笨办法学C 练习34:动态数组
  • 搭建gitbook 和 访问权限认证
  • 得到一个数组中任意X个元素的所有组合 即C(n,m)
  • 好的网址,关于.net 4.0 ,vs 2010
  • 浏览器缓存机制分析
  • -- 数据结构 顺序表 --Java
  • 数据可视化之 Sankey 桑基图的实现
  • 一起来学SpringBoot | 第三篇:SpringBoot日志配置
  • 在weex里面使用chart图表
  • 智能合约开发环境搭建及Hello World合约
  • 曜石科技宣布获得千万级天使轮投资,全方面布局电竞产业链 ...
  • # Apache SeaTunnel 究竟是什么?
  • ## 基础知识
  • #define
  • #进阶:轻量级ORM框架Dapper的使用教程与原理详解
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • (3)选择元素——(14)接触DOM元素(Accessing DOM elements)
  • (31)对象的克隆
  • (C++二叉树05) 合并二叉树 二叉搜索树中的搜索 验证二叉搜索树
  • (ZT)薛涌:谈贫说富
  • (阿里巴巴 dubbo,有数据库,可执行 )dubbo zookeeper spring demo
  • (多级缓存)缓存同步
  • (二)延时任务篇——通过redis的key监听,实现延迟任务实战
  • (五)关系数据库标准语言SQL
  • (转)LINQ之路
  • (轉貼)《OOD启思录》:61条面向对象设计的经验原则 (OO)
  • *(长期更新)软考网络工程师学习笔记——Section 22 无线局域网
  • ..回顾17,展望18
  • .Mobi域名介绍
  • .Net MVC4 上传大文件,并保存表单
  • .NetCore 如何动态路由
  • .net解析传过来的xml_DOM4J解析XML文件
  • .NET开发人员必知的八个网站