date: 2019-09-20
tags: OS “distributed” 6.824
这个学期的主要的任务就是学一下6.824。因为哥大的分布式课是按照6.824 spring 2015的安排走的,那个时候是实现paxos,而现在是实现raft,所以打算都实现一下。然后这个课重要的论文阅读部分也做一下并集。今天是第一篇,来读map reduce。
就按照文章的顺序来记录吧。
跳过。
整体计算输入是一系列键值对,输出也是键值对。就如名字所示,分为map和reduce这两步。
Map: 接受一个input pair,输出一系列键值对作为中间结果。MapReduce库会把中间结果中键相同的对聚到一起备Reduce用。
Reduce: 接受一个中间key,以及这个key对应的很多值。其功能是把这些值聚合起来。
下面是一个word count的经典例子:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
注意这里为了省事,例子里的map并没有进行计数,而是输出了很多{word: 1}
这样的对。
除了以上的代码,用户还需要写一个config,包含了输入输出的文件名等参数。
在这个例子里,输入输出的类型如下:
map (k1,v1) -> list(k2,v2)
reduce (k2,list(v2)) -> list(v2)
论文中的实现把所有的数字都转化成了字符串,估计是为了避免一些编码的问题。
文中还有一些别的例子,感兴趣的朋友可以自己去查看。
对于不同的系统应该有不同的合适的实现。本节介绍在Google被广泛运用的一种实现:commodity PC的大集群(基于Ethernet)。在该环境中
Map步骤:数据先被自动分为M份。这M份可以被不同的机器并行处理。
Reduce步骤:把intermediate key分为R份(如对key加hash时候取模),然后分给若干机器并行运行。
整体的运行情况如下图:
运行相关还有一些细节:
最后的结果存在了R个output file中(每个reduce task一个)。特别的,用户不需要合并这R份,因为这个输出经常会作为另一个MapReduce的输入。
master节点会保存诸多数据结构,比如会保存所有worker的state (idle, in-progress, completed)以及worker machine的identity。
master还是把中间量文件从map传向reduce的导管(conduit)。所以每当map做完之后,master会保存map生成的R份给reduce用的中间量文件的地址。
因为MapReduce是给大集群用的,所以fault tolerance很重要。
master会周期性的ping worker。如果没有在规定时间内收到恢复,master就把这个worker当成fail了的。任何in progress的map task和reduce task以及已完成了的map task都会被重新设置为idle。注意,已完成了的map需要重新执行,因为他们的结果都被存在这个worker的硬盘上。而已完成的reduce不用重新执行。
当一个map任务被worker A执行完再被worker B执行的时候(因为A fail了),所有执行reduce的worker都会被告知此事,从而让任何还未从A读取数据的worker从B读。
很容易建立master的周期性checkpoint来存储master data structure。如果master down了,重载一下checkpoint就好。然而鉴于只有一个master,不太可能会fail,所以现在实现的版本是如果master fail了,就重新开始整个运算。
当用户输入的map和reduce函数都是deterministic function的时候,我们的如上实现可以的到和non-faulting sequential execution一样的结果。上述的结果以来的是map和reduce的原子性。每个in-progress的task写到其自己私有的临时文件中。不过注意,当存在non-deterministic的函数的时候,可能会得到和sequential execution不同的结果。
为了降低带宽压力,会给map任务分配到里对应的split比较近的worker。
考虑我们在实际中应该采用多大的M和R。因为master需要O(M+R)
的决策并存储O(M*R)
的状态(不过内存的这个常数系数会很小,因为就是存个状态还有地址啥的)。
R经常会通过用户需求进行限制,因为这毕竟和最终输出有多少个文件相关。而M一般都是把input data分为16MB~64MB(这样可以让上述locality optimization最优)。我们会把R设置为worker总数的一个小倍数。比如对于2000个worker machines,, 。
一个拖慢计算速度的很重要的原因是存在某一个worker很慢(straggler)。比如某一个机器的硬盘不好,导致读写速度从30MB/s降低到1MB/s。集群也可能在同一台机器上分配了不同的任务(这里感觉任务是指不限于当前在做的mapreduce的任务),从而抢占了CPU,内存等等。这些情况可能会把计算拖慢约100倍。
减轻这种问的影响的方法是:当一个MapReduce operation接近完成了,master对当前的in-progress tasks进行backup execution。无论是primary还是backup完成了都标志着这个这个任务完成了。通过调参,增加的计算量只有百分之几,但是去可以大幅提高运行时间。例如,文中提到的排序任务如果关闭backup,运行时间会长44%。
尽管普通的mapreduce已经可以满足绝大多数需要了,我们还加入了几个比较常用的extension。
用来帮助在reduce之前把中间量数据分为R份的,默认就是hash(key)%R
。不过在一些时候,比如说URL,可能直接hash效果不好,我们加入了对hostname的前处理函数,从而变为hash(Hostname(urlkey))%R
。
我们保证在指定的partition下,所有的键值对都是按照key的升序排列的。这样能够更容易生成一个有序的结果,便于查找。
对于比如word count这个任务,可能会有很多<the, 1>
这样的对,应该用一个combiner预先merge一下(每个map任务之后跟一个combiner)。
实现了一个reader interface,用于读写不同类型的数据。
有的时候可以生成中间状态