1. MAPREDUCE原理篇
Mapreduce 是一个分布式运算程序的 编程框架 是用户开发“基于 hadoop 的数据分析应用”的核心框架;
Mapreduce 核心功能 将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 hadoop 集群上;
1.1 为什么要 MAPREDUCE
1 )海量数据在单机上处理因为硬件资源限制,无法胜任
2 )而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
3 )引入 mapreduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理
设想一个海量数据场景下的 wordcount 需求:

单机版:内存受限,磁盘受限,运算能力受限
分布式:
  1. 文件分布式存储(HDFS
  2. 运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)
  3. 运算程序如何分发
  4. 程序如何分配运算任务(切片)
  5. 两阶段的程序如何启动?如何协调?
  6. 整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
mapreduce 就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:
  1. MRAppMaster(mapreduce application master)
  2. MapTask
  3. ReduceTask

1.2MapReduce的框架思想  图片看不清可以下载到本地查看)


1.2 MAPREDUCE 框架结构及核心运行机制
1.2.1 结构
一个完整的 mapreduce 程序在分布式运行时有三类实例进程:
1 MRAppMaster :负责整个程序的过程调度及状态协调
2 mapTask :负责 map 阶段的整个数据处理流程
3 ReduceTask :负责 reduce 阶段的整个数据处理流程
1.2.2 MR 程序运行流程
1.2.2.1 流程示意图(图片看不清可以下载到本地查看)

1.2.2.2 流程解析
  1. 一个mr程序启动的时候,最先启动的是MRAppMasterMRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程
  1. maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
    1. 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV
    2. 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
    3. 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
  2. MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)
  3. Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同keyKV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

3.MR核心数据处理全过程(包括shuffle)(图片看不清可以下载到本地查看)


 1)mapreduce 中, map 阶段处理的数据如何传递给 reduce 阶段,是 mapreduce 框架中最关键的一个流程,这个流程就叫 shuffle
2 shuffle: 洗牌、发牌( 核心机制:数据分区、排序、缓存 );
3 )具体来说: 就是将 maptask 输出的处理结果数据,分发给 reducetask ,并在分发的过程中,对数据按 key 进行了分区和排序(默认也会分区,排序)。

上面的流程是整个 mapreduce 最全工作流程, 具体 shuffle 过程详解,如下:
1 maptask 收集我们的 map() 方法输出的 kv 对,放到内存缓冲区中
2 )从内存缓冲区不断溢出本地磁盘文件(非HDFS,因为是中间结果没必要),可能会溢出多个文件
3 )多个溢出文件会被合并成大的溢出文件
4 )在溢出过程中,及合并的过程中,都要调用 partitoner 进行分组和针对 key 进行排序
5 reducetask 根据自己的分区号,去各个 maptask 机器上取相应的结果分区数据
6 reducetask 会取到同一个分区的来自不同 maptask 的结果文件, reducetask 会将这些文件再进行合并(归并排序)
7 )合并成大文件后, shuffle 的过程也就结束了,后面进入 reducetask 的逻辑运算过程( 从文件中取出一个一个的键值对 group ,调用用户自定义的 reduce() 方法)
3 )注意
1.Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整,参数: io.sort.mb  默认 100M
2.map任务将输出的结果时写到本地磁盘,并不是HDFS系统,因为map输出的是中间结果,等reduce处理完以后,会从本地磁盘删除,所以没有必要上传到hdfs上,还会形成复本。这也是MapReduce比Spark慢的原因之一,因为Spark是将中间结果缓存到内存里,再次使用的时候直接加载,比mapreduce还有从本地磁盘读取快了太多。
3.如果运行map任务的节点在将map输出的结果传送给reduce之前就失败了,那么Hadoop将在另一个节点上重新运行这个map任务以再次构建新的map中间结果。MapReduce框架好就好在程序员不用担心系统失效的问题,因为框架可以检测到失败的任务,并重新在正常的机器上启动执行,这个主要是由MRappmaster进行监控的任务情况。如果失败让yarn进行调度分配新的container.

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐