5

MapReduce #导入Word文档图片#

 2 years ago
source link: https://blog.51cto.com/Lolitann/5363657
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

这是我的分布式作业报告!不要抄不要抄!

1 提出背景

MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。

说MapReduce之前,我们有必要说一下PageRank与MapReduce的渊源,PageRank是谷歌两个创始人Lawrence Page(拉里·佩奇)和 Sergey Brin(谢尔盖·布林)在大学时期研究出来的一个网页排名算法。随着互联网发展,互联网中的资源越来越多。当用户在搜索引擎中搜索某个关键字的时候,会返回成千上万的网页结果。那对于这些网页结果如何进行排序,呢怎么才能找到用户最想看的信息呢?PageRank算法解决的就是这个问题。PageRank的核心思想是:在互联网成千上万的网页中,如果一个网页被很多其他网页所链接,说明它被更多用户需要,那么它的排名就高。

PageRank网页排名算法转化到数学上是使用矩阵进行计算的。使用二维矩阵巨型运算,从PageRank算法理论上来说,矩阵的维度等于网页的数量。面对海量的网页,这么大的矩阵相乘计算量是非常大的。Lawrence Page和 Sergey Brin两个人利用稀疏矩阵计算的技巧,大大简化了计算量,并实现了这个网页排名算法。但是这只是解决了当时的问题。

随着互联网的发展,个人PC的普及,网页数量呈现出爆炸式增长,PageRank的计算量越来越大,必须利用多台服务器才能完成。Google早期时,PageRank计算的并行化是半手工、半自动的,这样更新一遍所有网页的PageRank的周期很长。2003年, Google的工程师Jeffrey Dean和Sanjay Ghemawat发明了并行计算工具MapReduce,PageRank的并行计算完全自动化了,这就大大缩短了计算时间,使网页排名的更新周期比以前短了许多。2004年JeffreyDean和Sanjay Ghemawat 发表了关于MapReduce的论文《MapReduce: Simplified Data Processing on LargeClusters》。这篇论文此后被广泛引用,它描述了Google如何通过拆分、处理和聚合来处理他们那些大到令人难以置信的数据集。

2 MapReduce算法介绍

MapReduce的核心思想

假设矩阵:

MapReduce #导入Word文档图片# _hadoop

MapReduce #导入Word文档图片# _hadoop_02

要计算他们的乘积C=A×B,即

MapReduce #导入Word文档图片# _mapreduce_03

当网页数量过大时,一台服务器存不下整个大数组,就会无法计算。MapReduce采用分治算法的思想,将整个矩阵计算问题划分。

在上边A、B、C的维度均是MapReduce #导入Word文档图片# _hadoop_04。接下来我们看一下MapReduce:

如下图,假设我们把一个MapReduce #导入Word文档图片# _mapreduce_05的矩阵A按行拆成10个小矩阵A1,A2,… ,A10,每一个有每个矩阵就变成了MapReduce #导入Word文档图片# _mapreduce_06 的矩阵。

MapReduce #导入Word文档图片# _hadoop_07

现在用划分之后的每一块Ai与B进行计算:

MapReduce #导入Word文档图片# _mapreduce_08

现在问题就可以转化成为一个MapReduce #导入Word文档图片# _mapreduce_09的矩阵和一个MapReduce #导入Word文档图片# _mapreduce_10的矩阵相乘,结果为MapReduce #导入Word文档图片# _hadoop_11的矩阵。这样我们就可以把这个矩阵运算从一台服务器搬到十台服务器上,计算出不同的Ci之后最终合并到一起。

既然A能拆分,那我们肯定也知道B也能拆分,遵循矩阵乘法的计算规则,我们将B矩阵按照列划分,也划分为十份,B1,B2,… ,B10,每一个有每个矩阵就变成了MapReduce #导入Word文档图片# _hadoop_12 的矩阵。

MapReduce #导入Word文档图片# _hadoop_13

此时将划分后的Ai和Bj进行相乘:

MapReduce #导入Word文档图片# _mapreduce_14

现在问题转换成了MapReduce #导入Word文档图片# _mapreduce_15的矩阵和MapReduce #导入Word文档图片# _mapreduce_16的矩阵计算,最后得到MapReduce #导入Word文档图片# _mapreduce_17的矩阵。我们也将问题从一个服务器搬到了100个服务器上。

这就是MapReduce的根本原理。将一个大任务拆分成小的子任务,并且完成子任务的计算,这个过程叫作Map,将中间结果合并成最终结果,这个过程叫作Reduce。当然,如何将一个大矩阵自动拆分,保证各个服务器负载均衡,如何合并返回值,就是MapReduce在工程上所做的事情了。

2.2 图解MapReduce

假设我们开了个餐厅。

MapReduce #导入Word文档图片# _mapreduce_18

现在有一堆食材,这是Input。

一般餐厅再会给主厨配一些帮手,不能所有的事都让主厨做,需要有一些厨师作为备菜厨师。我们把食材进行分类。海鲜、蔬菜、水果三类分配给不同的三个备菜厨师,这些备菜厨师把收到的菜分别处理成不同的样子方便主厨直接使用,分配的过程我们称之为Split,备菜厨师处理食材的过程,我们称之为Map。

被拆除时将处理好的食材放入到冰箱和冰柜中储藏,等待主厨随时使用。放入冰箱这个过程就是Shuffle。

有客人点餐的时候主厨从不同的冰箱冰柜里拿出食材做出菜,这个过程我们称之为Reduce。

最后客人取到自己点的菜,这个被称为Finalze。

通过上面这个例子,我们应该能够形象的理解MapReduce的整个流程了。接下来介绍一下MapReduce的构架。

首先我们需要一个用户进程来协调定义我们这个程序需要怎么运行,需要去做什么事情。所以拿到数据之后第一步用户会想我的数据要划分成几部分,用多少个Mapper和Reducer去做,这是用户可以自己决定的。思路正好对照上面我们说的怎么将矩阵拆分。决定好数据怎么拆分之后他会指导产生对应数量的Worker。这些Worker分别作为Mapper和Reducer,此外还会产生一个Master,Master之后会作为用户的代理去协调整个MapReduce的流程。Master、Mapper和Reducer都是同等级的组件,三者的区别仅在于职能不同。

接受的任务分配之后,作为Mapper的Worker取到自己的那一份数据将其处理,最后将其写到本地硬盘或缓存上。之后作为Reducer的Worker就可以去本地拿Mapper已经处理好的数据进行组装。Reducer做完处理之后将内容写到最终的文件中就是Finlize。

MapReduce #导入Word文档图片# _hadoop_19

MapReduce算法陈述

从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。

Mapper:

第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)

第二阶段是对切片中的数据按照一定的规则解析成<key,value>对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)

第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个<k,v>,调用一次map方法。每次调用map方法会输出零个或多个键值对。

第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。

第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。

第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。

Reducer:

第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。

第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。

第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

3 MapReduc实践

在整个MapReduce程序的开发过程中, 用户只需编写map()和reduce()两个函数,即可完成简单的分布式程序的设计。

之所以这么设计是因为Google 的研究人员发现所有数据处理的程序都有类似的过程:

将一组输入的数据应用map函数返回一个k/v对的结构作为中间数据集并将具有相同key的数据输入到一个reduce函数中执行最终返回处理后的结果。

  • map()函数以<key,value>对作为输入,产生另外一系列<key,value>对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚集,且key值相同(用户可设定聚集策略,默认情况下是对key值进行哈希取模)的数据被统一交给reduce()函数处理。
  • reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列<key,value>对作为最终输出写入HDFS。

MapReduce非常有利于并行化计算。map的过程可以在多台机器上,而机器之间不需要同互相协调。reduce执行的过程中同样不需要协调。这一计算模型的优势在于非常利于并行化,map的过程可以在多台机器上而机器之间不需要相互协调。所有的依赖性和协调过程都被隐藏在map与reduce函数的数据分发之间。因此这里有一个很重要的细节就是 map和reduce都是可以各自并行执行,但reduce执行的前提条件是所有的map函数执行完毕。

说完MapReduce原理之后简单的说一下它的两个小应用。

3.1 统计单词词频

MapReduce #导入Word文档图片# _hadoop_20
  • Input:文档作为输入。
  • Split:将文档按行切分。
  • Map:将每一行的单词拆分出来。
  • Shuffle:将同类单词聚合到一起,排序。
  • Reduce:将同类单词进行统计。
  • Finalize:形成单词词频统计表格。
    Map和Reduce实现的伪代码如下:

3.2 构建倒排索引

  • Input:文档作为输入。
  • Split:将文档进行切分
  • Map:将句子中的单词拆分出来,并标记每个单词在文档中的位置。
  • Shuffle:将同类单词聚合到一起,排序。
  • Reduce:将同类单词的统计结果合并起来。
  • Finalize:形成单词索引表格。

Map和Reduce实现的伪代码如下:

MapReduce #导入Word文档图片# _hadoop_21

MapReduce #导入Word文档图片# _hadoop_22

Hadoop

4.1 Hadoop的诞生

Hadoop最早起源于Nutch。Nutch是一个开源的网络搜索引擎,由DougCutting于2002年创建。Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题,即不能解决数十亿网页的存储和索引问题。之后,谷歌发表的两篇论文为该问题提供了可行的解决方案。一篇是2003年发表的关于谷歌分布式文件系统(GFS)的论文。该论文描述了谷歌搜索引擎网页相关数据的存储架构,该架构可解决Nutch遇到的网页抓取和索引过程中产生的超大文件存储需求的问题。但由于谷歌仅开源了思想而未开源代码,Nutch项目组便根据论文完成了一个开源实现,即Nutch的分布式文件系统(NDFS)。另一篇是2004年发表的关于谷歌分布式计算框架MapReduce的论文。该论文描述了谷歌内部最重要的分布式计算框架MapReduce的设计艺术,该框架可用于处理海量网页的索引问题。同样,由于谷歌未开源代码,Nutch的开发人员完成了一个开源实现。由于NDFS和MapReduce不仅适用于搜索领域,2006年年初,开发人员便将其移出Nutch,成为Lucene的一个子项目,称为Hadoop。大约同一时间,Doug Cutting加入雅虎公司,且公司同意组织一个专门的团队继续发展Hadoop。同年2月,ApacheHadoop项目正式启动以支持MapReduce和HDFS的独立发展。2008年1月,Hadoop成为Apache顶级项目,迎来了它的快速发展期。

4.2 Hadoop中的MapReduce编程模型

从前边对MapReduce的描述中我们可以知道,MapReduce的适用的应用场景往往具有一个共同的特点:任务可被分解成相互独立的子问题。基于该特点,MapReduce编程模型给出了其分布式编程方法,共分5个步骤:

  • 迭代(iteration)。遍历输入数据,并将之解析成keyvalue对。
  • 将输入<key,value>对映射(map)成另外一些<key,value>对。
  • 依据key对中间数据进行分组(grouping)。
  • 以组为单位对数据进行归约(reduce)。
  • 迭代。将最终产生的<key,value>对保存到输出文件中。

MapReduce将计算过程分解成以上5个步骤带来的最大好处是组件化与并行化。为了实现MapReduce编程模型,Hadoop设计了一系列对外编程接口。用户可通过实现这些接口完成应用程序的开发。

4.3 Hadoop中的MapReduce接口

MapReduce编程模型对外提供的编程接口体系结构如下图所示,整个编程模型位于应用程序层和MapReduce执行器之间,可以分为两层。

  • 第一层是最基本API,主要有5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer和OutputFormat。Hadoop自带了很多直接可用的InputFormat、Partitioner和OutputFormat,大部分情况下,用户只需编写Mapper和Reducer即可。
  • 第二层是工具层,位于基本Java API之上,主要是为了方便用户编写复杂的MapReduce程序和利用其他编程语言增加MapReduce计算平台的兼容性而提出来的。在该层中,主要提供了4个编程工具包。
  • JobControl:方便用户编写有依赖关系的作业,这些作业往往构成一个有向图,所以通常称为DAG(Directed Acyclic Graph)作业。
  • ChainMapper/ChainReducer:方便用户编写链式作业,即在Map或者Reduce阶段存在多个Mapper。
  • Hadoop Streaming:方便用户采用非Java语言编写作业,允许用户指定可执行文件或者脚本作为Mapper/Reducer。
  • Hadoop Pipes:专门为C/C++程序员编写MapReduce程序提供的工具包。
MapReduce #导入Word文档图片# _mapreduce_23

4.4 Hadoop中的MapReduce处理逻辑

Hadoop 中的每个map 任务可以细分成4个阶段:record reader、mapper、combiner 和partitioner。map 任务的输出被称为中间键和中间值,会被发送到reducer做后续处理。reduce任务可以分为4个阶段:混排(shuffle)、排序(sort)、reducer 和输出格式(output format)。map 任务运行的节点会优先选择在数据所在的节点,因此,一般可以通过在本地机器上进行计算来减少数据的网络传输。具体如下:

  • Split:在进行map计算之前,MapReduce会根据输入文件计算输入分片(input split),每个输入分片针对一个map任务,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。

读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

  • Map:映射逻辑由用户编写。读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。
  • Shuffle:数据混洗阶段,核心机制是数据分区,排序,局部聚合,缓存,拉取,再合并排序。map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。数据处理之后放入缓冲区中。简单来说,就是将MapTask输出的处理数据结果,按照Partitioner组件制定的规则分发ReduceTask,并在分发的过程中,对数据按key进行分区和排序
  • Reduce:由用户自己编写。Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
  • Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

MapReduce现状

5.1 “MapReduce为什么被淘汰了?”

查相关资料的时候在看到这样一个问题“MapReduce为什么被淘汰了?”

我的的回答是“No!没被淘汰!”

热评第一说MapReduce被淘汰了,理由是这篇文章《为什么MapReduce会被硅谷一线公司淘汰?》这篇文章里提到:

到了 2014 年左右,Google 内部已经几乎没人写新的 MapReduce 了。2016 年开始,Google 在新员工的培训中把 MapReduce 替换成了内部称为 FlumeJava的数据处理技术。Google 内部的 FlumeJava后来的开源版本就是Apache Beam。

为什么说“被淘汰”,文中作者提出两点:

  1. 维护成本高使用 MapReduce,你需要严格地遵循分步的 Map 和 Reduce 步骤。当你构造更为复杂的处理架构时,往往需要协调多个 Map 和多个 Reduce 任务。然而,每一步的 MapReduce 都有可能出错。为了这些异常处理,很多人开始设计自己的协调系统(orchestration)。例如,做一个状态机(state machine)协调多个 MapReduce,这大大增加了整个系统的复杂度。​
  2. 时间性能“达不到”用户的期待

Google 曾经在 2007 年到 2012 年间做过一个对于 1PB 数据的大规模排序实验,来测试 MapReduce 的性能。从 2007 年的排序时间 12 小时,到 2012 年的排序时间缩短至 0.5 小时。即使是 Google,也花了 5 年的时间才不断优化了一个 MapReduce 流程的效率。2011 年,他们在 Google Research 的博客上公布了初步的成果。其中有一个重要的发现,就是他们在 MapReduce 的性能配置上花了非常多的时间。包括了缓冲大小 (buffer size),分片多少(number of shards),预抓取策略(prefetch),缓存大小(cache size)等等。

5.2 我的看法

5.2.1 MapReduce不会被淘汰​

对于上面的那个说法我并不同意。我认为我和上边提到的那篇文章作者的分歧是因为MapReduc的广义和狭义概念这个地方。那篇文章中作者说的是狭义的MapReduce,即谷歌提出的那个原始的MapReduce,不包括之后Hadoop、Spark、Hive、Flink、Presto、Druid等等。

我是从广义的MapReduce思考的,我认为你可以说谷歌原始的MapReduce、Hadoop MapReduce太慢了,因此被Spark之类的取代了。但是你不能说它是被淘汰了。MapReduce作为一种编程范式,它的思想永远不会被淘汰。

5.2.2 Apache Beam​

针对“Google 在新员工的培训中把 MapReduce 替换成了内部称为 FlumeJava的数据处理技术。Google 内部的 FlumeJava后来的开源版本就是Apache Beam。”这句话我们来剖析一下Apache Beam。

Beam里面有两个核心原语:

  • ParDo: 来处理通用的基于单条数据的计算: 每条需要处理的数据会被喂给用户提供的指定的一个函数(Beam里面的@ProcessElement), 然后输出0个或者多个输出。
  • Filter, AppendColumn等等都可以通过ParDo来实现。
  • ParDo的语义涵盖了Hadoop中Map-Shuffle-Reduce的Map,Reduce
  • GroupByKey: 用来做Grouping的操作。
  • Count, Sum, Min等等都可以通过GroupByKey来实现。
  • Group的语义涵盖了Hadoop中Map-Shuffle-Reduce的Shuffle

既然Map-Shuffle-Reduce这三个关键语义都被Beam涵盖了,那它为什么不是MapReduce了呢?豆子炒熟了端上餐桌就不是豆子吗?函数式编程里面的 MapReduce将会继续影响着分布式领域的发展。

5.3 我们身边的MapReduce

百度智能云MapReduce提供全托管计算集群服务,提供高可靠、高安全性、高性价比的分布式计算服务,涵盖 Hadoop、Spark、Hive、Flink、Presto、Druid等多种组件。

MapReduce #导入Word文档图片# _hadoop_24
  • Baidu MapReduce组件丰富,涵盖 Hadoop、Spark、Hive、Flink、Presto、Druid等。
  • 为减少资源消耗,可以从云端动态调整机组的数量,运算需求大时增加机组,工作量小的时候减少机组。
  • 基于工作流优化架构,使组件之间形成有机协同,保障数据分析高效平稳运转。
  • 可以依照其不同维度需求快速搭建数据仓库,为企业决策提供高效数据。
  • 可以提供亚秒级的实时计算能力,针对实时数据高效处理。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK