当前位置: 首页 > news >正文

Spark 2.4.0源码分析之WordCount 默认 shuffle 并行度为 200(九) ...

Spark 2.4.0源码分析之WordCount 默认 shuffle 并行度为 200(九)

配置

  • The default number of partitions to use when shuffling data for joins or aggregations.
spark.sql.shuffle.partitions=200

QueryExecution.executedPlan

  • 调用 QueryExecution.prepareForExecution
 // Physical plan obtained by running `prepareForExecution` over `sparkPlan`.
  // It is meant for execution only and should not be used to initialize any other SparkPlan.
  lazy val executedPlan: SparkPlan = {
    val planned = sparkPlan
    prepareForExecution(planned)
  }

QueryExecution.prepareForExecution

  • 调用函数 QueryExecution.preparations
  /**
   * Turns a planned [[SparkPlan]] into an executable one by threading it through every rule in
   * `preparations`, in order, with each rule receiving the previous rule's output. These rules
   * are where shuffle operators and internal row format conversions are inserted as needed.
   *
   * @param plan the physical plan produced by the planner
   * @return the plan after all preparation rules have been applied
   */
  protected def prepareForExecution(plan: SparkPlan): SparkPlan =
    preparations.foldLeft(plan)((current, rule) => rule(current))

QueryExecution.preparations

  • 调用EnsureRequirements.apply
/**
 * Ordered rules applied by `prepareForExecution`. `EnsureRequirements` runs before codegen
 * stages are collapsed and before exchanges/subqueries are deduplicated.
 */
protected def preparations: Seq[Rule[SparkPlan]] = {
  val sqlConf = sparkSession.sessionState.conf
  Seq(
    PlanSubqueries(sparkSession),
    EnsureRequirements(sqlConf),
    CollapseCodegenStages(sqlConf),
    ReuseExchange(sqlConf),
    ReuseSubquery(sqlConf))
}

EnsureRequirements.apply

  • 调用 EnsureRequirements.ensureDistributionAndOrdering
  /**
   * Rewrites `plan` via `transformUp` so that each operator's children meet the distribution
   * and ordering the operator requires.
   *
   * A [[ShuffleExchangeExec]] with a [[HashPartitioning]] is dropped in favour of its child when
   * the child's output partitioning is a semantically equal [[HashPartitioning]]. Every other
   * node goes through `reorderJoinPredicates` and then `ensureDistributionAndOrdering`.
   */
  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
    case exchange @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
      val alreadyPartitioned = child.outputPartitioning match {
        case lower: HashPartitioning => upper.semanticEquals(lower)
        case _ => false
      }
      if (alreadyPartitioned) child else exchange
    case node: SparkPlan =>
      ensureDistributionAndOrdering(reorderJoinPredicates(node))
  }

EnsureRequirements.ensureDistributionAndOrdering

  • SQLConf 中,SQL shuffle 的默认并行度为 200,可以通过 spark.sql.shuffle.partitions 进行配置
/** Number of partitions to use for shuffles, read from the `SHUFFLE_PARTITIONS` entry. */
def numShufflePartitions: Int = {
  val entry = SHUFFLE_PARTITIONS
  getConf(entry)
}
  // Integer entry `spark.sql.shuffle.partitions`; defaults to 200 when not set.
  val SHUFFLE_PARTITIONS = {
    val description =
      "The default number of partitions to use when shuffling data for joins or aggregations."
    buildConf("spark.sql.shuffle.partitions").doc(description).intConf.createWithDefault(200)
  }
  • defaultNumPreShufflePartitions:默认的 shuffle 分区数,直接取自 conf.numShufflePartitions
/** Default partition count for newly inserted shuffles, taken from `conf.numShufflePartitions`. */
private def defaultNumPreShufflePartitions: Int = {
  val sqlConf = conf
  sqlConf.numShufflePartitions
}
  • numPartitions 由 distribution.requiredNumPartitions.getOrElse(defaultNumPreShufflePartitions) 得到:若子节点所要求的分布没有显式指定分区数,则回退到默认值(即 spark.sql.shuffle.partitions,默认为 200)
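  • 若子节点的输出分区不满足所要求的分布(且不是广播分布),则调用 ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) 插入 shuffle;若多个子节点的分区数不一致,再通过 ShuffleExchangeExec(defaultPartitioning, c) 将它们统一为相同的分区数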
/**
 * Makes `operator`'s children meet the distribution and ordering that `operator` requires,
 * inserting exchanges and sorts where needed.
 *
 * Phases, in order (each one reads the children produced by the previous phase):
 *  1. A child whose output partitioning does not satisfy its required distribution is wrapped in
 *     a [[BroadcastExchangeExec]] (for a [[BroadcastDistribution]]) or otherwise in a
 *     [[ShuffleExchangeExec]]. The shuffle uses the distribution's required partition count if it
 *     specifies one, else `defaultNumPreShufflePartitions`.
 *  2. Children whose required distribution is neither `UnspecifiedDistribution` nor a
 *     `BroadcastDistribution` are brought to a single common number of partitions.
 *  3. The children are passed through `withExchangeCoordinator`.
 *  4. A non-global [[SortExec]] is placed above any child whose output ordering does not satisfy
 *     its required ordering.
 *
 * Assertions fail if the number of distribution or ordering requirements differs from the number
 * of children, or if the children's distributions require conflicting partition counts.
 *
 * @param operator the physical operator whose children are checked
 * @return `operator` rebuilt with the (possibly wrapped) children
 */
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
    // Reassigned by each phase below; later phases operate on earlier phases' output.
    var children: Seq[SparkPlan] = operator.children
    // Exactly one distribution requirement and one ordering requirement per child.
    assert(requiredChildDistributions.length == children.length)
    assert(requiredChildOrderings.length == children.length)

    // Ensure that the operator's children satisfy their output distribution requirements.
    children = children.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        // Fall back to defaultNumPreShufflePartitions (conf.numShufflePartitions) when the
        // distribution does not fix a partition count.
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(defaultNumPreShufflePartitions)
        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
    }

    // Get the indexes of children which have specified distribution requirements and need to have
    // same number of partitions.
    val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
      case (UnspecifiedDistribution, _) => false
      case (_: BroadcastDistribution, _) => false
      case _ => true
    }.map(_._2)

    // Distinct partition counts among those children, measured after the exchanges added above.
    val childrenNumPartitions =
      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

    if (childrenNumPartitions.size > 1) {
      // Get the number of partitions which is explicitly required by the distributions.
      val requiredNumPartitions = {
        val numPartitionsSet = childrenIndexes.flatMap {
          index => requiredChildDistributions(index).requiredNumPartitions
        }.toSet
        // Conflicting explicit counts cannot be reconciled, so the assertion fails.
        assert(numPartitionsSet.size <= 1,
          s"$operator have incompatible requirements of the number of partitions for its children")
        numPartitionsSet.headOption
      }

      // Prefer an explicitly required count; otherwise align on the largest existing count.
      val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)

      children = children.zip(requiredChildDistributions).zipWithIndex.map {
        case ((child, distribution), index) if childrenIndexes.contains(index) =>
          if (child.outputPartitioning.numPartitions == targetNumPartitions) {
            child
          } else {
            val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
            child match {
              // If child is an exchange, we replace it with a new one having defaultPartitioning.
              case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
              case _ => ShuffleExchangeExec(defaultPartitioning, child)
            }
          }

        // Children with unspecified or broadcast requirements are left unchanged.
        case ((child, _), _) => child
      }
    }

    // Now, we need to add ExchangeCoordinator if necessary.
    // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
    // However, with the way that we plan the query, we do not have a place where we have a
    // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
    // at here for now.
    // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
    // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
    children = withExchangeCoordinator(children, requiredChildDistributions)

    // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
    children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
      // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
        child
      } else {
        // Per-partition sort only (global = false); partitioning was settled above.
        SortExec(requiredOrdering, global = false, child = child)
      }
    }

    operator.withNewChildren(children)
  }

end

相关文章:

  • CentOS6 Shell脚本/bin/bash^M: bad interpreter错误解决方法
  • 搭建gitbook 和 访问权限认证
  • 测试开发系类之接口自动化测试
  • Chrome 控制台报错Unchecked runtime.lastError: The message port closed before a response was received...
  • 读vue源码看前端百态2--打包工具
  • NoSQL是什么?
  • [ES-5.6.12] x-pack ssl
  • 20190220w
  • 怎么将电脑中的声音录制成WAV格式
  • 你的微博也被盗赞?试试HSTS强制HTTPS加密
  • Linux或UNIX系统配置检查
  • NutzWk 5.1.5 发布,Java 微服务分布式开发框架
  • 17-成员访问权限
  • 警报:线上事故之CountDownLatch的威力
  • Linux基金会施行安全关键系统打造共享工具、流程
  • python3.6+scrapy+mysql 爬虫实战
  • 【mysql】环境安装、服务启动、密码设置
  • Android组件 - 收藏集 - 掘金
  • angular组件开发
  • create-react-app项目添加less配置
  • Date型的使用
  • Java 23种设计模式 之单例模式 7种实现方式
  • Linux gpio口使用方法
  • python 学习笔记 - Queue Pipes,进程间通讯
  • session共享问题解决方案
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 基于组件的设计工作流与界面抽象
  • 解析带emoji和链接的聊天系统消息
  • 前端面试之闭包
  • 前嗅ForeSpider教程:创建模板
  • 悄悄地说一个bug
  • 听说你叫Java(二)–Servlet请求
  • 1.Ext JS 建立web开发工程
  • # .NET Framework中使用命名管道进行进程间通信
  • #使用清华镜像源 安装/更新 指定版本tensorflow
  • $.ajax,axios,fetch三种ajax请求的区别
  • (floyd+补集) poj 3275
  • (Java数据结构)ArrayList
  • (PWM呼吸灯)合泰开发板HT66F2390-----点灯大师
  • (pytorch进阶之路)CLIP模型 实现图像多模态检索任务
  • (附源码)ssm高校实验室 毕业设计 800008
  • (论文阅读30/100)Convolutional Pose Machines
  • (生成器)yield与(迭代器)generator
  • (十三)Maven插件解析运行机制
  • (算法)Game
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)程序员疫苗:代码注入
  • *++p:p先自+,然后*p,最终为3 ++*p:先*p,即arr[0]=1,然后再++,最终为2 *p++:值为arr[0],即1,该语句执行完毕后,p指向arr[1]
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .NET 中使用 Mutex 进行跨越进程边界的同步
  • .net遍历html中全部的中文,ASP.NET中遍历页面的所有button控件
  • .NET学习教程二——.net基础定义+VS常用设置
  • .NET中两种OCR方式对比
  • .stream().map与.stream().flatMap的使用
  • .vollhavhelp-V-XXXXXXXX勒索病毒的最新威胁:如何恢复您的数据?