当前位置: 首页 > news >正文

Spark-ShuffleWriter

一、上下文

《Spark-Task启动流程》最后讲到要从SparkEnv 获取 ShuffleManager,再从ShuffleManager获取ShuffleWriter(UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter)将结果写入磁盘。下面我们就详细分析下ShuffleWriter是如何选择和写入磁盘的。

二、整体写入流程

ShuffleWriteProcessor

自定义shuffle写入过程的界面。driver 创建一个ShuffleWriteProcessor并将其放入[[ShuffleDependency]]中,executor 在每个ShuffleMapTask中使用它。

private[spark] class ShuffleWriteProcessor extends Serializable with Logging {def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],mapId: Long,context: TaskContext,partition: Partition): MapStatus = {//1、从SparkEnv获取shuffleManagerval manager = SparkEnv.get.shuffleManager//2、从shuffleManager获取ShuffleWritervar writer: ShuffleWriter[Any, Any] = manager.getWriter[Any, Any](dep.shuffleHandle,mapId,context,createMetricsReporter(context))//3、调用ShuffleWriter将结果写入磁盘writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])//4、获取写入状态val mapStatus = writer.stop(success = true)//5、如果写入成功且开启push-based shuffle 就尽可能的将结果推送到目标节点供下一个Stage使用if (mapStatus.isDefined) {if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {//....详见《Spark push-based shuffle》博客}}}}

三、获取ShuffleHandle

ShuffleDependency决定了ShuffleHandle,ShuffleHandle决定了ShuffleWriter

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {override def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {//1、下游分区数 <= spark.shuffle.sort.bypassMergeThreshold(默认值200)的分区//2、不需要map侧聚合,//满足 1和2 返回这个ShuffleHandle //这种方式会直接溢写numPartitions文件,并在最后将它们连接起来。//这避免了两次进行序列化和反序列化以将溢出的文件合并在一起,(避免合并排序)//缺点是一次打开多个文件,因此分配给缓冲区的内存更多。new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {//1、serializer支持Relocation//2、map侧没有聚合//3、下游分区数 <= 2 的 24次方 (16777215) //满足这3个条件,返回 SerializedShuffleHandlenew SerializedShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// 如果以上两个 ShuffleHandle所要求的条件都不满足,走这个,想想也知道它的效率最低new BaseShuffleHandle(shuffleId, dependency)}}}

四、获取ShuffleWriter

1、ShuffleManager

详细可以看《Spark-ShuffleManager》博客

  def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

2、SortShuffleManager

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {//从shuffle_id到mapTask_id的映射,为这些shuffle生成输出private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()override def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {//每个shuffleId 对应的mapTask 数量初始值为16//什么是OpenHashSet?//一个简单、快速的HashSe,针对仅插入非空值的用例进行了优化,其中键永远不会被删除。//底层实现使用Scala编译器的专门化为四种基本类型(Long、Int、Double和Float)生成优化的存储。它比Java的标准HashSet快得多,同时产生的内存开销要小得多。这可以作为更高级数据结构(如优化的HashMap)的构建块。//这个OpenHashSet旨在作为更高层数据结构的构建块,如优化的HashMap。与标准HashSet实现相比,此类提供了各种回调接口(例如allocateFunc、moveFunc)和检索底层数组中键位置的接口。  //它使用二次探测,HashTable大小为2的幂,保证能探索每个键的所有空间  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))mapTaskIds.synchronized { mapTaskIds.add(mapId) }val env = SparkEnv.get//根据ShuffleHandle获取对应的ShuffleWriterhandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf,metrics,shuffleExecutorComponents)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,bypassMergeSortHandle,mapId,env.conf,metrics,shuffleExecutorComponents)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)}}}

五、使用ShuffleWriter写入数据

我们先粗略看下每个ShuffleWriter是如何将数据写入磁盘的,后面我们对每个ShuffleWriter单独详细分析

1、SortShuffleWriter

private[spark] class SortShuffleWriter[K, V, C](handle: BaseShuffleHandle[K, V, C],mapId: Long,context: TaskContext,shuffleExecutorComponents: ShuffleExecutorComponents)extends ShuffleWriter[K, V] with Logging {//将一堆记录写入此任务的输出override def write(records: Iterator[Product2[K, V]]): Unit = {//根据map端是否有聚合 构建排序器sorter = if (dep.mapSideCombine) {new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// 在这种情况下,我们既不向排序器传递聚合器也不传递排序器,//因为我们不关心 key 是否在每个分区中排序;//如果正在运行的操作是sortByKey,则将在reduce侧完成。new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}//将数据添加到ExternalSorter中sorter.insertAll(records)//不要费心在shuffle写入时间中包括打开合并输出文件的时间,因为它只打开一个文件,所以通常太快而无法准确测量//每个map任务调用一次,以创建一个写入器,该写入器将负责持久化该map任务写入的所有分区字节。val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(dep.shuffleId, mapId, dep.partitioner.numPartitions)//将添加到此ExternalSorter中的所有数据写入mapOutputWriter ,mapOutputWriter 将字节数据推送到某个任意的后备存储。由SortShuffleWriter调用。sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)//返回两部分信息://1、一个long数组,对于从(0)到(numPartitions-1)的每个分区,它应该包含分区写入器为该分区id写入的字节数。//2、可供shuffleReader使用的可选元数据blob。//确保此模块的分区写入程序进行的写入可用于下游的reduce任务//关心随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths//ShuffleMapTask向scheduler返回的结果。包括任务存储shuffle文件的block manager地址,以及每个reducer的输出大小,以便传递给reduce任务。mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)}}

2、BypassMergeSortShuffleWriter

它实现了基于排序的shuffle的 hash-style shuffle回退路径。此写入路径将传入记录写入单独的文件,每个reduce分区一个文件,然后将这些每个分区的文件连接起来,形成一个输出文件,其区域供reducer使用。记录不在内存中缓冲。它以一种可以通过IndexShuffleBlockResolver提供/使用的格式写入磁盘。

此写入路径对于具有大量reduce分区的混洗来说效率低下,因为它同时为所有分区打开单独的序列化器和文件流。因此仅在满足map端没有聚合且下游分区数 <= spark.shuffle.sort.bypassMergeThreshold (默认200)时使用

这段代码曾经是OexternalSorter的一部分,但为了降低代码复杂性被重构

final class BypassMergeSortShuffleWriter<K, V>extends ShuffleWriter<K, V>implements ShuffleChecksumSupport {public void write(Iterator<Product2<K, V>> records) throws IOException {assert (partitionWriters == null);//每个map任务调用一次,以创建一个mapOutputWriter ,它将负责持久化该map任务写入的所有分区字节。ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(shuffleId, mapId, numPartitions);try {if (!records.hasNext()) {//如果迭代器中的数据都处理完毕 统计分区数量,返回 MapStatus//返回两部分信息://1、一个long数组,对于从(0)到(numPartitions-1)的每个分区,它应该包含分区写入器为该分区id写入的字节数。//2、可供shuffleReader使用的可选元数据blob。//确保此模块的分区写入程序进行的写入可用于下游的reduce任务//关心随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。partitionLengths = mapOutputWriter.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();//ShuffleMapTask向scheduler返回的结果。包括任务存储shuffle文件的block manager地址,以及每个reducer的输出大小,以便传递给reduce任务。mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);return;}//从同一序列化程序实例创建多个序列化/反序列化流是合法的,只要这些流都在同一线程中使用。final SerializerInstance serInstance = serializer.newInstance();final long openStartTime = System.nanoTime();//分区溢写器数组//一个用于将JVM对象直接写入磁盘文件的类。此类允许将数据附加到现有块。为了提高效率,它在多个提交中保留了底层文件通道。此通道保持打开状态,直到调用close()。如果发生故障,调用者应使用revertPartialWritesAndClose()关闭,以原子方式还原未提交的部分写入。//此类不支持并发写入。此外,一旦 writer 被打开,就不能再重新打开。partitionWriters = new DiskBlockObjectWriter[numPartitions];//根据偏移量和长度引用文件的特定段(可能是整个文件)。partitionWriterSegments = new FileSegment[numPartitions];//循环每一个分区for (int i = 0; i < numPartitions; i++) {//从BlockManager获取 DiskBlockManager 生成用于写入的唯一 blockId  和文件final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =blockManager.diskBlockManager().createTempShuffleBlock();final File file = tempShuffleBlockIdPlusFile._2();final BlockId blockId = tempShuffleBlockIdPlusFile._1();//获取磁盘块写入器DiskBlockObjectWriterDiskBlockObjectWriter writer =blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);if (partitionChecksums.length > 0) {writer.setChecksum(partitionChecksums[i]);}//将每个 DiskBlockObjectWriter 和 对应的分区绑定partitionWriters[i] = writer;}//创建要写入的文件和创建磁盘写入器都涉及与磁盘的交互,当我们打开许多文件时,总共可能需要很长时间,因此应该包含在shuffle写入时间中。writeMetrics.incWriteTime(System.nanoTime() - openStartTime);while (records.hasNext()) {//如果迭代器还有记录,获取记录final Product2<K, V> record = records.next();final K key = record._1();//根据key获取对应的分区,并调用该分区对应的DiskBlockObjectWriter  直接写入文件partitionWriters[partitioner.getPartition(key)].write(key, record._2());}for (int i = 0; i < numPartitions; i++) {try (DiskBlockObjectWriter writer = partitionWriters[i]) {//刷新部分写入并将其作为单个原子块提交。提交可能会写入额外的字节来构建原子块。partitionWriterSegments[i] = writer.commitAndGet();}}//将所有每个分区的文件合并到一个文件partitionLengths = writePartitionedData(mapOutputWriter);mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);} catch (Exception e) {......}}}

3、UnsafeShuffleWriter

public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {boolean success = false;try {while (records.hasNext()) {//如果迭代器还有数据,将其写入到shuffle sorterinsertRecordIntoSorter(records.next());}//合并写入结果文件closeAndWriteOutput();success = true;} finally {if (sorter != null) {try {//强制删除所有内存和溢出文件sorter.cleanupResources();} catch (Exception e) {......}}}}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 风中摇曳的小萝卜(机器学习)笔记 EM算法
  • AutoIT:强大的RPA自动化脚本神器,安装到使用的保姆级教程!
  • Matlab程序练习
  • 4千6历年高考英语试题大全ACCESS\EXCEL数据库
  • strncpy陷阱
  • 运维问题0002:SAP多模块问题-SAP系统程序在执行时,跳出“加急快件”窗口,提示:快件文档“更新已终止”从作者***收到
  • SonicWall SSL VPN曝出高危漏洞,可能导致防火墙崩溃
  • 如何修复软件中的BUG
  • 第四章 类和对象 课后训练(1)
  • 数据看板多端查看无压力,教你轻松设置响应式布局
  • Dockerfile中的RUN、CMD、ENTRYPOINT指令区别
  • PCL-统计滤波
  • 十、组合模式
  • 计算机毕业设计选题推荐-自驾游攻略管理系统-Java/Python项目实战
  • 让PPT动起来:用python-pptx轻松添加动画效果
  • 【Amaple教程】5. 插件
  • CAP理论的例子讲解
  • Docker容器管理
  • HTTP 简介
  • Iterator 和 for...of 循环
  • Python十分钟制作属于你自己的个性logo
  • RxJS: 简单入门
  • socket.io+express实现聊天室的思考(三)
  • 阿里云Kubernetes容器服务上体验Knative
  • 爱情 北京女病人
  • 关于 Linux 进程的 UID、EUID、GID 和 EGID
  • 如何优雅地使用 Sublime Text
  • 入门到放弃node系列之Hello Word篇
  • 思否第一天
  • 积累各种好的链接
  • 扩展资源服务器解决oauth2 性能瓶颈
  • ​​​【收录 Hello 算法】10.4 哈希优化策略
  • ​埃文科技受邀出席2024 “数据要素×”生态大会​
  • #{} 和 ${}区别
  • #我与Java虚拟机的故事#连载01:人在JVM,身不由己
  • (12)Linux 常见的三种进程状态
  • (java)关于Thread的挂起和恢复
  • (react踩过的坑)antd 如何同时获取一个select 的value和 label值
  • (vue)el-tabs选中最后一项后更新数据后无法展开
  • (九)信息融合方式简介
  • (一)Mocha源码阅读: 项目结构及命令行启动
  • (原创)攻击方式学习之(4) - 拒绝服务(DOS/DDOS/DRDOS)
  • *setTimeout实现text输入在用户停顿时才调用事件!*
  • .jks文件(JAVA KeyStore)
  • .NET Core使用NPOI导出复杂,美观的Excel详解
  • .Net mvc总结
  • .NET 设计模式初探
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .Net(C#)常用转换byte转uint32、byte转float等
  • .NET大文件上传知识整理
  • .NET简谈设计模式之(单件模式)
  • :中兴通讯为何成功
  • @requestBody写与不写的情况
  • [ 2222 ]http://e.eqxiu.com/s/wJMf15Ku
  • [ C++ ] STL---string类的模拟实现