Spark-ShuffleWriter
一、上下文
《Spark-Task启动流程》最后讲到要从SparkEnv 获取 ShuffleManager,再从ShuffleManager获取ShuffleWriter(UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter)将结果写入磁盘。下面我们就详细分析下ShuffleWriter是如何选择和写入磁盘的。
二、整体写入流程
ShuffleWriteProcessor
自定义shuffle写入过程的界面。driver 创建一个ShuffleWriteProcessor并将其放入[[ShuffleDependency]]中,executor 在每个ShuffleMapTask中使用它。
private[spark] class ShuffleWriteProcessor extends Serializable with Logging {def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],mapId: Long,context: TaskContext,partition: Partition): MapStatus = {//1、从SparkEnv获取shuffleManagerval manager = SparkEnv.get.shuffleManager//2、从shuffleManager获取ShuffleWritervar writer: ShuffleWriter[Any, Any] = manager.getWriter[Any, Any](dep.shuffleHandle,mapId,context,createMetricsReporter(context))//3、调用ShuffleWriter将结果写入磁盘writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])//4、获取写入状态val mapStatus = writer.stop(success = true)//5、如果写入成功且开启push-based shuffle 就尽可能的将结果推送到目标节点供下一个Stage使用if (mapStatus.isDefined) {if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {//....详见《Spark push-based shuffle》博客}}}}
三、获取ShuffleHandle
ShuffleDependency决定了ShuffleHandle,ShuffleHandle决定了ShuffleWriter
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {override def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {//1、下游分区数 <= spark.shuffle.sort.bypassMergeThreshold(默认值200)的分区//2、不需要map侧聚合,//满足 1和2 返回这个ShuffleHandle //这种方式会直接溢写numPartitions文件,并在最后将它们连接起来。//这避免了两次进行序列化和反序列化以将溢出的文件合并在一起,(避免合并排序)//缺点是一次打开多个文件,因此分配给缓冲区的内存更多。new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {//1、serializer支持Relocation//2、map侧没有聚合//3、下游分区数 <= 2 的 24次方 (16777215) //满足这3个条件,返回 SerializedShuffleHandlenew SerializedShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// 如果以上两个 ShuffleHandle所要求的条件都不满足,走这个,想想也知道它的效率最低new BaseShuffleHandle(shuffleId, dependency)}}}
四、获取ShuffleWriter
1、ShuffleManager
详细可以看《Spark-ShuffleManager》博客
def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
2、SortShuffleManager
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {//从shuffle_id到mapTask_id的映射,为这些shuffle生成输出private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()override def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {//每个shuffleId 对应的mapTask 数量初始值为16//什么是OpenHashSet?//一个简单、快速的HashSe,针对仅插入非空值的用例进行了优化,其中键永远不会被删除。//底层实现使用Scala编译器的专门化为四种基本类型(Long、Int、Double和Float)生成优化的存储。它比Java的标准HashSet快得多,同时产生的内存开销要小得多。这可以作为更高级数据结构(如优化的HashMap)的构建块。//这个OpenHashSet旨在作为更高层数据结构的构建块,如优化的HashMap。与标准HashSet实现相比,此类提供了各种回调接口(例如allocateFunc、moveFunc)和检索底层数组中键位置的接口。 //它使用二次探测,HashTable大小为2的幂,保证能探索每个键的所有空间 val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))mapTaskIds.synchronized { mapTaskIds.add(mapId) }val env = SparkEnv.get//根据ShuffleHandle获取对应的ShuffleWriterhandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf,metrics,shuffleExecutorComponents)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,bypassMergeSortHandle,mapId,env.conf,metrics,shuffleExecutorComponents)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)}}}
五、使用ShuffleWriter写入数据
我们先粗略看下每个ShuffleWriter是如何将数据写入磁盘的,后面我们对每个ShuffleWriter单独详细分析
1、SortShuffleWriter
private[spark] class SortShuffleWriter[K, V, C](handle: BaseShuffleHandle[K, V, C],mapId: Long,context: TaskContext,shuffleExecutorComponents: ShuffleExecutorComponents)extends ShuffleWriter[K, V] with Logging {//将一堆记录写入此任务的输出override def write(records: Iterator[Product2[K, V]]): Unit = {//根据map端是否有聚合 构建排序器sorter = if (dep.mapSideCombine) {new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// 在这种情况下,我们既不向排序器传递聚合器也不传递排序器,//因为我们不关心 key 是否在每个分区中排序;//如果正在运行的操作是sortByKey,则将在reduce侧完成。new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}//将数据添加到ExternalSorter中sorter.insertAll(records)//不要费心在shuffle写入时间中包括打开合并输出文件的时间,因为它只打开一个文件,所以通常太快而无法准确测量//每个map任务调用一次,以创建一个写入器,该写入器将负责持久化该map任务写入的所有分区字节。val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(dep.shuffleId, mapId, dep.partitioner.numPartitions)//将添加到此ExternalSorter中的所有数据写入mapOutputWriter ,mapOutputWriter 将字节数据推送到某个任意的后备存储。由SortShuffleWriter调用。sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)//返回两部分信息://1、一个long数组,对于从(0)到(numPartitions-1)的每个分区,它应该包含分区写入器为该分区id写入的字节数。//2、可供shuffleReader使用的可选元数据blob。//确保此模块的分区写入程序进行的写入可用于下游的reduce任务//关心随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths//ShuffleMapTask向scheduler返回的结果。包括任务存储shuffle文件的block manager地址,以及每个reducer的输出大小,以便传递给reduce任务。mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)}}
2、BypassMergeSortShuffleWriter
它实现了基于排序的shuffle的 hash-style shuffle回退路径。此写入路径将传入记录写入单独的文件,每个reduce分区一个文件,然后将这些每个分区的文件连接起来,形成一个输出文件,其区域供reducer使用。记录不在内存中缓冲。它以一种可以通过IndexShuffleBlockResolver提供/使用的格式写入磁盘。
此写入路径对于具有大量reduce分区的混洗来说效率低下,因为它同时为所有分区打开单独的序列化器和文件流。因此仅在满足map端没有聚合且下游分区数 <= spark.shuffle.sort.bypassMergeThreshold (默认200)时使用
这段代码曾经是OexternalSorter的一部分,但为了降低代码复杂性被重构
final class BypassMergeSortShuffleWriter<K, V>extends ShuffleWriter<K, V>implements ShuffleChecksumSupport {public void write(Iterator<Product2<K, V>> records) throws IOException {assert (partitionWriters == null);//每个map任务调用一次,以创建一个mapOutputWriter ,它将负责持久化该map任务写入的所有分区字节。ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(shuffleId, mapId, numPartitions);try {if (!records.hasNext()) {//如果迭代器中的数据都处理完毕 统计分区数量,返回 MapStatus//返回两部分信息://1、一个long数组,对于从(0)到(numPartitions-1)的每个分区,它应该包含分区写入器为该分区id写入的字节数。//2、可供shuffleReader使用的可选元数据blob。//确保此模块的分区写入程序进行的写入可用于下游的reduce任务//关心随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。partitionLengths = mapOutputWriter.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();//ShuffleMapTask向scheduler返回的结果。包括任务存储shuffle文件的block manager地址,以及每个reducer的输出大小,以便传递给reduce任务。mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);return;}//从同一序列化程序实例创建多个序列化/反序列化流是合法的,只要这些流都在同一线程中使用。final SerializerInstance serInstance = serializer.newInstance();final long openStartTime = System.nanoTime();//分区溢写器数组//一个用于将JVM对象直接写入磁盘文件的类。此类允许将数据附加到现有块。为了提高效率,它在多个提交中保留了底层文件通道。此通道保持打开状态,直到调用close()。如果发生故障,调用者应使用revertPartialWritesAndClose()关闭,以原子方式还原未提交的部分写入。//此类不支持并发写入。此外,一旦 writer 被打开,就不能再重新打开。partitionWriters = new DiskBlockObjectWriter[numPartitions];//根据偏移量和长度引用文件的特定段(可能是整个文件)。partitionWriterSegments = new FileSegment[numPartitions];//循环每一个分区for (int i = 0; i < numPartitions; i++) {//从BlockManager获取 DiskBlockManager 生成用于写入的唯一 blockId 和文件final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =blockManager.diskBlockManager().createTempShuffleBlock();final File file = tempShuffleBlockIdPlusFile._2();final BlockId blockId = tempShuffleBlockIdPlusFile._1();//获取磁盘块写入器DiskBlockObjectWriterDiskBlockObjectWriter writer =blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);if (partitionChecksums.length > 0) {writer.setChecksum(partitionChecksums[i]);}//将每个 DiskBlockObjectWriter 和 对应的分区绑定partitionWriters[i] = writer;}//创建要写入的文件和创建磁盘写入器都涉及与磁盘的交互,当我们打开许多文件时,总共可能需要很长时间,因此应该包含在shuffle写入时间中。writeMetrics.incWriteTime(System.nanoTime() - openStartTime);while (records.hasNext()) {//如果迭代器还有记录,获取记录final Product2<K, V> record = records.next();final K key = record._1();//根据key获取对应的分区,并调用该分区对应的DiskBlockObjectWriter 直接写入文件partitionWriters[partitioner.getPartition(key)].write(key, record._2());}for (int i = 0; i < numPartitions; i++) {try (DiskBlockObjectWriter writer = partitionWriters[i]) {//刷新部分写入并将其作为单个原子块提交。提交可能会写入额外的字节来构建原子块。partitionWriterSegments[i] = writer.commitAndGet();}}//将所有每个分区的文件合并到一个文件partitionLengths = writePartitionedData(mapOutputWriter);mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);} catch (Exception e) {......}}}
3、UnsafeShuffleWriter
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {boolean success = false;try {while (records.hasNext()) {//如果迭代器还有数据,将其写入到shuffle sorterinsertRecordIntoSorter(records.next());}//合并写入结果文件closeAndWriteOutput();success = true;} finally {if (sorter != null) {try {//强制删除所有内存和溢出文件sorter.cleanupResources();} catch (Exception e) {......}}}}}