本文简单介绍分布式计算框架 MapReduce



(一)、引入


MapReduce 的设计灵感来源于 Python 中的 $map$ 和 $reduce$ 函数。首先来看一下这两个函数是什么样的:

$map$ : 接收一个函数 $f$ 和 一个数组 $list$ (或者可以 iterate 的一串数据) , 然后把该函数运用在该数组中的每个元素(element)


$reduce$:接收一个函数 $g$ 和 一个数组 $list$ ;然后对该 list 中的前两个元素运行函数 $g$ ,生成的结果当成 list 的一个元素,然后递归取前两个元素再次运行函数 $g$ 。


实例:

问题:给定一个数组,求该数组中每个元素都平方,然后相加的结果。


所以以上的执行过程可以看成:


mapper:就是传给 $map$ 函数的处理函数;如以上例子的 $f$。 reducer:就是传给 $reduce$ 函数的处理函数;如以上例子的 $g$。

关键就在于程序员只需要去实现:$maper$ 和 $reducer$ 函数。



(二)、MapReduce



可以把 MapReduce 看成是 $map+reduce$ by key

  • $mapper$ 函数并不是返回一个 value,而是返回一个 key-value 对(可能很多有相同的 key )
  • 在调用 $reducer$ 函数之前,需要把所有 key 值相同的结果存到一起。

2.1 过程

对于普通的 map+reduce 的过程如图所示:

给定 item ,通过 mapper 函数得到新的 value,然后再通过 reducer 函数得到最终结果。


而对于 $MapReduce$ 的过程为:

  • 给定输入 items ;通过 mapper 函数最终得到 key-value pair

  • execution engine 会把所有相同的 key 归类到一起。也就是 Shuffle过程(并不是程序员来完成)

  • 最后 reducer 会根据 key 来完成整合,最后得到 key-value 对。(每一种 key 用一次 reducer 函数;当然,每个 reduce worker 可以处理多个 key)




2.2 实例






(三)、MapReduce 系统架构



  • 首先,我们有个用户进程来协调程序如何运行。如它会根据数据规模来判断需要把任务拆分给多少个 mapper 和 reducer 去完成。 当它认为需要 5 个 mapper 去完成时,就会把数据拆成 5 份。
  • 接着产生很多 woker (map woker 和 reduce woker);而且还会产生一个 master worker,这个 $master$ 会作为用户的代理来协调整个过程,这样的话用户就可以去做别的事了。

  • $master$ 就会告诉 worker 去拿相应的数据(也就是分配数据的过程)。
  • map worker 就会本地完成 map 操作。
  • 当 map 完成之后,$master$ 就会通知 reduce worker 去拿数据。
  • $reducer$ 完成 reduce 操作
  • 最后把结果写到最终的文件中。

所以总结为:

  • split : 数据切分,分给不同的 map worker。(map worker 去拿数据)
  • map : map worker 本地调用 mapper 函数
  • Shuffle : 把数据分配给不同的 reduce worker (相同的 key 的数据给同一个 reduce worker)
  • reduce :reducer 完成 reduce 操作
  • finalize :数据整合


(四)、MapReduce 完整过程


Step1 :split input files into chunks (shards)

  • 切分数据成成很多 shard


注意: $\color{blue}{shards 的数量 M» worker 的数量}$

原因:对于每个 worker 输入大小一样时并不能保证执行 shard 的时间一样(各个机器的执行速度并不一样);所以说每个 worker 分配一个 shard 并不合理。而当 $M» worker 的数量$ 时,我们可以动态分配,哪些 worker 完成任务了就来取数据执行,这样做更加 balance 。

Step 2 :fork processes

生成很多 wokers 和 master


Step 3 :map task

  • 从输入中读取 shard

  • 根据输入数据生成 key-value pair;最终结果存在 memory 中


Step 4 :create intermediate files

  • 把生成的 intermediate files 定期的写入到 disk 中
  • 当完成后需要通知 master


Step 4a :partitioning

  • 根据 key 进行排序

  • Partition function : 根据 key 值来决定分配给哪个 reduce worker

    • 默认函数为:$\text{hash(key) mod M}$
  • 每个 reduce worker 会从 所有 map worker 中读取属于它的 partition。

Step 5 :sorting intermediate data

  • 可以通过 master 知道数据位置
  • 通过 RPC 从 map workers 的 local disk 读取数据
  • 根据 key 值排序
  • 所有相同的 key 会 group 到一起


$\color{red}{相同的key一定去同一个reducer}$

Step 6 :reduce task

  • 根据 key 和与之对应的一系列 values 整合结果


  • 最终结果 append 到输出文件


Step 7 :return to user



(五)、MapReduce 设计细节


3.1 Locality

  • 输入文件(input)和输出文件(output)保存在 GFS 中。
  • MapReduce 运行在 GFS 的 chunkservers 上面;
    • 主要是为了让计算和存储在相同的机器上
  • Master 调度的时候尽量让含有相应数据的 worker 处理对应的数据

3.2 Fault Tolerance

  • Master 周期性的去 $ping$ 每个 worker (检查 worker fail)
    • 如果一定时间内 worker 没有回复,则标记该 worker failed
    • 把分配给该 worker 的任务重新分配给别的 worker (re-execution
  • Master failure (Master fail 的情况)
    • 状态 checkpointed to GFS
    • 由 GFS 来恢复 Master 并继续执行

3.3 Straggler

问题:有的 worker 执行速度快,有的 worker 执行速度慢(straggler);而这些速度慢的 worker 会使得整个计算时间变慢。

解决方案:把任务分配给别的 worker,谁先做完就使用谁的结果。

3.4 Intermediate key-value pairs

问题:中间结果表示 intermediate key-value 存储在哪里?存在 mapper 的本地磁盘还是 reducer 的本地磁盘。

第一种方案: Mapper-side

  • 发送数据前做 partition;可以减少网络传输
  • 如果 Map task failure ; 会造成部分数据丢失
  • 如果 Reduce task failure; 只需要重新发送数据

第二种方案: Reducer-side

  • Pipeline -> overlap the transfer time

也就是说发送数据和执行可以 overlap (有的做完一部分 Map 后就可以发送数据,并不需要等待所有 Map worker 都做完后发送)

目前普遍用的是第一种方案。