当前位置: 首页 > news >正文

论文阅读-MapReduce

论文名称:MapReduce: Simplified Data Processing on Large Clusters

翻译的效果不是很好,有空再看一遍,参照一下别人翻译的。

MapReduce:Simplified Data Processing on Large Clusters 中文翻译版(转) - 阿洒 - 博客园 (cnblogs.com)

概要

MapReduce是一种处理和生成大数据集的编程模型和相关实现。用户可以指定一个map函数,该函数处理键/值对以生成一组中间键/值对,还可以指定一个reduce函数,该函数合并与同一中间键相关联的所有中间值。许多真实世界的任务可以用这个模型来表达,如论文中所示。

以这种函数式风格编写的程序会自动并行化,并在一组商用计算机集群上执行。运行时系统负责处理输入数据的分区、在一组计算机上调度程序的执行、处理机器故障以及管理所需的机器间通信。这使得没有并行和分布式系统经验的程序员可以轻松地利用大型分布式系统的资源。

我们的MapReduce实现在一个由商用计算机组成的大型集群上运行,并具有高度可扩展性:一个典型的MapReduce计算可以在数千台机器上处理数TB的数据。程序员们发现这个系统很容易使用:已经实现了数百个MapReduce程序,并且每天在Google的集群上执行数千个MapReduce作业。

1 介绍

在过去的五年里,Google的作者和许多其他人已经实现了数百个专用计算,处理大量原始数据,例如爬取的文档、Web请求日志等,以计算各种派生数据,比如倒排索引、Web文档图结构的各种表示形式、每个主机爬取页面数的摘要、给定日期中最频繁查询的集合等。大多数这样的计算在概念上都很简单。然而,输入数据通常很大,计算必须分布在数百或数千台机器上,才能在合理的时间内完成。并行化计算、分发数据和处理故障的问题使得我们需要编写大量复杂的代码来解决这些问题。

为了应对这种复杂性,他们设计了一个新的抽象,允许他们表达正在尝试执行的简单计算,但隐藏了并行化、容错、数据分发和负载平衡的混乱细节,在库中进行处理。他们的抽象受到了Lisp和许多其他函数式语言中存在的映射和约简原语的启发。他们意识到,他们的大多数计算都涉及将map操作应用于输入中的每个逻辑“记录”,以便计算一组中间键/值对,然后将reduce操作应用于共享相同键的所有值,以便适当地组合派生数据。他们使用具有用户指定的map和reduce操作的函数模型,可以轻松并行化大型计算,并将重新执行作为容错的主要机制。

这项工作的主要贡献是一个简单而强大的接口,可以自动并行化和分布大规模计算,结合实现此接口的实现,在商用PC集群上实现了高性能。

第2节介绍了基本的编程模型,并给出了几个示例。第3节描述了针对我们基于集群的计算环境量身定制的MapReduce接口的实现。第4节描述了我们发现有用的编程模型的几个改进。第5节对我们的实现在各种任务中的性能进行了测量。第6节探讨了在Google内部使用MapReduce的情况,包括我们将其作为重写生产索引系统的基础的经验。第7节讨论了相关和未来的工作。

2 编程模型

计算接受一组输入键/值对,并生成一组输出键/值对。MapReduce库的用户通过两个函数来表达计算:Map和Reduce。

由用户编写的Map函数接受一个输入对,并产生一组中间键/值对。MapReduce库会将所有与相同中间键I关联的中间值组合在一起,并将它们传递给Reduce函数。

Reduce函数也由用户编写,接受一个中间键I和该键的一组值。它将这些值合并在一起,形成一个可能较小的值集合。通常,每次调用Reduce只会产生零个或一个输出值。中间值通过迭代器提供给用户reduce函数。这允许我们处理无法全部放入内存的值列表。

2.1 示例

考虑在大量文档集合中统计每个单词出现次数的问题。用户可以编写类似于以下伪代码的代码:

map(String key, String value):// key: document name// value: document contentsfor each word w in value:EmitIntermediate(w, "1");
reduce(String key, Iterator values):// key: a word// values: a list of countsint result = 0;for each v in values:result += ParseInt(v);Emit(AsString(result));

map函数发出每个单词及其关联的出现次数计数(在这个简单示例中只是'1')。reduce函数将为特定单词发出的所有计数求和。

此外,用户还需要编写代码来填充一个MapReduce规范对象,包括输入和输出文件的名称以及可选的调优参数。然后,用户调用MapReduce函数,将规范对象传递给它。用户的代码与MapReduce库(用C++实现)链接在一起。附录A中包含了此示例的完整程序文本。

2.2 类型

尽管前面的伪代码是以字符串输入和输出为基础编写的,但用户提供的map和reduce函数在概念上具有关联类型:

map(k1,v1) → list(k2,v2)

reduce(k2,list(v2)) → list(v2)

也就是说,输入键和值和输出键和值来自不同的域。此外,中间键和值与输出键和值来自相同的域。 我们的C++实现将字符串传递给用户定义的函数,并让用户代码在字符串和适当的类型之间进行转换。

2.3 更多示例

下面是一些可以轻松表达为MapReduce计算的有趣程序的简单示例。

Distributed Grep:如果匹配了提供的文本模式或关键词,则map函数发出一行。reduce函数是一个恒等函数,只是将中间数据复制到输出中。

URL访问频率计数(Count of URL Access Frequency):map函数处理网页请求日志并输出<URL,1>。reduce函数将相同URL的所有值相加,并发出<URL,总数i>。

反向Web-Link图(Reverse Web-Link Graph):map函数输出<target,source>,对于在名为source的页面中找到的指向目标URL的每个链接。reduce函数将与给定目标URL关联的所有源URL列表连接起来,并发出<target,list(source)>。它可以帮助我们了解网页之间的引用关系,即哪些网页链接到了目标网页。

每个主机的术语向量(Term-Vector per Host):它可以帮助我们了解在一个大规模的文本数据集中,每个主机(例如网站)包含哪些重要的单词以及它们的频率。一个术语向量将一组文档中出现的最重要的单词总结为<word,frequency>的列表。map函数为每个输入文档(其中主机名从文档的URL中提取)发出<hostname,term vector>。reduce函数接收给定主机的所有每个文档术语向量。它将这些术语向量相加,丢弃不频繁的术语,然后发出最终的<hostname,term vector>。

倒排索引(Inverted Index):映射函数解析每个文档,并生成一系列的 (单词, 文档ID) 对。归约函数接受给定单词的所有对,对相应的文档ID进行排序,并生成一个 (单词, 文档ID列表) 对。所有输出对的集合形成了一个简单的倒排索引。可以很容易地扩展这个计算以跟踪单词的位置。

分布式排序:映射函数从每个记录中提取键,并生成一个 (键, 记录) 对。归约函数保持所有对不变地输出。这种计算依赖于第4.1节描述的分区设施和第4.2节描述的排序属性。

3 实现

MapReduce接口有许多不同的实现方式。正确的选择取决于环境。例如,一个实现可能适用于小型共享内存机器,另一个适用于大型NUMA多处理器,还有一个适用于更大规模的网络机器集合。

本节描述了一个针对谷歌广泛使用的计算环境的实现:由廉价PC组成的大型集群,它们通过交换式以太网连接在一起[4]。在我们的环境中:

(1)机器通常是运行Linux的双处理器x86处理器,每台机器配备2-4 GB内存。

(2)使用的是廉价的网络硬件 - 通常是机器级别的100兆位/秒或1千兆位/秒,但在整体双向带宽上平均要低得多。

(3)一个集群由数百台或数千台机器组成,因此机器故障很常见。

(4)存储由直接连接到各个机器的廉价IDE磁盘提供。我们使用内部开发的分布式文件系统[8]来管理这些磁盘上存储的数据。该文件系统使用复制来提供不可靠硬件上的可用性和可靠性。

(5)用户向调度系统提交作业。每个作业由一组任务组成,并由调度程序映射到集群中的一组可用机器。

3.1 执行概述

Map调用通过自动将输入数据分割成一组M个片段来在多个计算机上进行分发。可以使用不同的计算机并行处理输入片段。Reduce调用通过使用分区函数(例如,hash(key) mod R)将中间键空间划分为R个部分来进行分发。用户指定分区数(R)和分区函数。 图1显示了我们实现中MapReduce操作的整体流程。当用户程序调用MapReduce函数时,会发生以下一系列动作(图1中的编号与下面列表中的数字相对应):

  1. 用户程序中的MapReduce库首先将输入文件分割为通常为16兆字节至64兆字节(MB)每个片段的M个片段(可由用户通过可选参数进行控制)。然后,在计算机集群上启动许多程序副本。
  2. 程序的其中一个副本是特殊的——主节点。其余的是由主节点分配工作的工作节点。需要分配M个Map任务和R个Reduce任务。主节点选择空闲的工作节点,并为每个工作节点分配一个Map任务或Reduce任务。
  3. 被分配了Map任务的工作节点读取相应输入片段的内容。它从输入数据中解析出键/值对,并将每个对传递给用户定义的Map函数。Map函数产生的中间键/值对被缓存在内存中。
  4. 定期,缓冲的键/值对被写入本地磁盘,并通过分区函数分成R个区域。这些缓冲的键/值对在本地磁盘上的位置被传回给主节点,主节点负责将这些位置转发给Reduce工作节点。
  5. 当Reduce工作节点被主节点通知这些位置时,它使用远程过程调用从Map工作节点的本地磁盘读取缓冲数据。当Reduce工作节点读取完所有中间数据后,它按照中间键对数据进行排序,以使相同键的所有出现被分组在一起。排序是必需的,将具有相同键的键值对聚合在一起,以便进行归约操作。如果中间数据量太大无法放入内存,则使用外部排序。
  6. Reduce工作节点遍历排序后的中间数据,并对每个唯一的中间键遇到的键和相应的中间值集合调用用户的Reduce函数。Reduce函数的输出被追加到该Reduce分区的最终输出文件中。
  7. 当所有Map任务和Reduce任务都完成时,主节点唤醒用户程序。此时,用户程序中的MapReduce调用返回给用户代码。 成功完成后,MapReduce执行的输出可在R个输出文件中获得(每个Reduce任务一个文件,文件名由用户指定)。通常,用户不需要将这些R个输出文件合并成一个文件,他们通常将这些文件作为另一个MapReduce调用的输入,或者从另一个能处理分成多个文件的分布式应用中使用它们。

3.2 主节点数据结构

主节点保留了几个数据结构。对于每个Map任务和Reduce任务,它存储状态(空闲、进行中或已完成)以及工作节点机器的标识(对于非空闲任务)。

主节点是从Map任务到Reduce任务传递中间文件区域位置的通道。因此,对于每个已完成的Map任务,主节点存储由Map任务产生的R个中间文件区域的位置和大小。在Map任务完成时,接收到的有关位置和大小信息的更新被逐步推送给具有进行中的Reduce任务的工作节点。

3.3 容错性

由于 MapReduce 库旨在使用数百或数千台机器处理大量数据,因此该库必须能够优雅地容忍机器故障。

工作机器故障

主节点会定期向每个工作节点发送 ping 消息。如果在一定时间内未收到工作节点的响应,主节点将标记该工作节点已失败。由该工作节点完成的所有 map 任务都将被重置回其初始空闲状态,因此可以安排在其他工作节点上。类似地,由失败工作节点执行的任何正在进行中的 map 任务或 reduce 任务也将被重置为空闲并可以重新安排。

由于已完成的 map 任务的输出存储在失败机器的本地磁盘上,因此无法访问,因此必须在发生故障时重新执行这些任务。而已完成的 reduce 任务不需要重新执行,因为其输出存储在全局文件系统中。

当 map 任务首先由工作节点 A 执行,然后因为 A 失败而稍后由工作节点 B 执行时,所有执行 reduce 任务的工作节点将被通知重新执行。尚未从工作机器 A 读取数据的任何 reduce 任务将从工作机器 B 读取数据。MapReduce 对大规模的工作机器故障具有弹性。例如,在一个 MapReduce 操作期间,运行集群上的网络维护导致每次有 80 台机器无法访问数分钟。MapReduce 主节点仅重新执行不可访问工作机器执行的任务,并继续前进,最终完成 MapReduce 操作。

主节点故障

很容易使主节点定期写入上述主数据结构的检查点。如果主任务失败,则可以从最后一个检查点状态开始启动新副本。但是,由于只有一个主节点,其故障不太可能发生。因为我们当前的实现会在主节点失败时中止 MapReduce 计算。客户端可以检查此条件并在需要时重试 MapReduce 操作。

存在故障时的语义

当用户提供的 map 和 reduce 运算符是其输入值的确定性函数(幂等函数)时,我们的分布式实现产生的输出与整个程序的非故障顺序执行产生的输出相同。

我们依靠 map 和 reduce 任务输出的原子提交来实现此属性。每个正在进行的任务将其输出写入私有临时文件。一个 reduce 任务产生一个这样的文件,而一个 map 任务产生 R 个这样的文件(每个 reduce 任务一个)。当 map 任务完成时,工作节点向主节点发送消息,并在消息中包括 R 个临时文件的名称。如果 master 已经接收到了某个 map 任务的完成消息,那么再次接收到相同任务的完成消息时,master 将不会对其进行处理,而是直接忽略此消息。否则,它会在主数据结构中记录 R 个文件的名称。当 reduce 任务完成时,reduce 工作节点会将其临时输出文件原子重命名为最终输出文件。如果多台机器上执行了相同的 reduce 任务,则会为该最终输出文件执行多个重命名调用。我们依靠底层文件系统提供的原子重命名操作来保证最终文件系统状态仅包含 reduce 任务执行的一次数据。

我们的大多数 map 和 reduce 运算符都是确定性的,而且我们的语义在这种情况下等同于顺序执行,因此程序员很容易推断出其程序的行为。当 map 和/或 reduce 运算符是非确定性的时,我们提供弱但仍合理的语义。在存在非确定性运算符的情况下,特定 reduce 任务 R1 的输出与非确定性程序的顺序执行产生的 R1 输出相同。然而,不同 reduce 任务 R2 的输出可能对应于不同的非确定性程序的顺序执行产生的 R2 输出。

考虑 map 任务 M 和 reduce 任务 R1 和 R2。让 e(Ri) 表示提交的 Ri 执行(恰好有一个这样的执行)。弱语义的产生是因为 e(R1) 可能已读取由 M 的一个执行产生的输出,而 e(R2) 可能已读取由一个不同的 M 执行产生的输出。

3.4 局部性

在我们的计算环境中,网络带宽是相对稀缺的资源。通过利用输入数据(由 GFS 管理)存储在构成我们集群的机器的本地磁盘上这一事实,我们节省了网络带宽。GFS 将每个文件分为 64MB 块,并在不同的机器上存储多个副本(通常是 3 个副本)。MapReduce 主节点考虑输入文件的位置信息,并尝试在包含相应输入数据副本的机器上安排 map 任务。如果失败,则尝试在与该任务输入数据的副本相邻近的工作机器上安排 map 任务(例如,与包含数据的机器位于同一网络交换机上的工作机器)。在集群的大部分工作机器上运行大型 MapReduce 操作时,大部分输入数据都是本地读取的,不会占用网络带宽。

3.5 任务粒度

我们将 map 阶段分成 M 个部分,将 reduce 阶段分成 R 个部分,如上所述。理想情况下,M 和 R 应远大于工作机器的数量。使每个工作节点执行许多不同的任务可以改善动态负载平衡,并且当出现工作节点故障时可以加快恢复:它已完成的许多 map 任务可以分散在所有其他工作节点上。

在我们的实现中,M 和 R 可以有实际限制,因为主节点必须做出 O(M+R) 个调度决策,并以上述方式在内存中保留 O(MR) 的状态。(但是,内存使用的常量因子很小:O(MR) 状态部分包含每个 map 任务/ reduce 任务对应的大约一个字节的数据。)

此外,R 经常受到用户的限制,因为每个 reduce 任务的输出都会放入一个单独的输出文件中。在实践中,我们倾向于选择 M,使得每个单独的任务的输入数据大约为 16MB 到 64MB(这样上述的局部性优化才最有效),而且我们会将 R 设置为预计使用的 worker 机器数量的一个较小倍数。我们通常使用 M = 200,000 和 R = 5,000,并使用 2,000 台 worker 机器进行 MapReduce 计算。

3.6 备份任务

导致 MapReduce 操作总时间延长的一个常见原因是“慢节点”:一个机器在计算中的最后几个 map 或 reduce 任务中花费了异常长的时间。慢节点可能出现的原因有很多。例如,一台磁盘有问题的机器可能会频繁发生可纠正错误,从而将其读取性能从 30MB/s 降低到 1MB/s。集群调度系统可能在机器上安排了其他任务,导致由于 CPU、内存、本地磁盘或网络带宽的竞争,执行 MapReduce 代码变慢。我们最近遇到的一个问题是机器初始化代码中的一个 bug,导致处理器缓存被禁用:受影响的机器上的计算速度下降了一百倍以上。

我们有一个通用的机制来缓解慢节点问题。当 MapReduce 操作接近完成时,主节点会安排备份执行剩余的正在进行中的任务。只要主节点任务或备份任务中的任何一个完成,该任务就被标记为已完成。我们调整了这个机制,使其通常仅增加操作使用的计算资源不超过几个百分点。我们发现这显著减少了完成大型 MapReduce 操作所需的时间。例如,在第5.3节描述的排序程序中,如果禁用了备份任务机制,完成时间将增加44%。

4 优化

尽管简单地编写 Map 和 Reduce 函数提供的基本功能对于大多数需求来说已经足够了,但我们发现一些扩展非常有用。本节将介绍其中一些。

4.1 分区函数

MapReduce 的用户指定他们需要的 reduce 任务/输出文件的数量(R)。数据使用一个基于中间键的分区函数分配到这些任务上。我们提供了一个默认的分区函数,使用哈希函数(例如 "hash(key) mod R")。这通常会得到相对均衡的分区。然而,在某些情况下,按照键的某些其他函数进行分区是有用的。例如,有时输出键是 URL,我们希望同一主机的所有条目都放在同一个输出文件中。为了支持这样的情况,MapReduce 库的使用者可以提供一个特殊的分区函数。例如,使用 "hash(Hostname(urlkey)) mod R" 作为分区函数会使得所有来自同一主机的 URL 最终都放在同一个输出文件中。

4.2 顺序保证

我们保证在给定的分区内,中间键/值对按递增键的顺序进行处理。这个顺序保证使得在每个分区生成一个排序的输出文件变得容易,当输出文件格式需要通过键进行高效的随机访问查找,或者输出的用户希望数据被排序时,这非常有用。

4.3 组合器函数

在某些情况下,每个 map 任务产生的中间键存在显著的重复,并且用户指定的 Reduce 函数是可交换和可结合的。示例是第2.1节中的单词计数问题。由于单词频率往往遵循 Zipf 分布,每个 map 任务将产生形如 <the, 1> 的数百个或数千个记录。所有这些计数将通过网络发送到单个 reduce 任务,然后由 Reduce 函数相加以生成一个数字。我们允许用户指定一个可选的组合器函数,在数据发送到网络之前对其进行部分合并。

组合器函数在执行 map 任务的每台机器上执行。通常,同样的代码用于实现组合器和 reduce 函数。reduce 函数和组合器函数之间唯一的区别是 MapReduce 库如何处理函数的输出。reduce 函数的输出被写入最终的输出文件,而组合器函数的输出被写入将要发送给 reduce 任务的中间文件。

部分合并显著加快了某些类别的 MapReduce 操作。附录 A 中包含了一个使用组合器的示例。

4.4 输入和输出类型

MapReduce库提供了对多种不同格式的输入数据的支持。例如,“文本”模式的输入将每一行视为一个键/值对:键是文件中的偏移量,值是行内容。另一种常见的支持格式是按键排序的键/值对序列存储。每种输入类型的实现都知道如何将自身拆分为有意义的范围,以便作为单独的映射任务进行处理(例如,文本模式的范围拆分确保范围拆分仅在行边界处发生)。用户可以通过提供简单读取器接口的实现来添加对新输入类型的支持,不过大多数用户只使用预定义的少数输入类型之一。 读取器不一定需要提供从文件读取的数据。例如,可以很容易地定义一个从数据库或内存中映射的数据结构中读取记录的读取器。

类似地,我们还支持一组输出类型,用于以不同的格式生成数据,并且用户代码很容易添加对新输出类型的支持。

4.5 副作用

在某些情况下,MapReduce的用户发现从他们的映射和/或归约操作中生成附加输出文件非常方便。我们依赖应用程序编写者使这些副作用具有原子性和幂等性。通常,应用程序会将数据写入临时文件,并在生成完全后以原子方式重命名此文件。

我们不支持由单个任务产生的多个输出文件的原子两阶段提交。因此,产生具有跨文件一致性要求的多个输出文件的任务应该是确定性的。在实践中,这种限制从未成为问题。

4.6 跳过错误记录

有时用户代码中存在错误,导致映射或归约函数在某些记录上发生确定性崩溃。此类错误会导致MapReduce操作无法完成。通常的做法是修复错误,但有时这是不可行的;也许错误出现在源代码不可用的第三方库中。此外,在对大型数据集进行统计分析时,忽略一些记录是可以接受的。我们提供了一种可选的执行模式,其中MapReduce库检测到哪些记录导致确定性崩溃,并跳过这些记录以便继续执行。

每个工作进程都安装了一个信号处理程序,用于捕获分段违规和总线错误。在调用用户映射或归约操作之前,MapReduce库将参数的序列号存储在全局变量中。如果用户代码生成了一个信号,信号处理程序会向MapReduce主节点发送一个“最后一口气”的UDP数据包,其中包含序列号。当主节点在特定记录上看到多个失败时,它表示在下一次重新执行相应的映射或归约任务时应该跳过该记录。

4.7 本地执行

在Map或Reduce函数中调试问题可能会很棘手,因为实际计算发生在分布式系统中,通常在数千台机器上进行,并且由主节点动态进行工作分配决策。为了帮助进行调试、分析和小规模测试,我们开发了MapReduce库的另一种实现方式,该实现在本地机器上顺序执行MapReduce操作的所有工作。用户可以提供控制参数,以便将计算限制在特定的映射任务上。用户可以使用特殊标志运行程序,然后轻松使用任何有用的调试或测试工具(例如gdb)。

4.8 状态信息

主节点运行一个内部HTTP服务器,并导出一组用于人类查看的状态页面。这些状态页面显示计算的进度,例如已完成的任务数量、正在进行的任务数量、输入字节数、中间数据字节数、输出字节数、处理速率等。这些页面还包含链接到每个任务生成的标准错误和标准输出文件。用户可以使用这些数据来预测计算所需的时间,以及是否应该向计算中添加更多资源。这些页面还可以用于确定计算是否比预期慢得多。

此外,顶级状态页面显示了哪些工作进程失败以及它们在失败时正在处理的映射和归约任务。在诊断用户代码中的错误时,这些信息非常有用。

4.9 计数器

MapReduce库提供了一个计数器功能,用于计算各种事件的发生次数。例如,用户代码可能希望计算已处理的单词总数或索引的德语文档数等。

要使用此功能,用户代码创建一个命名计数器对象,并在映射和/或归约函数中适当地增加计数器。例如:

Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):for each word w in contents:if (IsCapitalized(w)):uppercase->Increment();EmitIntermediate(w, "1");

来自各个工作机器的计数器值会定期传播到主节点(通过 ping 响应进行打包)。主节点会合并成功的映射和减少任务的计数器值,并在 MapReduce 操作完成时将它们返回给用户代码。当前的计数器值也会显示在主节点状态页上,以便人类能够观察实时计算的进展。在汇总计数器值时,主节点会消除相同映射或减少任务的重复执行的影响,以避免重复计数。(重复执行可能来源于我们使用的备用任务以及由于失败而重新执行任务。)

MapReduce 库会自动维护一些计数器值,例如处理的输入键/值对数量以及生成的输出键/值对数量。

用户已经发现计数器设施对于检查 MapReduce 操作的行为非常有用。例如,在某些 MapReduce 操作中,用户代码可能希望确保生成的输出对数量恰好等于处理的输入对数量,或者德语文档处理的比例在可容忍的文档总数的一定比例内。

5 性能

在本节中,我们测量了在一个大型机器群上运行的两个计算的 MapReduce 性能。一个计算搜索大约一T字节的数据,寻找特定的模式。另一个计算对大约一T字节的数据进行排序。

这两个程序代表了由 MapReduce 用户编写的真实程序的一个大子集 - 其中一类程序将数据从一种表示转换为另一种表示,另一类程序从大数据集中提取少量有趣的数据。

5.1 集群配置

所有程序都在一个大约由 1800 台机器组成的集群上执行。每台机器都配备了两个2GHz 的英特尔至强处理器,启用了超线程技术,4GB 的内存,两块 160GB 的 IDE 硬盘和一个千兆以太网连接。这些机器被布置在一个两级树形交换网络中,根节点处可用的聚合带宽约为100-200 Gbps。所有机器都位于同一个托管设施中,因此任何两台机器之间的往返时间都不到一毫秒。

在 4GB 内存中,大约有 1-1.5GB 被集群上运行的其他任务所占用。这些程序是在周末下午执行的,当时 CPU、硬盘和网络大部分时间都处于空闲状态。

5.2 Grep

grep 程序扫描了 10^10 个100字节的记录,查找一个相对罕见的三字符模式(该模式出现在92337条记录中)。输入被分割成大约64MB的片段(M=15000),整个输出被放入一个文件(R=1)。

图2显示了计算随时间的进展情况。Y轴显示了输入数据被扫描的速率。随着更多的机器被分配给这个 MapReduce 计算,速率逐渐提高,在分配了1764台工作机器后达到30 GB/s以上的峰值。随着映射任务的完成,速率开始下降,并在计算进行约80秒后降至零。整个计算从开始到结束大约需要150秒。这包括大约一分钟的启动开销。开销是由于程序传播到所有工作机器,以及与 GFS 的交互延迟(用于打开1000个输入文件以及获取用于优化局部性的信息)。

5.3 Sort

sort 程序对10^10个100字节的记录进行排序(大约1T字节的数据)。该程序是按照 TeraSort 基准 [10] 进行建模的。

排序程序包括不到50行用户代码。三行的 Map 函数从文本行中提取一个10字节的排序键,并将该键和原始文本行作为中间键/值对发射出来。我们使用了内置的 Identity 函数作为 Reduce 运算符。该函数将中间键/值对不变地传递为输出键/值对。最终排序的输出被写入一组2路复制的 GFS 文件中(即,输出的程序写入了2T字节)。

与之前一样,输入数据被分成64MB的片段(M = 15000)。我们将排序后的输出分成4000个文件(R = 4000)。划分函数使用键的初始字节将其分割为R个部分之一。对于这个基准测试,我们的划分函数具有对键的分布的内置知识。在一般的排序程序中,我们会添加一个预处理的MapReduce操作,收集键的样本,并使用样本键的分布来计算最终排序的划分点。

图3(a)显示了排序程序正常执行的进度。左上角的图表显示了读取输入数据的速率。速率峰值约为13GB/s,并且在很短时间内迅速下降,因为所有的map任务在200秒内就完成了。需要注意的是,输入速率比grep要低。这是因为排序的map任务大约有一半的时间和I/O带宽用于将中间输出写入本地磁盘。而grep的中间输出大小可以忽略不计。

中间左侧的图表显示了从map任务发送到reduce任务的网络传输数据的速率。这种随机化在第一个map任务完成后立即开始。图中的第一个峰值是大约1700个reduce任务的第一批(整个MapReduce任务分配了大约1700台机器,每台机器同时执行最多一个reduce任务)。在计算进行到大约300秒时,第一批reduce任务中的一些任务完成,我们开始为剩余的reduce任务进行数据随机化。整个随机化过程在计算进行到大约600秒时完成。

左下角的图表显示了reduce任务将排序数据写入最终输出文件的速率。在第一个随机化阶段结束和写入阶段开始之间存在一段延迟,因为机器正在忙于对中间数据进行排序。一段时间内,写入速率约为2-4GB/s。所有的写入在计算进行到大约850秒时完成。包括启动开销,整个计算过程需要891秒。这与TeraSort基准测试的当前最佳报告结果1057秒相似。

需要注意的几点是:输入速率高于随机化速率和输出速率,这是因为我们的本地化优化——大多数数据从本地磁盘读取,并且绕过了相对带宽受限的网络。随机化速率高于输出速率,这是因为输出阶段会写入排序数据的两个副本(为了可靠性和可用性的原因,我们复制了输出的两个副本)。我们写入两个副本是因为这是底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用纠删码而不是复制,那么写入数据的网络带宽需求将会降低。

5.4 备份任务的影响

在图3(b)中,我们展示了禁用备份任务的排序程序执行情况。执行流程与图3(a)中所示类似,只是存在一个非常长的尾部,在该尾部几乎没有写入活动。在960秒后,除了5个reduce任务外,全部任务都已完成。然而,这最后几个耗时较长的任务直到300秒后才完成。整个计算花费了1283秒,增加了44%的运行时间。

5.5 机器故障

在图3(c)中,我们展示了排序程序的一次执行,在计算进行几分钟后,我们有意杀死了1746个worker进程中的200个。底层集群调度程序立即在这些机器上重新启动了新的worker进程(因为只是进程被杀死了,机器仍然正常运行)。

由于一些先前完成的map任务消失(因为相应的map worker被杀死),需要重新执行这些map任务,所以工作进度显示为负输入速率。这些map任务的重新执行相对较快。整个计算在包括启动开销在内的933秒内完成(只比正常执行时间增加了5%)。

6 经验

我们在2003年2月编写了MapReduce库的第一个版本,并在2003年8月进行了重大改进,包括局部性优化、动态负载平衡等。自那时以来,我们惊喜地发现MapReduce库在我们处理的各种问题中都非常适用。它已经被广泛应用于Google内部的各个领域,包括:

  • 大规模机器学习问题
  • Google News和Froogle产品的聚类问题
  • 提取用于生成热门查询报告(如Google Zeitgeist)的数据
  • 从大量网页中提取属性用于新实验和产品(例如从大量网页语料库中提取地理位置以进行本地搜索)
  • 大规模图计算

在图4中,展示了随着时间推移,在我们的主要源代码管理系统中检入的独立MapReduce程序数量的显著增长,从2003年初的0个到截至2004年9月底近900个实例。MapReduce之所以如此成功,是因为它使得编写一个简单的程序并在半小时内在一千台机器上高效运行成为可能,大大加快了开发和原型开发周期。此外,它让没有分布式和/或并行系统经验的程序员轻松利用大量资源。

在每个任务结束时,MapReduce库记录关于任务使用的计算资源的统计信息。在表1中,我们展示了Google在2004年8月运行的MapReduce任务子集的一些统计数据。

6.1 大规模索引

到目前为止,我们最重要的 MapReduce 应用是重新设计了 Google 网络搜索服务所需的生产索引系统。该索引系统的输入是一组由我们的爬虫系统检索而来、以 GFS 文件的形式存储的大量文档。这些文档的原始内容超过 20 TB。索引过程作为五至十个 MapReduce 操作序列运行。使用 MapReduce(而不是旧索引系统中的特定分布式处理)带来了多个好处:

• 索引代码更简单、更小、更易于理解,因为处理容错、分布和并行化的代码都隐藏在 MapReduce 库中。例如,使用 MapReduce 表示的计算过程中的一个阶段,其 C++ 代码行数从约 3800 行减少到了约 700 行。

• MapReduce 库的性能足够好,可以保持概念上无关的计算过程分离,而不是将它们混合起来以避免对数据进行额外的处理。这使得修改索引过程变得更加容易。例如,在旧的索引系统中需要几个月的时间才能实现的一项更改,在新系统中只需要几天时间即可实现。

• 索引过程更易于操作,因为由 MapReduce 库自动处理大部分由机器故障、慢速机器和网络中断引起的问题,无需操作员干预。而且,通过向索引集群添加新的机器,可以轻松提高索引过程的性能。

7 相关工作

许多系统提供了受限编程模型,并利用这些限制自动并行化计算。例如,可以使用并行前缀计算[6, 9, 13] 在 N 个处理器上对 N 元素数组的所有前缀进行关联函数计算,其时间复杂度为 O(log N) 。MapReduce 可以被认为是这些模型的简化和提炼,基于我们处理大型真实世界计算的经验。更重要的是,我们提供了一个容错实现,可扩展到数千个处理器。相比之下,大多数并行处理系统仅在较小规模上实现,并将处理机器故障的详细信息留给程序员处理。

Bulk Synchronous Programming [17] 和一些 MPI 原语 [11] 提供了更高级别的抽象,使程序员更容易编写并行程序。 MapReduce 与这些系统的主要区别在于,MapReduce 利用受限编程模型自动并行化用户程序,并提供透明的容错处理。

我们的本地性优化受到像 active disks [12, 15] 这样的技术的启发,在这些技术中,计算被推进到靠近本地磁盘的处理元素中,以减少在 I/O 子系统或网络上发送的数据量。我们在普通处理器上运行,其中连接了少量磁盘,而不是直接在磁盘控制器处理器上运行,但总体方法类似。

我们的备份任务机制类似于 Charlotte System [3] 中采用的急切调度机制。简单急切调度的一个缺点是,如果给定任务导致重复故障,则整个计算将无法完成。我们通过跳过坏记录的机制来修复一些这种问题。

MapReduce 实现依赖于一个内部集群管理系统,负责在大量共享机器上分发和运行用户任务。尽管不是本文的重点,但集群管理系统与其他系统(例如 Condor [16])的精神相似。

MapReduce 库中的排序功能与 NOW-Sort [1] 类似。源机器(map worker)将要排序的数据分区并发送到 R 个 reduce worker 中的一个。每个 reduce worker 在本地对其数据进行排序(如果可能的话,在内存中)。当然,NOW-Sort 没有用户可定义的 Map 和 Reduce 函数,这使得我们的库具有广泛的适用性。

River[2]提供了一种编程模型,其中进程通过在分布式队列上发送数据来相互通信。与MapReduce类似,River系统试图在异构硬件或系统扰动引入的非均匀性的情况下,提供良好的平均情况性能。River通过精心安排磁盘和网络传输来实现平衡的完成时间来实现这一点。MapReduce采用了不同的方法。通过限制编程模型,MapReduce框架能够将问题分割成大量的细粒度任务。这些任务会动态地在可用的工作节点上进行调度,以便更快的节点处理更多的任务。受限的编程模型还允许我们在作业结束时调度任务的冗余执行,从而极大地减少了在非均匀性存在的情况下(如慢速或卡住的工作节点)的完成时间。

BAD-FS[5]与MapReduce有着非常不同的编程模型,与MapReduce不同的是,它针对的是在广域网上执行作业。然而,两者有两个基本的相似之处。(1)两个系统都使用冗余执行来从失败引起的数据丢失中恢复。(2)两者都使用地域感知调度来减少在拥塞的网络链路上传输的数据量。

TACC[7]是一个旨在简化高可用网络服务构建的系统。与MapReduce一样,它依赖于重新执行作为实现容错机制的一种方式。

8 结论

MapReduce编程模型在Google被成功地用于许多不同的目的。我们将这一成功归因于几个原因。首先,该模型易于使用,即使对于没有并行和分布式系统经验的程序员来说,因为它隐藏了并行化,容错,地域优化和负载均衡的细节。其次,大量的问题可以很容易地表达为MapReduce计算。例如,MapReduce被用于生成谷歌生产的网络搜索服务的数据,用于排序,数据挖掘,机器学习等许多系统。第三,我们已经开发了一个能够扩展到由数千台机器组成的大型机器群的MapReduce实现。该实现有效地利用了这些机器资源,因此适用于Google遇到的许多大型计算问题。

我们从这项工作中学到了几件事情。首先,限制编程模型使得并行化和分布计算以及使这些计算具备容错性变得容易。其次,网络带宽是一种稀缺资源。因此,我们的系统中的许多优化都针对减少在网络上传输的数据量:地域优化使我们能够从本地磁盘读取数据,并将中间数据写入本地磁盘的单个副本节省了网络带宽。第三,冗余执行可用于减少慢速机器的影响,并处理机器故障和数据丢失。

相关文章:

  • Netty源码三:NioEventLoop创建与run方法
  • Linux ---- Shell编程之正则表达式
  • Java 的 Map 與 List
  • linux新增用户,指定home目录和bash脚本且加入到sudoer列表
  • 从0开始搭建若依微服务项目 RuoYi-Cloud(保姆式教程完结)
  • css3表格练习
  • 不同的强化学习模型适配与金融二级市场的功能性建议
  • 爬虫学习笔记-selenium交互
  • HttpHeaders 源码中headers成员变量为什么声明为final
  • Wireshark网络协议分析 - Wireshark速览
  • 详解SpringCloud微服务技术栈:深入ElasticSearch(1)——数据聚合
  • 设计模式篇---备忘录模式
  • uniapp微信小程序-请求二次封装(直接可用)
  • JavaEE学习笔记 2024-1-25 --VUE的入门使用
  • element ui组件 el-date-picker设置default-time的默认时间
  • 230. Kth Smallest Element in a BST
  • EventListener原理
  • HTTP 简介
  • MYSQL 的 IF 函数
  • Python十分钟制作属于你自己的个性logo
  • React-flux杂记
  • Spring Boot快速入门(一):Hello Spring Boot
  • ViewService——一种保证客户端与服务端同步的方法
  • vue.js框架原理浅析
  • Vue2.0 实现互斥
  • vue-router 实现分析
  • vue从创建到完整的饿了么(18)购物车详细信息的展示与删除
  • 基于web的全景—— Pannellum小试
  • 如何借助 NoSQL 提高 JPA 应用性能
  • 如何邀请好友注册您的网站(模拟百度网盘)
  • 视频flv转mp4最快的几种方法(就是不用格式工厂)
  • 用quicker-worker.js轻松跑一个大数据遍历
  • 优化 Vue 项目编译文件大小
  • 走向全栈之MongoDB的使用
  • Spring第一个helloWorld
  • ​Java并发新构件之Exchanger
  • ​人工智能书单(数学基础篇)
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • #我与Java虚拟机的故事#连载07:我放弃了对JVM的进一步学习
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (poj1.2.1)1970(筛选法模拟)
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (七)Knockout 创建自定义绑定
  • (七)微服务分布式云架构spring cloud - common-service 项目构建过程
  • (十)c52学习之旅-定时器实验
  • (四)Android布局类型(线性布局LinearLayout)
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • (转)Groupon前传:从10个月的失败作品修改,1个月找到成功
  • (转)平衡树
  • (转)重识new
  • * 论文笔记 【Wide Deep Learning for Recommender Systems】
  • .FileZilla的使用和主动模式被动模式介绍
  • .NET : 在VS2008中计算代码度量值