mit 6.824 学习笔记 (一) MapReduce 详解

0x00 序

本来前年的这个时候就打算刷这个课了,但是在抗击懒癌的过程中节节败退。一直就拖到了现在,也不知道这次能坚持多久。总之争取每个月能出一篇文章吧。主要是为了克服自己对于知识的遗忘,如果同时能给你带来一些启发或者帮助那就更好了。考虑到自己吃了没文化的亏,为了能在某些时候能读到最新的一些技术文章或者论文。现在就必须克服读英语的障碍,我会强迫自己去理解并翻译原文,参考其他文章的理解,参考讲义,最后总结出来一篇文章。

0x01 关于分布式系统

什么是分布式系统?

许多协同工作的计算机的计算机组成的系统是分布式系统,比如大型网站的存储系统,MapReduce,p2p 等等

如何分布式

  • 将物理上独立的个体组织到一起工作
  • 使用隔离策略来实现安全
  • 使用副本技术来容错
  • 使用水平扩展 cpu/mem/disk/net 来提高吞吐

缺点

  • 复杂: 许多并发的部分
  • 必须要处理局部失败
  • 很难了解潜在的性能

0x02 MapReduce简介

什么是MapReduce

MapReduce 是一种编程模型 论文原文发表在osdi上,MapReduce分为Map 和 Reduce 两个过程,Map读取巨大数据集,处理后产生中间结果,Reduce则读取Map产生的中间结果,并将结果输出。

注:MapReduce是一种编程模型,MapReduce库则是MapReduce编程模型的实现,可以隐藏掉具体的分布式系统的细节。下文中提到的MapReduce 包含了 MapReduce 编程模型和MapReduce库 两个内容,需要注意区分。

MapReduce产生的背景

  • 单机难以处理海量的数据,数据需要在一定时间内完成。
  • 程序员很多都没有分布式系统相关的经验,开发分布式处理系统非常困难。
  • 恰好MapReduce编程模型能够描述许多现实生活中的计算场景。

MapReduce 在Google

  • 通用计算机组成的集群
  • 每天有数百个任务在集群中运行
  • 已经实现了数百个MapReduce程序
  • 一个典型的程序使用数百台机器处理数TB的数据
  • 程序员很容易上手

MapReduce库设计目标

  • 隐藏并行处理的细节
  • 隐藏错误处理
  • 隐藏负载均衡及调度任务
  • 隐藏数据分发的细节
  • 隐藏集群通讯的细节
  • 提供简单的接口 Map Reduce
  • 能够自动将MapReduce程序运行到一个巨大的集群中,利用集群计算资源。

用户如何使用MapReduce编程

  • 使用MapReduce库 实现 Map 和 Reduce方法并且准备好数据即可。

实现

MapReduce有很多种实现本文的实现是针对大规模器群的实现.
集群规格如下:

  1. dual-processor x86 处理器 linux 2-4G内存
  2. 100Mb网卡 或者1gb网卡 但是带宽使用量远小于一半
  3. 集群由数百或者数千台机器组成,所以失败是常态。
  4. 提供的存储是便宜的 IDE硬盘直接连接到机器上,google内部开发的分布式存储系统来管理这些磁盘,在这些不稳定的硬件上提供了稳定和可靠。
  5. 用户向调度系统提交job,每个job由一组任务构成,这些任务会被提交到集群内一组可用的机器内。

0x03 MapReduce 执行的过程

image

上图展示了MapReduce如何执行

首先我们来回答下面的问题

  • 用户需要实现的部分

    • Map 方法
    • Reduce 方法
  • 集群中的角色分类

    • Master 负责任务调度(分配任务,重新执行,调度等)
    • Worker 负责运行 Map 任务 或者 Reduce 任务
  • worker 运行的任务分类

    • Map 任务: 每个Map 任务读取部分输入 产生中间的k v 数据
    • Reduce 任务: 读取map 产生的中间 k v 数据每个Reduce 产出一个输出文件

kv 数据

可能是 <url,访问次数>,也可能是<单词,出现次数>,还可能是<行号,行内容> 总之map方法的输出,和reduce方法的输入都这这种格式。

Map 和 Reduce 方法在执行过程中会自动被划分为多个任务,每个任务会被调度到一个Worker上进行执行。部分处理快的Worker可以执行多个任务(不能同时执行,一定要一个任务完成后才能执行另外的任务。)

  1. MapReduce程序负责将用户的输入划分为M块 16M ~ 64M 的块大小。通过划分函数(hash(key) mod R) 会把Map中间数据划分为R个分区。
  2. 将程序复制到集群中的各个需要运行的机器上并启动
  3. Master 给空闲的机器分配Map 或者Reduce 任务,由于(1) 中说输入文件被划分了M块,分区函数 mod R 所以此时Map任务被划分为了M个任务,Reduce任务被划分了R个分区,同时最终结果也会产生 <= R 个最终输出的文件
  4. 执行Map任务的worker读取相应的输入块,解析后发送给用户自定义的Map程序,用户Map程序将处理后的中间结果保存在内存当中。
  5. 保存在内存中的中间结果会定期的被根据分区函数划分为R个区域写入本地磁盘,本地磁盘保存的位置信息会被传输到Master,Master将这些partation位置信息转发到Reduce 的worker。
  6. Reduce worker 接收到这些位置信息后会通过RPC调用从Map Worker的磁盘中读取相应partation的中间结果,当Reduce读取了所有的中间结果的之后将按照key进行一次排序,因为多个worker任务产生的中间结果会被同一个Reduce worker 读取,所以为了保证结果有序还需要重新排序一次。
  7. reduce worker 遍历排序过的中间数据,给每个遇到的唯一的中间key,将这个key和对应的value传递到用户的reduce 方法中。reduce 方法的输出会被添加到这个分区最终输出文件中。
  8. 所有任务结束后会产生R个输出文件,不需要合并。

0x04 MapReduce数据模型

原文中先介绍的数据模型,我觉得介绍完执行过程后再介绍数据模型会更容易理解数据模型是怎么运行的。

我们来看一段统计单词个数的伪代码如下所示:

1
2
3
4
5
6
Map(k, v)
split v into words
for each word w
emit(w, "1")
Reduce(k, v)
emit(len(v))

我们可以看到map方法做的就是从输入的字符串把出入的字符串拆分为单词形式为 <xxx word,1> 表示某个单词出现了一次。

reduce方法 做的就是从一组map中获取到的中间key由于相同的key会合并最后产生的是 <xxx word,[1,1,1,1,1,1]> 这种形式。循环迭代将xxx word的值累加起来。

这就是一次典型的MapReduce的处理代码。
至于数据读取部分则是由MapReduce库提供。

更多例子

  • 分布式grep

map 方法产生一行,如果这行与提供的规则匹配,reduce 方法仅仅提供将中间结果复制到输出即可。

  • 统计url 访问频率

map方法处理网页请求日志并且输出<url,1> ,reduce方法将所有具有相同url的值累加并添加到<url, total count>中

  • web反向链接图

map方法给每个 在每个源页面中找到了目标url 则 输出 <target, source> ,reduce 方法将所有的源地址连接到目标url输出<target, list(source)>

  • 每个域名的 term-vector

term vector 将一篇文档或者一组文档中最重要的单词描述为<word,frequency> ,map 方法为每个输入的文档产生一个<hostname, term vector>, reduce方法处理所有的预处理过的文档的 term vector 进行累加丢弃低频的 term vector 重新产生一个 <hostname, term vector>

  • 倒排索引

map方法解析文档并且输出<word,document id>,reduce 方法接收给定单词的所有输出,排序相应的文档id并且输出<word, list(document ID)>,输出结果形成了一组倒排索引,倒排索引通过简单的办法,跟踪到了单词在文档中的位置。

  • 分布式排序

map方法从每条记录中提取key 输出<key,record>,reduce方法为空即可,因为从map获取中间结果的时候已经做了排序,切reduce只获得一个partation内的数据,则这个partaion内的数据已经是有序的了,所以什么都不用做。

0x05 实现

上文0x03 MapReduce执行的过程已经提到了MapReduce的执行过程,我们在本节会进一步谈到MapReduce库的实现细节。

Master数据结构

状态信息

master负责分配任务到worker所以会保存如下信息。

  • map任务状态
  • reduce任务状态
  • worker状态
文件位置信息

master是将map任务产的中间数据位置传送到reduce任务的管道,所以master保存了map 任务完成后产生的R个中间文件位置信息包含大小,当map任务结束的时候会更新位置和大小信息。这个信息会被逐步的推送到正在处理reduce 任务的worker。

容错

因为MapReduce库被设计用来使用数百或者数千台机器,处理总量巨大的数据时。这个库必须以优雅的办法进行容错。

worker 失败
  • 失败检测: master 周期性的ping 每个worker。如果没有从worker收到一定数量的回复就会由master将woker标记为失败。

  • 失败处理:

    • 任何由失败woker完成的map任务会被重置为等待执行的状态。因此该map任务有资格被调度到其他woker处理。
    • 任何map 或者reduce任务在失败worker上运行中的都会被重置为等待执行状态(任务尚未被调度的状态,简单的说就是让这个任务重做),变得有资格被重新调度。

这里有个细节需要注意,已经完成的reduce任务不需要重置,而已经完成的map任务需要重置主要是因为:

完成的map任务会被重新执行是因为他们的输出是存在失败worker的本地磁盘,因此那些数据是不可被访问的。(而某些reduce任务需要获取这个数据所以要重新执行)完成reduce任务不需要重新执行因为,他们的输出存在了全局文件系统当中。

当一个map任务首先被worker A执行然后由B执行(因为A失败了),所有在执行reduce任务的worker都会被通知重新执行。任何一个还没有从A读取数据的worker将会从B读取数据。

MapReduce 能够应对大规模worker失失败。MapReduce master 简单的重新执行由那些不可访问机器执行的任务。并且继续向前执行任务,最终完成MapReduce作业。

master 失败

当前MapReduce库的实现是master失败就放弃MapReduce计算。客户端可以检测到这个状态,并且如果他们希望的话可以重新执行整个MapReduce任务。主要是因为简化了设计只有单个master所以,继续执行未完成的任务是不可能的。

失败处理的语义

如果用户定义的Map 和 Reduce的函数的拥有确定的输入输出,那么这个函数在分布式环境下的执行结果与用户单机执行的结果是一致的。不管是否有遇到错误。

为了实现这个特性MapReduce依赖于map 和 reduce 任务的输出是原子提交实现如下:

map 侧
  • 每个工作中的任务写他的输出到一个私有的临时文件
  • 一个 reduce任务产生一个这样的文件,一个map任务产生R个这样的文件。
  • 当一个map任务完成了,woker发送一个消息到master里面包含这R个文件的名称。
  • 对于一个已经完成的map任务,如果master收到了一个完成的消息,master将会忽略这个消息
  • 如果不存在则将会记录这r个文件的名字到master的数据结构当中。
reduce 侧
  • 当 reduce 任务完成的时候,reduce自动重命名输出的临时文件到最终的输出文件。
  • 如果相同的reduce任务被在多台机器上执行。针对同一个最终文件的重命名操作有多个,依赖底层文件系统提供的原子性的重命名操作,来保证最终文件系统只包含一个reduce任务产生的数据。

绝大多数map和reduce操作是确定的,并且实际上我们的语义是等于串行执行的因为这个原因,程序员很容易推测他们程序的行为。当map 或者 reduce 操作是不确定的时候,提供了一个弱的,但是仍然合理的语义。在出现不确定操作时,给定的reduce 任务:

  • R1的输出相当于串行执行这个不确定程序的输出
  • 对于不同的reduce任务R2 的输出相当于另外一个不确定程序串行执行的输出

存储位置优化

MapReduce对存储进行了一定的优化主要是因为: 系统当中网络带宽有限

  • MapReduce通过从本地磁盘读取文件,来节约网络带宽。
  • GFS 将每个文件拆分为64M的块大小,然后将每个块分散的存储在不同的机器上(通常是3份)。
  • MapReduce master 会获取到输入文件的位置信息,并且会尝试将调度到包含输入文件副本的机器
  • 如果失败了的话,则会尝试将map任务调度到输入文件副本存放机器附近的worker(在同一个交换机下的worker)。
  • 当MapReduce运行在一个非常大的集群当中时,大部分输入数据能够从本地读取,不消耗过多的网络带宽。

任务粒度

通常情况下我们将map切分为M块,reduce切分为R块,理想情况下M和R应当比worker机器的数量大很多每个worker执行许多不同的任务,能够提高动态负载,同时也能在worker失败的情况下提高恢复速度:许多这个失败worker完成的任务能扣分担给其他worker。

因为MapReduce任务信息存储在master上,所以会占用内存等,所以不能无限的增加任务数。

备用worker

MapReduce 操作花费的时间变长,其中一个共同的原因是 落伍者(“straggler”):一个机器花费了一长段明显不同时间去完成最计算任务中的最后几个map或者reduce任务。落伍者的出现很可能是因为系统负载过高,磁盘损坏,代码bug等一系列问题。

解决思路是,有一些提前保留的worker,当一个MapReduce操作接近完成的时候,master把任务调度到备用worker去实行剩下的处于进行中的任务。当任何一个主或者备的执行完成,这个任务被标记为完成。
在google的实践中,通常情况下计算资源消耗的增加将不会超过几个百分点。但能够有效的减小大型MapReduce作业所花费的时间。例如排序程序,当关闭这个备用任务机制的时候,将会多花费44%的时间。

0x06 优化

分区函数

当用户指定的输出为 R时 默认的分区方式是使用哈希(hash (key) mode R)。这个会是分区结果相当的均衡。然后一些场景用户希望使用其他分区函数,例如有时输出的key是urls 并且用户希望对于所有的条目,相同的host将会被输出到相同的文件当中。为了支持这种场景,mapreudce库的用户能够提供一个特别的分区函数。例如使用户 hash(hostname(urlkey)) mod R作为分区函数作用于所有的来自同一个网址的url输出到相同的文件。

顺序保证

我们保证在一个给定的分区内,中间kv pair是增序处理的。

Combiner 函数

在某些场景下,map产生的中间key会有许多重复的结果。一个比较好的例子就是wordcount的例子。因为单词分布倾向于Zipf分布,每个map产生成百上千个 <word,1>形式的记录。这些数量将会通过网络发送到一个reduce任务,并且由这个reduce任务将他们相加输出一个数字。允许用户自定义一个Combiner函数,在发送数据前对数据进行局部合并。减少网络数据传输。

输入输出类型

MapReduce库提供了几种读取不同格式的数据输入的支持。

  • 例如文本模式对待每个输入为一个kv pair :key 是文件的偏移量value是该行的内容
  • 另一种已经支持的格式是一系列连续根据key排序的kv pair

每一个输入类型的实现知道如何进行切分范围能够使得对每个独立的map任务都有意义。(例如text模式切割的范围会保证发生在行的边界)

用户能够通过实现一个简单的reduce接口来增加新类型的支持,一个redaer不必非要从文件去读取数据内容,例如很容易定义一个reader从数据库读取数据,或者从内存数据结构读取数据。

Side Effect

某些场景下可能需要执行多个同样的任务,所以输出到同一个文件会产生错误,通常的做法是输出的时候写临时文件,当完全写完的时候做一次原子性的重命名(主要是因为有很多任务在同时进行,如果同时进行读写的话会产生冲突)。

本地执行

调试Map 或 Reduce的问题可以使用一些tricky的技巧,因为真实的计算任务在分布式系统当中,通常在成百上千台机器中运行,任务都是由master动态指定的。为了减轻调试,优化,和小规模测试的痛苦。Google开发了一个非正规的MapReduce实现,继而将所有的任务都在本地执行。控制权交给了用户,因此计算程序可以限制到某个map任务上。用户带有某些参数的执行他们的程序,因此可以很容易的使用他们觉得有用的某些debug程序或者测试程序。(例如gdb)

状态信息

内部建立了http服务器展示了job的运行状态方便排查

计数器

MapReduce库提供了一个计数器,来统计各种事件发生的次数,例如用户想要统计已经处理的单词数量。

为了使用这个特性:

  • 用户需要创建一个counter对象并且在map或者reduce任务中适当的增加计数值。
  • 计数值定期的从独立的worker传送到master(承载于ping的回包中)。
  • master从成功的map和reduce任务中汇聚计数值,并且当任务结束是返回给用户。

当前的计数值也会展示在master状态页面,因此用户可以实时的关注处理进度。当聚合计数值时master忽略重复执行的map和reduce任务计数,来防止重复计数。(重复执行来自我们的备用worker和由于失败重新执行的任务)

一些计数器的值由MapReduce库来维护,例如已经处理的输入kv pair和已经产生的 kv pair。

计数器机制对于检测MapReduce操作行为很有用。例如在某些MapReduce操作,用户代码希望能够保证输出的pair数量等于已经处理的输入pair的数量。

0x07 性能

本节展示了google对MapReduce性能的相关的测试和结果的讨论

主要进行了如下任务:

  • 一个计算是在1t的数据当中进行近似的模式匹配
  • 一个任务是近似的排序1tb的数据。

这两个程序在用户写的MapReduce程序中是非常具有代表性的,一类是数据shuffles 数据从一种形式到另一种形式,另一种是在一个大的数据集中提取一小部分感兴趣的数据。

集群配置

  • 1800~ 节点
  • 2c 2Ghz cpu
  • 4G 内存
  • 160g hdd(ide) * 2
  • 1G nic
  • 树状网络
  • 核心交换机有200G带宽
  • rtt < 1ms

4G内存中有1-1.5G内存被其他运行中的进程使用,程序在一个周末下午执行,此时cpu 硬盘和网络都处于空闲状态。

Grep

grep 程序扫描了 10^10 条100字节的记录,查找了比较少见的三个字符的模式(这个匹配发生了92337次 )。输入大小为64M的大小(M=15000),所有的输出都在一个文件当中(R=1)。

image

上图展示了计算进程随时间的变化

  • x轴表示花费的时间
  • y轴表示输入数据扫描速率

运行状态的解读

  • 随着被分配任务的worker的数量的增加速率渐渐提高
  • 当1764个worker被分配任务的时候出现了读取峰值,峰值大约30Gb/s
  • 随着map任务的结束,在80秒时读取速率下降到了0

整个任务大概花费了150秒左右从开始到结束,包含了1分钟启动的开销。开销由于用户程序需要分发到所有的worker上,gfs打开1000个文件的延时,为了优化本地操作,获取必要的信息。

排序

排序程序排序 10^10 条100字节的记录(接近1tb),这个程序模仿TeraSort benchmark。

这个程序由不到50行用户代码组成。一个三行的map方法提取了10字节的排序key从一行文本当中,并且提交了key和原始的文本数据作为中间kv pair。我们使用了一个内建的函数作为reduce操作。这个方法将中间kv pair 没有任何变化的传送到输出文件。最终输出的结果2副本的写入到gfs(2T的输入结果)。

像之前一样,输入数据被切分成64Mb大小(M=15000),我们将输出分成了4000个文件(R=4000),分区函数使用key的第一个字节,进行分区将数据分散R个分片中的一个。

这个benchmark 我们的分区函数知道key的分布情况,在一个通用的排序程序中,我们会使用一个预处理的MapReduce程序来收集key的样本。并且使用样本分布来计算最终的分割点。

image

上图展示了一次排序过程,左上方的图展示了读取文件的速率。峰值为13Gb/s随着时间的推移,读取速率迅速下降因为200左右所有的map任务已经结束了。值得注意的是读取速率比grep任务要低。这是因为排序的map任务花费了一半时间和io带宽用来将中间结果写入本地。与之相对的grep任务写入本地磁盘的数据可以忽略不计。

中间左边的图展示了map任务通过网络将数据传送到reduce任务的速率,shuffling 的过程始于第一个map任务完成的时候,第一个峰值出现的原因是因为,第一批reduce任务已经全部被分配到了1700个机器上了,并且每个机器最多执行一个reduce任务。在计算进入到大概300秒的时候,这一批任务中的一些reduce任务已经完成,为了启动剩下的reduce任务,开始shuffling 数据。shuffling 数据结束的时候大约在整个计算过程进行到600秒的时候。

左下方的图片展示了reduce任务将最终输出写入文件的速率。有一点下降在第一次shuffling期间和写入刚开始的时候,因为机器正忙于排序中间结果。写入数据在2-4Gb/s 速度持续了一会,所有的写入结束在850秒左右。包含了启动开销,全部计算任务花费了891秒。这个时间与TeraSort benchmark的最好记录1057秒相差不多。

一些事情需要注意:读取速率高过shuffling 速率和输出速率,因为本地数据优化,大部分数据读取都是通过本地磁盘获取的,同时绕开了网络传输的带宽的限制。shuffle的速率高过输出的速率,因为输出需要写2副本。(出于可用性和稳定性的原因我们写了两副本的数据)。写两副本的数据,是因为底层文件系统提供的可靠性和可用性机制的要求。写数据方面使用EC需要的网络带宽要比副本策略要低。

备用worker的影响

为了方便观看我把上张图再次贴到这里

image

在上图中的b我们展示了一个关闭了备用worker的,排序程序执行的过程。执行过程与a类似,除了有一个几乎没有写入长尾以外,在960秒的时候所有剩下的五个reduce任务没有执行完了。然而这些少数的落伍者并没有结束,直到300秒之后。全部任务执行花费了1283秒,时间消耗增长了44%。

机器故障

上图a中 在一次排序过程中,在几分钟后google故意的干掉了1746 中的 200个,底层集群立即将新的处理worker调度到这些机器上。(因为仅仅是worker 被kill了机器本身工作正常)

worker的死亡表现为速度为负数,因为之前做过的任务已经消失了(因为相应的map worker被杀死了)并且需要被重做。重做map任务相对来说很快,全部任务完成于933秒,包含了启动开销。(仅仅增长了5%的总时间)

0x08 结语

我们再来复习下MapReduce的执行过程

  1. master分发任务 把map 任务 和 reduce任务分发下去
  2. map worker读取输入,进行map计算写入本地临时文件
  3. map任务完成通知master 告知本地中间结果文件的位置大小信息
  4. master通知reduce worker
  5. reduce worker 从不同的 map worker 读取自己负责的部分 这个过程称之为 shuffle。
  6. reduce worker 读取之后会在本地进行排序
  7. 调用用户的reduce方法进行计算
  8. 最终reduce将结果输出到全局的文件系统(GFS)

我们来复习下失败处理的过程

  • master 故障不处理,外部程序决定是否重做
  • worker 故障信息通过master ping 各个worker
  • 如果发现了某个worker故障了
    • 这个worker执行中的`reduce 和 map 任务都会被标记为空闲重新分配
    • 如果是reduce任务并且执行完了那么就忽略不管
    • 如果是map任务并且执行完了,那么就要重置状态 要求重新执行,因为中间文件可能丢失了。

理解了MapReduce相关的失败处理,和执行过程基本也就算是理解了MapReduce,下一篇文章我会说一下mit6.824的第一个实验。