【译】MapReduce: Simplified Data Processing on Large Clusters
本文翻译了论文《MapReduce: Simplified Data Processing on Large Clusters》的全部内容,原论文“科学”下载。作为谷歌的三驾马车之一,是谷歌早期(2004年)在大数据领域的精华作品,对后来的大数据领域其他产品的孵化有深远的影响。同样的,在工程化方面也有很多可以借鉴的地方。阅读愉快,欢迎在留言区勘误和提出意见。
MapReduce:简化大型集群的数据处理
摘要
MapReduce 是一种用于处理和生成大型数据集的编程模型和相关实现。 用户指定处理键值对以生成一组中间键值对的map函数,以及合并与同一中间键关联的所有中间值的reduce函数。 如论文中所示,许多现实世界的任务都可以在这个模型中表达出来。
以这种函数式风格编写的程序会自动并行化并在大型商用机器集群上执行。 运行时系统负责对输入数据进行分区、在一组机器上调度程序执行、处理机器故障以及管理所需的机器间通信等细节。 这使得没有任何并行和分布式系统经验的程序员可以轻松利用大型分布式系统的资源。
我们的 MapReduce 实现在大型商用机器集群上运行,并且具有高度可扩展性:典型的 MapReduce 计算在数千台机器上处理数 TB 级的数据。 程序员发现该系统易于使用:已实施数百个 MapReduce 程序,每天在 Google 集群上执行超过一千个 MapReduce 作业。
1 简介
在过去的五年里,作者和许多其他在谷歌的人已经实现了数百个特殊用途的计算,这些计算处理大量原始数据,例如抓取的文档、网络请求日志等,以计算各种派生数据,例如 作为倒排索引、Web 文档图形结构的各种表示、每个主机抓取的页面数量的摘要、给定日期中最频繁的查询集等。大多数此类计算在概念上都很简单。 然而,输入数据通常很大,计算必须分布在成百上千台机器上才能在合理的时间内完成。 如何并行化计算、分发数据和处理故障的问题合在一起,用大量复杂的代码来处理这些问题,从而掩盖了原来的简单计算。
为了应对这种复杂性,我们设计了一个新的抽象,它允许我们表达我们试图执行的简单计算,但在库中隐藏了并行化、容错、数据分布和负载平衡的杂乱细节。 我们的抽象受到 Lisp 和许多其他函数式语言中存在的 map 和 reduce 原语的启发。 我们意识到,我们的大部分计算都涉及对输入中的每个逻辑record 应用map操作,以计算一组中间键值对,然后对所有共享相同建的值应用reduce操作,以便适当地组合派生数据。 我们使用具有用户指定map和reduce操作的功能模型使我们能够轻松地并行化大型计算,并使用重新执行作为容错的主要机制。
这项工作的主要贡献是一个简单而强大的接口,可以实现大规模计算的自动并行化和分布式,结合该接口的实现,在大型商用 PC 集群上实现高性能。
第 2 节描述了基本的编程模型并给出了几个例子。 第 3 节描述了为我们的基于集群的计算环境量身定制的 MapReduce 接口的实现。 第 4 节描述了几个有用的对编程模型的改进。第 5 节对各种任务的实现进行了性能测量。 第 6 节探讨 MapReduce 在 Google 中的使用,包括我们将其用作重写生产索引系统的基础的经验。 第 7 节讨论相关的和未来的一些工作。
2 编程模型
计算采用一组输入键值对并产生一组输出键值对。 MapReduce 库的用户将计算表示为两个函数:Map 和 Reduce。
由用户编写的 Map 接受一个输入对并生成一组中间键值对。 MapReduce 库将与同一中间键 I 关联的所有中间值组合在一起,并将它们传递给 Reduce 函数。
同样的,也由用户编写的 Reduce 函数接受一个中间键 I 和该键的一组值。 它将这些值合并在一起以形成可能规模较小的一组值。 通常每次 Reduce 调用只产生零个或一个输出值。 中间值通过迭代器提供给用户的reduce 函数。 这允许我们处理太大而无法放入内存的值列表。
2.1 例子
考虑计算大量文档中每个单词出现的次数的问题。 用户将编写类似于以下伪代码的代码:
1 |
|
map 函数发出每个单词加上相关的出现次数(在这个简单的例子中只有 1)。 reduce 函数将针对特定单词发出的所有计数加在一起。
此外,用户编写代码以使用输入和输出文件的名称以及可选的调整参数填充 mapreduce 规范对象。 然后用户调用 MapReduce 函数,将规范对象传递给它。 用户的代码与 MapReduce 库(用 C++ 实现)链接在一起。 附录 A 包含此示例的完整程序文本。
2.2 类型
尽管前面的伪代码是根据字符串输入和输出编写的,但从概念上讲,用户提供的 map 和 reduce 函数具有关联类型:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
即,输入键和值来自与输出键和值不同的域。 此外,中间键和值与输出键和值来自同一域。
我们的 C++ 实现将字符串传入和传出用户定义的函数,并将它们留给用户代码以在字符串和适当类型之间进行转换。
2.3 更多的例子
下面是一些有趣的程序的简单示例,它们可以很容易地表示为 MapReduce 计算。
分布式 Grep:如果 map 函数与提供的模式匹配,则它会发出一行。 reduce 函数是一个恒等函数,它只是将提供的中间数据复制到输出。
URL访问频率计数:map函数处理网页请求的日志并输出<URL, 1>
。 reduce 函数将同一 URL 的所有值加在一起,并发出一个 <URL, total count>
对。
翻转 Web 链接图:map 函数为指向在名为 source 的页面中找到的目标 URL 的每个链接输出 <target, source>
对。 reduce 函数连接与给定目标 URL 关联的所有源 URL 的列表,并发出对: <target, list(source)>
单主机术语向量:术语向量将出现在一个文档或一组文档中的最重要的词总结为 <词,频率>
对的列表。 map 函数为每个输入文档发出一个 <hostname, term vector>
对(其中主机名是从文档的 URL 中提取的)。 将给定主机的所有每文档术语向量传递给 reduce 函数。 它将这些术语向量加在一起,丢弃不常用的术语,然后发出最终的 <hostname, term vector>
对。
倒排索引:map 函数解析每个文档,并发出一系列 <word, document ID>
对。 reduce 函数接受给定单词的所有对,对相应的文档 ID 进行排序并发出一个 <word, list(document ID)>
对。 所有输出对的集合形成一个简单的倒排索引。 很容易增加这种计算以跟踪单词位置。
分布式排序:map 函数从每条记录中提取键,并发出一个 <key, record>
对。 reduce 函数发出所有对不变。 此计算取决于第 4.1 节中描述的分区工具和第 4.2 节中描述的排序属性。
3 具体实现
MapReduce 接口有许多不同的实现可能。 正确的选择取决于使用环境。 例如,一种实现可能适用于小型共享内存机器,另一种适用于大型 NUMA 多处理器,而另一种适用于更大的联网机器集合。
本节描述了针对 Google 广泛使用的计算环境的实现:通过交换以太网连接在一起的大型商品 PC 集群 [4] 。在我们的环境中:
(1) 机器通常是运行 Linux 的双核 x86 处理器,每台机器有 2-4 GB 的内存。
(2) 使用商用网络硬件-通常在机器级别上为 100 兆位/秒或 1 千兆位/秒,但在整体平分带宽中平均要少得多。
(3) 一个集群由成百上千台机器组成,因此机器故障很常见。
(4) 存储由直接连接到单个机器的廉价 IDE 磁盘提供。内部开发的分布式文件系统 [8] 用于管理存储在这些磁盘上的数据。文件系统使用副本在不可靠的硬件之上提供可用性和可靠性。
(5) 用户向调度系统提交作业。每个作业由一组任务组成,并由调度程序映射到集群中的一组可用机器。
3.1 执行概述
Map 调用,通过自动将输入数据划分为一组 M 个分割,来分布在多台机器上。 输入分片可以由不同的机器并行处理。 通过使用分区函数(例如,hash(key) mod R)将中间键空间划分为 R 块来分布 Reduce 调用。 分区数(R)和分区函数由用户指定。
图 1 显示了我们实现中 MapReduce 操作的整体流程。 当用户程序调用 MapReduce 函数时,会发生以下操作序列(图 1 中的编号标签对应于下表中的编号):
用户程序中的 MapReduce 库首先将输入文件拆分为 M 块,通常为每块 16 兆字节到 64 兆字节 (MB)(可由用户通过可选参数控制)。 然后它会在一组机器上启动该程序的许多副本。
该程序的副本之一是特殊的 - 主副本。 其余的是由主副本分配工作的工作者。 有 M 个 map 任务和 R 个 reduce 任务要分配。 master 挑选空闲的 工作者 并为每个 工作者 分配一个 map 任务或一个 reduce 任务。
分配了 map 任务的 工作者 读取相应输入 split 的内容。 它从输入数据中解析出键值对,并将每一对传递给用户定义的 Map 函数。 Map 函数产生的中间键值对缓存在内存中。
缓冲的对定期写入本地磁盘,由分区函数划分为 R 个区域。 这些缓存对在本地磁盘上的位置被传递回主控,master负责将这些位置转发给reduce工作者。
当 master 将这些位置通知给 reduce 工作者时,它使用远程过程调用从 map 工作者 的本地磁盘读取缓冲数据。 当reduce 工作者 读取所有中间数据时,它会按中间键对其进行排序,以便将所有出现的相同键组合在一起。 需要排序是因为通常许多不同的键映射到同一个 reduce 任务。 如果中间数据量太大而无法放入内存,则使用外部排序。
reduce 工作者遍历排序后的中间数据,对于遇到的每个唯一的中间键,它将键和相应的一组中间值传递给用户的 Reduce 函数。 Reduce 函数的输出被附加到这个 reduce 分区的最终输出文件中。
当所有map任务和reduce任务都完成后,master唤醒用户程序。 此时用户程序中的MapReduce调用返回用户代码。
成功完成后,mapreduce 执行的输出在 R 输出文件中可用(每个 reduce 任务一个,文件名由用户指定)。 通常,用户不需要将这些 R 输出文件合并到一个文件中—他们经常将这些文件作为输入传递给另一个 MapReduce 调用,或者从另一个能够处理分成多个文件的输入的分布式应用程序中使用它们。
3.2 主数据的结构
Master保留了几个数据结构。 对于每个 map 任务和 reduce 任务,它存储状态(空闲、进行中或已完成)和工作机器的身份(对于非空闲任务)。
master 是将中间文件区域的位置从 map 任务传播到 reduce 任务的管道。 因此,对于每个完成的map任务,master都会存储map任务产生的R个中间文件区域的位置和大小。 当map任务完成时,会收到对此位置和大小信息的更新。 信息被增量推送给正在进行的 reduce 任务的工作者。
3.3 容错
由于 MapReduce 库旨在帮助使用成百上千台机器处理大量数据,因此该库必须优雅地容忍机器故障。
工作者失败
Master会定期检查每个工作者。 如果在一定时间内没有收到工作者的响应,则master会将工作者标记为失败。 工作者完成的任何map任务都将重置为其初始空闲状态,因此有资格在其他工作者上进行调度。 类似地,在失败的工作者上正在进行的任何 map 任务或 reduce 任务也被重置为空闲状态并有资格重新调度。
已完成的map任务会在失败时重新执行,因为它们的输出存储在故障机器的本地磁盘上,因此无法访问。 完成的reduce 任务不需要重新执行,因为它们的输出存储在全局文件系统中。
当一个map任务首先由 A 工作者执行,然后由B工作者执行时(因为A失败了),所有执行reduce任务的工作者都会收到重新执行的通知。 任何尚未从 A 工作者读取数据的 reduce 任务将从B 工作者读取数据。
MapReduce 对大规模的工作者故障具有弹性。 例如,在一次 MapReduce 操作期间,正在运行的集群上的网络维护导致一次 80 台机器的组在几分钟内无法访问。 MapReduce master 简单地重新执行了无法访问的工作者机器所做的工作,并继续向前推进,最终完成了 MapReduce 操作。
Master失败
很容易让 master 定期写入上述 master 数据结构的检查点。 如果master任务挂了,可以从最后一个检查点状态开始一个新的副本。 但是,考虑到只有一个master,它的失败可能性不大; 因此,如果 master 失败,我们当前的实现会中止 MapReduce 计算。 客户端可以检查这种情况并根据需要重试 MapReduce 操作。
出现故障时的语义
当用户提供的 map 和 reduce 运算符是其输入值的确定性函数时,我们的分布式实现产生的输出与整个程序的无故障顺序执行产生的输出相同。
我们依靠 map 和reduce任务输出的原子性提交来实现此属性。 每个正在进行的任务将其输出写入私有临时文件。 一个reduce 任务生成一个这样的文件,一个map 任务生成R 个这样的文件(每个reduce 任务一个)。 当 map 任务完成时,工作者 会向 master 发送一条消息,并在消息中包含 R 个临时文件的名称。 如果 master 收到一个已经完成的 map 任务的完成消息,它会忽略该消息。 否则,它会在master的数据结构中记录 R 文件的名称。
当 reduce 任务完成时,reduce 工作者 原子性地将其临时输出文件重命名为最终输出文件。 如果在多台机器上执行同一个reduce 任务,将会对同一个最终输出文件执行多次重命名调用。 我们依靠底层文件系统提供的原子性重命名操作来保证最终的文件系统状态只包含一次reduce任务执行产生的数据。
我们的绝大多数 map 和 reduce 运算符都是确定性的,而且我们的语义等同于顺序执行,在这种情况下,让程序员很容易推理他们的程序的行为。 当 map 和/或 reduce 运算符是不确定的时,我们提供较弱但仍然合理的语义。 在存在非确定性运算符的情况下,特定reduce 任务R1 的输出等效于非确定性程序的顺序执行所产生的R1 的输出。 然而,不同reduce 任务R2 的输出可能对应于由非确定性程序的不同顺序执行产生的R2 输出。
考虑map任务 M 和reduce任务 R1 和 R2。 设 e(Ri) 是已提交的 Ri 的执行(恰好有一个这样的执行)。 较弱的语义出现是因为 e(R1) 可能读取了 M 的一次执行产生的输出,而 e(R2) 可能读取了 M 的不同执行产生的输出。
3.4 局部性
在我们的计算环境中,网络带宽是一种相对稀缺的资源。 我们利用输入数据(由 GFS [8] 管理)存储在构成我们集群的机器的本地磁盘上这一事实来节省网络带宽。 GFS 将每个文件分成 64 MB 的块,并在不同的机器上存储每个块的多个副本(通常为 3 个副本)。 MapReduce master 考虑输入文件的位置信息,并尝试在包含相应输入数据副本的机器上安排map任务。 如果失败,它会尝试在该任务的输入数据的副本附近安排 map 任务(例如,在与包含数据的机器位于同一网络交换机上的工作机器上)。 在集群中的大部分工作程序上运行大型 MapReduce 操作时,大多数输入数据在本地读取并且不消耗网络带宽。
3.5 任务粒度
如上所述,我们将 map 阶段细分为 M 个部分,将 reduce 阶段细分为 R 个部分。 理想情况下,M 和 R 应该比工作机器的数量大得多。 让每个 worker 执行许多不同的任务可以改善动态负载均衡,并在 worker 失败时加快恢复速度:它已完成的许多 map 任务可以分布在所有其他 worker 机器上。
在我们的实现中 M 和 R 的大小有实际限制,因为主机必须做出 O(M + R) 调度决策,并在内存中保持 O(M ∗ R) 状态,如上所述。 (内存使用的常数因素很小,但是:状态的 O(M * R) 部分由每个 map 任务/reduce 任务对大约一个字节的数据组成。)
此外,R 经常受到用户的限制,因为每个 reduce 任务的输出最终都在一个单独的输出文件中。 在实践中,我们倾向于选择 M,这样每个单独的任务大约是 16 MB 到 64 MB 的输入数据(这样上面描述的局部优化最有效),并且我们使 R 是我们期待使用的工作机器数量的一个小倍数。 我们经常使用 2,000 台工作机器执行 M = 200, 000 和 R = 5, 000 的 MapReduce 计算。
3.6 备份任务
拖累 MapReduce 操作总时间的一个常见原因是“掉队者”:一台机器需要非常长的时间来完成计算中的最后几个 map 或 reduce 任务之一。 掉队者的出现有很多原因。 例如,磁盘损坏的机器可能会经常遇到可纠正的错误,从而将读取性能从 30 MB/s 降低到 1 MB/s。 集群调度系统可能已经在机器上调度了其他任务,导致它由于 CPU、内存、本地磁盘或网络带宽的竞争而更慢地执行 MapReduce 代码。 我们最近遇到的一个问题是机器初始化代码中的一个错误,它导致处理器缓存被禁用:受影响机器上的计算速度减慢了 100 多倍。
我们有一个通用的机制来缓解掉队者的问题。 当 MapReduce 操作接近完成时,master 会调度剩余正在进行任务的备份执行。 每当主要或备份执行完成时,该任务就会被标记为已完成。 我们已经调整了这种机制,以便它通常将操作使用的计算资源增加不超过几个百分点。 我们发现这显着减少了完成大型 MapReduce 操作的时间。 例如,当备份任务机制被禁用时,第 5.3 节中描述的排序程序需要增加 44% 的时间才能完成。
4 优化
虽然通过简单地编写 Map 和 Reduce 函数提供的基本功能足以满足大多数需求,但我们发现一些扩展很有用。 这些在本节中进行了描述。
4.1 分区功能
MapReduce 的用户指定他们想要的 reduce 任务/输出文件的数量(R)。 使用中间键上的分区函数在这些任务中对数据进行分区。 提供了使用散列的默认分区函数(例如“hash(key) mod R”)。 这往往会产生相当均衡的分区。 但是,在某些情况下,通过键的某些其他功能对数据进行分区是有用的。 例如,有时输出键是 URL,我们希望单个主机的所有条目都在同一个输出文件中结束。 为了支持这种情况,MapReduce 库的用户可以提供一个特殊的分区功能。 例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数会使得来自同一主机的所有 URL 最终出现在同一个输出文件中。
4.2 顺序保证
我们保证在给定的分区内,中间键值对按递增的键顺序进行处理。 这种排序保证使得为每个分区生成一个排序的输出文件变得容易,这在输出文件格式需要支持通过键进行高效随机访问查找时非常有用,或者输出的用户发现对数据进行排序很方便。
4.3 组合器功能
在某些情况下,每个 map 任务产生的中间键都有明显的重复,用户指定的 Reduce 函数是可交换和关联的。 一个很好的例子是第 2.1 节中的单词计数示例。 由于词频倾向于遵循齐夫定律分布,因此每个 map 任务都会产生成百上千条 <the, 1>
形式的记录。 所有这些计数都将通过网络发送到单个 reduce 任务,然后由 Reduce 函数相加以产生一个数字。 我们允许用户指定一个可选的组合器函数,该函数在通过网络发送数据之前对数据进行部分合并。
组合器功能在执行map任务的每台机器上运行。 通常,相同的代码用于实现组合器和reduce函数。 reduce 函数和组合器函数之间的唯一区别是 MapReduce 库如何处理函数的输出。 reduce 函数的输出被写入最终的输出文件。 组合器函数的输出被写入一个中间文件,该文件将被发送到一个reduce 任务。
部分组合显著加快了某些类型的 MapReduce 操作。 附录 A 包含一个使用组合器的示例。
4.4 输入和输出类型
MapReduce 库支持读取多种不同格式的输入数据。 例如,“文本”模式输入将每一行视为一个键值对:键是文件中的偏移量,值是该行的内容。 另一种普遍支持的格式存储按键排序的键值对序列。 每个输入类型的实现都知道如何将自身拆分为有意义的范围以作为单独的map任务进行处理(例如,文本模式的范围拆分确保范围拆分仅发生在行边界处)。 用户可以通过提供简单的读取接口的实现来添加对新输入类型的支持,尽管大多数用户只使用少数预定义输入类型中的一种。
读取器不一定需要提供从文件中读取的数据。 例如,很容易定义一个读取器从数据库或映射到内存中的数据结构中读取记录。
以类似的方式,我们支持一组输出类型以生成不同格式的数据,并且用户代码很容易添加对新输出类型的支持。
4.5 副作用
在某些情况下,MapReduce 的用户发现从他们的 map 和/或 reduce 运算符生成辅助文件作为附加输出很方便。 我们依靠应用程序的写入来使这种副作用实现原子化和幂等。 通常,应用程序写入一个临时文件并在该文件完全生成后自动重命名该文件。
我们不支持由单个任务生成的多个输出文件的原子性两阶段提交。 因此,生成具有跨文件一致性要求的多个输出文件的任务应该是确定性的。 这种限制在实践中从未成为问题。
4.6 忽略不好的记录
有时,用户代码中的错误会导致 Map 或 Reduce 函数在某些记录上确定性崩溃。 此类错误会阻止 MapReduce 操作完成。 通常的做法是修复错误,但有时这是不可行的; 可能该错误位于无法获得源代码的第三方库中。 此外,有时忽略一些记录也是可以接受的,例如在对大型数据集进行统计分析时。 我们提供了一种可选的执行模式,其中 MapReduce 库检测哪些记录导致确定性崩溃并跳过这些记录以取得进展。
每个工作者进程都安装一个信号处理程序来捕获分段违规和总线错误。 在调用用户 Map 或 Reduce 操作之前,MapReduce 库将参数的序列号存储在全局变量中。 如果用户代码生成一个信号,信号处理程序会向 MapReduce 主机发送一个包含序列号的“last gasp”UDP 数据包。 当master在特定记录上看到不止一个失败时,表示在下一次重新执行相应的Map或Reduce任务时应该跳过该记录。
4.7 本地执行
在 Map 或 Reduce 函数中调试问题可能很棘手,因为实际计算发生在分布式系统中,通常在数千台机器上,工作分配决策由master动态做出。 为了帮助调试、分析和小规模测试,我们开发了 MapReduce 库的替代实现,它在本地机器上按顺序执行 MapReduce 操作的所有工作。 向用户提供控件,以便将计算限制为特定的map任务。 用户使用特殊标志调用他们的程序,然后可以轻松使用他们认为有用的任何调试或测试工具(例如 gdb)。
4.8 状态信息
master 运行一个内部 HTTP 服务器并导出一组状态页面供人使用。 状态页面显示计算的进度,例如有多少任务已经完成,有多少正在进行中,输入字节数,中间数据字节数,输出字节数,处理率等。这些页面还包含指向 每个任务生成的标准错误和标准输出文件。 用户可以使用这些数据来预测计算需要多长时间,以及是否应该将更多资源添加到计算中。 这些页面还可用于确定计算何时比预期慢得多。
此外,顶层状态页面会显示哪些 worker 失败了,以及他们在失败时正在处理哪些 map 和 reduce 任务。 在尝试诊断用户代码中的错误时,此信息很有用。
4.9 计数器
MapReduce 库提供了一个计数器工具来计算各种事件的发生次数。 例如,用户代码可能想要计算处理的单词总数或索引的德语文档数等。
为了使用这个工具,用户代码创建一个命名的计数器对象,然后在 Map 和/或 Reduce 函数中适当地增加计数器。 例如:
1 |
|
来自各个工作者机器的计数器值会定期传播到主机器(搭载在 ping 响应上)。 master 聚合来自成功的 map 和 reduce 任务的计数器值,并在 MapReduce 操作完成时将它们返回给用户代码。 当前计数器值也显示在主状态页面上,以便人们可以查看实时计算的进度。 聚合计数器值时,master 会消除重复执行同一 map 或 reduce 任务的影响,以避免重复计数。 (重复执行可能源于我们使用备份任务以及由于失败而重新执行任务。)
MapReduce 库会自动维护一些计数器值,例如处理的输入键值对的数量和生成的输出键值对的数量。
用户发现计数器工具对于检查 MapReduce 操作的行为非常有用。 例如,在某些 MapReduce 操作中,用户代码可能希望确保生成的输出对的数量与处理的输入对的数量完全相等,或者处理的德国文档的比例在文档总数的某个可容忍的范围内处理。
5 性能
在本节中,我们测量 MapReduce 在大型机器集群上运行的两个计算的性能。 一项计算搜索大约 1 TB 的数据以寻找特定模式。 另一个计算对大约 1 TB 的数据进行排序。
这两个程序代表了 MapReduce 用户编写的真实程序的一大子集——一类程序将数据从一种表示形式混洗到另一种表示形式,另一类程序从大数据集中提取少量有趣的数据。
5.1 集群配置
所有程序都在由大约 1800 台机器组成的集群上执行。 每台机器都有两个支持超线程的 2GHz Intel Xeon 处理器、4GB 内存、两个 160GB IDE 磁盘和一个千兆以太网链接。 这些机器被安排在一个两级树形交换网络中,在根节点有大约 100-200 Gbps 的总带宽可用。 所有机器都在同一个托管设施中,因此任何一对机器之间的往返时间都小于一毫秒。
在 4GB 内存中,集群上运行的其他任务保留了大约 1-1.5GB。 这些程序是在一个周末下午执行的,当时 CPU、磁盘和网络大多处于空闲状态。
5.2 Grep
grep 程序扫描 10^10 条 100 字节的记录,搜索相对少见的三字符模式(该模式出现在 92,337 条记录中)。 输入被分成大约 64MB 的部分 (M = 15000),整个输出被放置在一个文件中 (R = 1)。
图 2 显示了计算随时间的进度。 Y 轴显示扫描输入数据的速率。 随着更多的机器被分配给这个 MapReduce 计算,速率逐渐加快,当分配了 1764 名工作人员时,速率会达到峰值超过 30 GB/s。 当map任务完成时,速率开始下降并在计算大约 80 秒后达到零。 整个计算从开始到结束大约需要 150 秒。 这包括大约一分钟的启动开销。 开销是由于程序传播到所有工作机器,以及与 GFS 交互的延迟以打开 1000 个输入文件的集合并获得局部优化所需的信息。
5.3 排序
排序程序对 10^10 条 100 字节的记录(大约 1 TB 的数据)进行排序。 该程序以 TeraSort 基准 [10] 为模型。
排序程序由少于 50 行的用户代码组成。 三行 Map 函数从文本行中提取一个 10 字节的排序键,并将键和原始文本行作为中间键值对发出。 我们使用内置的 Identity 函数作为 Reduce 运算符。 该函数将中间键值对作为输出键值对原样传递。 最终排序的输出被写入一组 2 路复制 GFS 文件(即,2 TB 被写入作为程序的输出)。
和以前一样,输入数据被分成 64MB 块 (M = 15000)。 我们将排序后的输出划分为 4000 个文件(R = 4000)。 分区函数使用键的初始字节将其分隔为 R 个块之一。
我们针对该基准测试的分区函数具有键分布的内置知识。 在一般的排序程序中,我们会添加一个预传递 MapReduce 操作,该操作将收集键的样本并使用采样键的分布来计算最终排序传递的分割点。
图3(a)显示了排序程序正常执行的进度。 左上角的图表显示了读取输入的速率。 由于所有map任务都在 200 秒之前完成,因此速率在大约 13 GB/s 时达到峰值并很快消失。 请注意,输入速率小于 grep。 这是因为排序map任务花费大约一半的时间和 I/O 带宽将中间输出写入其本地磁盘。 grep 的相应中间输出的大小可以忽略不计。
左中图显示了数据通过网络从 map 任务发送到 reduce 任务的速率。 一旦第一个map任务完成,这种洗牌就会开始。 图中的第一个驼峰是第一批大约 1700 个 reduce 任务(整个 MapReduce 分配了大约 1700 台机器,每台机器一次最多执行一个 reduce 任务)。 在计算开始大约 300 秒后,其中一些第一批 reduce 任务完成,我们开始为剩余的 reduce 任务打乱数据。 所有的洗牌都在大约 600 秒的计算中完成。
左下角的图表显示了reduce任务将排序数据写入最终输出文件的速率。 由于机器忙于对中间数据进行排序,因此在第一次混洗期结束和写入期开始之间存在延迟。 写入会以大约 2-4 GB/s 的速度持续一段时间。 所有写入在计算中完成大约 850 秒。 包括启动开销在内,整个计算需要 891 秒。 这类似于 TeraSort 基准测试的当前最佳报告结果 1057 秒 [18]。
需要注意的几点:由于我们的局部优化,输入速率高于混洗速率和输出速率——大多数数据是从本地磁盘读取并绕过我们相对带宽受限的网络。 shuffle 率高于输出率,因为输出阶段写入了排序数据的两个副本(出于可靠性和可用性的原因,我们制作了输出的两个副本)。 我们写了两个副本,因为这是我们底层文件系统提供的可靠性和可用性机制。 如果底层文件系统使用纠删码 [14] 而不是复制,则写入数据的网络带宽要求将会降低。
5.4 备份任务的效果
在图 3 (b) 中,我们展示了在禁用备份任务的情况下执行排序程序。 执行流程类似于图 3 (a) 中所示的流程,不同之处在于有一个非常长的尾巴,几乎没有任何写入活动发生。 960 秒后,除 5 个 reduce 任务之外的所有任务都完成了。 然而,这最后几个落后者直到 300 秒后才完成。 整个计算耗时 1283 秒,所用时间增加了 44%。
5.5 机器故障
在图 3 (c) 中,我们展示了排序程序的执行,其中我们在计算几分钟后有意杀死了 1746 个工作进程中的 200 个。 底层集群调度器立即在这些机器上重新启动新的工作进程(因为只有进程被杀死,机器仍然正常运行)。
工作者终止显示为负输入率,因为一些先前完成的地map工作消失了(因为相应的map工作者被杀死)并且需要重做。 此map工作的重新执行发生得相对较快。 整个计算在 933 秒内完成,包括启动开销(仅比正常执行时间增加 5%)。
6 经验
我们在 2003 年 2 月编写了 MapReduce 库的第一个版本,并在 2003 年 8 月对其进行了重大改进,包括局部优化、跨工作机任务执行的动态负载均衡等。从那时起,我们一直惊讶于 MapReduce 库对我们处理的各种问题的广泛适用性。 它已在 Google 内的广泛使用,包括:
- 大规模的机器学习问题
- Google 新闻和Froogle产品的集群问题
- 提取数据用于生成流行的查询报告(例如 Google Zeitgeist),
- 为新的实验和产品提取网页的属性(例如,从大型网页语料库中提取地理位置以进行本地化搜索),以及
- 大规模的图计算
图 4 显示了随着时间的推移,使用我们主要源代码管理系统的独立 MapReduce 程序的数量有显著的增长,从 2003 年初的 0 到截至 2004 年 9 月下旬的近 900 个独立实例。 MapReduce 如此成功,因为它使之成为可能,编写一个简单的程序,并在半小时内在上千台机器上高效运行,大大加快了开发和原型制作周期。 此外,它允许没有分布式和/或并行系统经验的程序员轻松利用大量资源。
在每个作业结束时,MapReduce 库会记录有关作业使用的计算资源的统计信息。 在表 1 中,我们显示了 2004 年 8 月在 Google 运行的 MapReduce 作业子集的一些统计数据。
6.1 大规模索引
迄今为止,我们对 MapReduce 最重要的用途之一是完全重写生产索引系统,该系统生成用于 Google 网络搜索服务的数据结构。 索引系统将我们的爬行系统检索到的大量文档作为输入,存储为一组 GFS 文件。 这些文档的原始内容超过 20 TB 的数据。 索引过程以五到十次 MapReduce 操作的顺序运行。 使用 MapReduce(而不是先前版本的索引系统中的临时分布式传递)提供了几个好处:
索引代码更简单、更小、更容易理解,因为处理容错、分布和并行化的代码隐藏在 MapReduce 库中。 例如,当使用 MapReduce 表示时,计算的一个阶段的大小从大约 3800 行 C++ 代码减少到大约 700 行。
MapReduce 库的性能足够好,我们可以将概念上不相关的计算分开,而不是将它们混合在一起以避免额外的数据传递。 这使得更改索引过程变得容易。 例如,在我们的旧索引系统中需要几个月才能完成的一项更改在新系统中只需要几天时间就可以实现。
索引过程变得更容易操作,因为大多数由机器故障、机器缓慢和网络故障引起的问题都由 MapReduce 库自动处理,无需程序员干预。 此外,通过向索引集群添加新机器可以轻松提高索引过程的性能。
7 相关工作
许多系统提供了受限的编程模型,并使用这些限制来自动并行化计算。 例如,可以使用并行前缀计算 [6] [9] [13] 在 N 个处理器上以 log N 时间在 N 元素数组的所有前缀上计算关联函数。 MapReduce 可以被视为基于我们在大型现实世界计算中的经验对其中一些模型的简化和提炼。 更重要的是,我们提供了可扩展到数千个处理器的容错实现。 相比之下,大多数并行处理系统仅在较小规模上实现,并将处理机器故障的细节留给程序员。
批量同步编程 [17] 和一些 MPI 原语 [11] 提供了更高级别的抽象,使程序员更容易编写并行程序。 这些系统与 MapReduce 之间的主要区别在于 MapReduce 利用受限编程模型来自动并行化用户程序并提供透明的容错。
我们的局部优化从诸如活动磁盘 [12] [15] 等技术中汲取灵感,其中计算被推送到靠近本地磁盘的处理元素中,以减少通过 I/O 子系统或网络发送的数据量。 我们在少量磁盘直接连接的商用处理器上运行,而不是直接在磁盘控制器处理器上运行,但一般方法是相似的。
我们的备份任务机制类似于Charlotte系统 [3] 中采用的急切调度机制。 简单的急切调度的缺点之一是,如果给定的任务导致重复失败,则整个计算无法完成。 我们使用跳过不良记录的机制修复了此问题的一些实例。
MapReduce 实现依赖于内部集群管理系统,该系统负责在大量共享机器上分发和运行用户任务。 尽管不是本文的重点,但集群管理系统在精神上与 Condor [16] 等其他系统相似。
作为 MapReduce 库一部分的排序工具在操作上类似于 NOW-Sort [1]。 源机器(map 工作者)对要排序的数据进行分区并将其发送给某一个 R reduce 工作者。 每个reduce工作者在本地(如果可能,在内存中)对其数据进行排序。 当然,NOW-Sort 没有用户可定义的 Map 和 Reduce 函数,使我们的库广泛适用。
River [2] 提供了一种编程模型,其中进程通过在分布式队列上发送数据来相互通信。 与 MapReduce 一样,即使存在由异构硬件或系统扰动引入的非均匀性,River 系统也试图提供良好的均衡性能。 River 通过仔细安排磁盘和网络传输以实现平衡的完成时间来实现这一点。 MapReduce 使用不同的方法。 通过限制编程模型,MapReduce 框架能够将问题划分为大量细粒度的任务。 这些任务在可用的工作者上动态安排,以便更快的工作者处理更多任务。 受限编程模型还允许我们在接近作业结束时安排任务的冗余执行,这在存在不均匀性(例如缓慢或卡住的工作者)的情况下大大减少了完成时间。
BAD-FS [5] 具有与 MapReduce 截然不同的编程模型,并且与 MapReduce 不同,它的目标是跨广域网执行作业。 然而,有两个基本的相似之处。 (1) 两个系统都使用冗余执行来从故障导致的数据丢失中恢复。 (2) 两者都使用局部感知调度来减少通过拥塞的网络链接发送的数据量。
TACC [7] 是一个旨在简化高可用网络服务构建的系统。 与 MapReduce 一样,它依赖重新执行作为实现容错的机制。
8 结论
MapReduce 编程模型已在 Google 成功用于许多不同的目的。 我们将这一成功归因于几个原因。 首先,该模型易于使用,即使对于没有并行和分布式系统经验的程序员也是如此,因为它隐藏了并行化、容错、局部优化和负载均衡的细节。 其次,各种各样的问题很容易表达为 MapReduce 计算。 例如,MapReduce 用于为 Google 的生产网络搜索服务、排序、数据挖掘、机器学习和许多其他系统生成数据。 第三,我们开发了 MapReduce 的实现,它可以扩展到包含数千台机器的大型机器集群。 该实现有效地利用了这些机器资源,因此适用于 Google 遇到的许多大型计算问题。
我们从这项工作中学到了一些东西。 首先,限制编程模型可以很容易地并行化和分布计算并使此类计算具有容错性。 其次,网络带宽是一种稀缺资源。 因此,我们系统中的许多优化旨在减少通过网络发送的数据量:局部性优化允许我们从本地磁盘读取数据,将中间数据的单个副本写入本地磁盘可以节省网络带宽。 第三,冗余执行可用于减少慢速机器的影响,并处理机器故障和数据丢失。
致谢
Josh Levenberg 根据他使用 MapReduce 的经验和其他人的增强建议,在修改和扩展用户级 MapReduce API 方面发挥了重要作用,并提供了许多新功能。 MapReduce 从 Google 文件系统读取其输入并将其输出写入 Google 文件系统 [8]。我们要感谢 Mohit Aron、Howard Gobioff、Markus Gutschke、David Kramer、Shun-Tak Leung 和 Josh Redstone 在开发 GFS 方面所做的工作。我们还要感谢 Percy Liang 和 Olcan Sercinoglu 在开发 MapReduce 使用的集群管理系统方面所做的工作。 Mike Burrows、Wilson Hsieh、Josh Levenberg、Sharon Perl、Rob Pike 和 Debby Wallach 对本文的早期草稿提供了有益的评论。匿名的 OSDI 审稿人和我们的牧羊人 Eric Brewer 就论文可以改进的领域提供了许多有用的建议。最后,我们感谢 Google 工程组织中所有 MapReduce 用户提供有用的反馈、建议和错误报告。
附录A 词频
本节包含一个程序,用于计算命令行上指定的一组输入文件中每个唯一单词的出现次数。
1 |
|
参考
- Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson,Arizona, May 1997. ↩
- Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, DavidPatterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the SixthWorkshop on Input/Output in Parallel and DistributedSystems (IOPADS ’99), pages 10–22, Atlanta, Georgia,May 1999. ↩
- Arash Baratloo, Mehmet Karaul, Zvi Kedem, and PeterWyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Paralleland Distributed Computing Systems, 1996. ↩
- Luiz A. Barroso, Jeffrey Dean, and Urs Holzle. ¨ Websearch for a planet: The Google cluster architecture. IEEEMicro, 23(2):22–28, April 2003. ↩
- John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau,Remzi H. Arpaci-Dusseau, and Miron Livny. Explicitcontrol in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on NetworkedSystems Design and Implementation NSDI, March 2004. ↩
- Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November1989. ↩
- Armando Fox, Steven D. Gribble, Yatin Chawathe,Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACMSymposium on Operating System Principles, pages 78–91, Saint-Malo, France, 1997. ↩
- Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George,New York, 2003. ↩
- S. Gorlatch. Systematic efficient parallelization of scanand other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96.Parallel Processing, Lecture Notes in Computer Science1124, pages 401–408. Springer-Verlag, 1996. ↩
- Jim Gray. Sort benchmark home page.http://research.microsoft.com/barc/SortBenchmark/. ↩
- William Gropp, Ewing Lusk, and Anthony Skjellum.Using MPI: Portable Parallel Programming with theMessage-Passing Interface. MIT Press, Cambridge, MA,1999. ↩
- L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX Fileand Storage Technologies FAST Conference, April 2004. ↩
- Richard E. Ladner and Michael J. Fischer. Parallel prefixcomputation. Journal of the ACM, 27(4):831–838, 1980. ↩
- Michael O. Rabin. Efficient dispersal of information forsecurity, load balancing and fault tolerance. Journal ofthe ACM, 36(2):335–348, 1989. ↩
- Erik Riedel, Christos Faloutsos, Garth A. Gibson, andDavid Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001. ↩
- Douglas Thain, Todd Tannenbaum, and Miron Livny.Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004. ↩
- L. G. Valiant. A bridging model for parallel computation.Communications of the ACM, 33(8):103–111, 1997. ↩
- Jim Wyllie. Spsort: How to sort a terabyte quickly.http://alme1.almaden.ibm.com/cs/spsort.pdf. ↩