所有的mapreduce任务调度器都可以有combiner函数吗

Hadoop(49)
Combiner的作用是把一个map产生的多个&KEY,VALUE&合并成一个新的&KEY,VALUE&,然后再将新&KEY,VALUE&的作为reduce的输入;
在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载;
并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。
什么时候运行Combiner?
1、当job设置了Combiner,并且spill的个数到min.num.bine(默认是3)的时候,那么combiner就会Merge之前执行;
2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行;
3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。
实例代码:
package MyC
import java.io.IOE
import java.net.URI;
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.FileS
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
public class CombinerExp {
private final static String INPUT_PATH = "hdfs://master:8020/input";
private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt";
public static class MyMapper extends Mapper&LongWritable, Text, Text, IntWritable&{
private IntWritable one = new IntWritable(1);
private Text word = new Text();
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] str = value.toString().split("\\s+");
for (String string : str) {
System.out.println(string);
word.set(string);
context.write(word, one);
public static class MyReducer extends Reducer&Text, IntWritable,Text, IntWritable&{
private IntWritable result = new IntWritable();
protected void reduce(Text key, Iterable&IntWritable& values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum+=val.get();
result.set(sum);
context.write(key,result);
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
if(fileSystem.exists(new Path(OUTPUT_PATH)))
fileSystem.delete(new Path(OUTPUT_PATH),true);
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(CombinerExp.class);
FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true) ? 0 : 1);
[root@master liguodong]# hdfs dfs -ls -R /input/
-rw-r--r--
1 root supergroup
27 2015-06-13 22:15 /input/input1
-rw-r--r--
1 root supergroup
38 2015-06-13 22:15 /input/input2
当我们只有map和combine而没有reduce时,combine并不会执行。
而输出的结果并没有被求和。
[root@master liguodong]# hdfs dfs -ls -R /output/
-rw-r--r--
3 liguodong supergroup
0 2015-06-13 22:17 /output/_SUCCESS
-rw-r--r--
3 liguodong supergroup
50 2015-06-13 22:17 /output/part-m-00000
-rw-r--r--
3 liguodong supergroup
39 2015-06-13 22:17 /output/part-m-00001
[root@master liguodong]# hdfs dfs -cat /output/part-m-00000
[root@master liguodong]# hdfs dfs -cat /output/part-m-00001
当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32
File System Counters
Map-Reduce Framework
Map input records=6
Map output records=12
Input split bytes=192
Combine input records=12
Combine output records=9
Reduce input records=9
Reduce output records=7
Spilled Records=18
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=
File Input Format Counters
Bytes Read=65
File Output Format Counters
Bytes Written=51
[root@master hadoop]# hdfs dfs -ls -R /output/
-rw-r--r--
3 liguodong supergroup
0 2015-06-13 22:41 /output/_SUCCESS
-rw-r--r--
3 liguodong supergroup
51 2015-06-13 22:41 /output/part-r-00000
[root@master hadoop]# hdfs dfs -cat /output/pa*
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:216720次
积分:5223
积分:5223
排名:第4279名
原创:264篇
评论:48条
姓名:李国冬
英文名:wintfru
学校:西南石油大学
学历:本科
专注:Java、Hadoop、IOT
爱好:跑步,NBA,旅游,LOL
文章:14篇
阅读:20697
文章:15篇
阅读:22071
文章:13篇
阅读:11946
文章:24篇
阅读:18209
文章:27篇
阅读:36810
好记性不如烂笔头。现在的位置:
hadoop系列之二:MapReduce理论基础
每个MapReduce job都是Hadoop客户端想要执行的一个工作单元,它一般由输入数据、MapReduce程序和配置信息组成,而Hadoop会把每个job分隔成两类任务(task):map任务和reduce任务。在Hadoop集群中有两类节点来执行两类job进程的执行 。
1 大数据处理
任何基础业务包含了收集、分析、监控、过滤、搜索或组织web内容的公司或组织都面临着所谓的“大数据”问题:“web规模”处理即海量数据处理的代名词。社交类网站的兴起也使得这些组织面临着另一个问题:用户行为数据分析,这涉及到通过日志文件记录用户的对web页面浏览、点击、停留时长等,而后对日志文件中的大量数据进行分析以支持进行合理、正确的商业决策。
那么,大数据处理究竟意味着对多大规模的数据进行处理?一个简单的例子:Google在2004年平均每天利用MapReduce处理100GB的数据,到2008年平均每天处理的数据已经达到20PB;2009年,Facebook的数据量达到2.5PB,且以每天15TB的速度在增长。PB级别的数据集正变得越来越常见,大数据时代的到来已然是不争的事实,密集数据处理也正迅速成为现实需求。
大数据问题的处理需要以与传统数据处理方式所不同的方法去实现,这正是MapReduce思想得以大放光彩的核心所在。MapReduce在实现大数据处理上有着多个基础理论思想的支撑,然而这些基础理论甚至实现方法都未必是MapReduce所创,它们只是被MapReduce采用独特的方式加以利用而已。
1.1 向外扩展(Scale out)而非向上扩展(Scale up):
大数据的处理更适合采用大量低端商业服务器(scale out)而非少量高端服务器(scale up)。后者正是向上扩展的系统性能提升方式,它通常采用有着SMP架构的主机,然而有着大量的CPU插槽(成百上千个)及大量的共享内存(可以多达数百GB)的高端服务器非常昂贵,但其性能的增长却非线性上升的,因此性价比很一般。而大量的低端商业服务器价格低廉、易于更换和伸缩等特性有效避免了向上扩展的敝端。
1.2 假设故障很常见(Assume failures are common):
在数据仓库架构级别,故障是不可避免且非常普遍的。假设一款服务器出故障的平均概率为1000天1次,那么10000台这种服务器每天出错的可能性将达到10次。因此,大规模向外扩展的应用场景中,一个设计优良且具有容错能力的服务必须能有效克服非常普遍的硬件故障所带来的问题,即故障不能导致用户应用层面的不一致性或非确定性。MapReduce编程模型能通过一系列机制如任务自动重启等健壮地应付系统或硬件故障。
1.3 将处理移向数据(Move processing to the data):
传统高性能计算应用中,超级计算机一般有着处理节点(processing node)和存储节点(storage node)两种角色,它们通过高容量的设备完成互联。然而,大多数数据密集型的处理工作并不需要多么强大的处理能力,于是把计算与存储互相分开将使得网络成为系统性能瓶颈。为了克服计算如此类的问题,MapReduce在其架构中将计算和存储合并在了一起,并将数据处理工作直接放在数据存储的位置完成,只不过这需要分布式文件系统予以支撑。
1.4 顺序处理数据并避免随机访问(Process data sequentially and avoid random access):
大数据处理通常意味着海量的数量难以全部载入内存,因而必须存储在磁盘上。然而,机械式磁盘寻道操作的先天性缺陷使得随机数据访问成为非常昂贵的操作,因此避免随机数据访问并以顺序处理为目的完成数据组织成为亟待之需。固态磁盘虽然避免了机械磁盘的某此缺陷,然而其高昂的价格以及并没有消除的随机访问问题仍然无法带来性能上的飞跃发展。MapReduce则主要设计用来在海量数据集上完成批处理操作,即所有的计算被组织成较长的流式处理操作,以延迟换取较大的吞吐能力。
1.5 向程序员隐藏系统级别的细节(Hide system-level details from the application developer):
1.6 无缝扩展(Seamless scalability):
2 MapReduce和大数据问题
海量数据处理的核心思想无非是将一个较大的问题进行“分割包围、逐个歼灭”。然而其难点和关键点在于如何将一个大的问题分分割成多个可以分别在不同的CPU上或不同的主机上进行处理的独立小问题,而且这些独立进行处理的小问题所产生的中间结果又该如何合并成最终结果并予以输出。因此,看似简单的化整为零的处理思想却不得不面临如下的难题:
(1) 如何将大问题分割为小任务?进一步地,如何将大问题分解为可以并行处理的小任务?
(2) 如何将分解好的小任务派送给分布式系统中的某主机且是较为适合解决此问题的主机上的worker完成处理?
(3) 如何保证某worker获取所需的数据?
(4) 如何协调不同worker之间进行同步?
(5) 如何将某worker的部分结果共享给其它需要此结果的worker?
(6) 如何在出现软件或硬件故障时仍然能保证上述工作的顺利进行?
在传统的并行或分布式编程模型中,程序员不得不显式地解决上述的部分甚至是全部问题,而在共享内存编程中,程序员需要显式地协调对共享数据结构的如互斥锁的访问、显式地通过栅(barrier)等设备解决进程同步问题、并得时刻警惕着程序中可能出现的死锁或竞争条件。虽然有些编程语言也或多或少地规避了让程序员面对上述问题,但却也避免不了将资源分配给各worker的问题。MapReduce的优势之一便是有效地向程序员隐藏了这些问题。
3 函数式编译语言
MapReduce是一种类似于Lisp或ML的函数式编程语言。函数式编程的核心特性之一是基于高阶函数,即能够接受其它函数作为参数的函数完成编程。MapReduce有两个常见地内置高阶函数map和fold。
如图所示,给定一个列表,map(接受一个参数)以函数f为其参数并将其应用于列表中的所有元素;fold(接受两个参数)以函数g和一个初始值作为参数,然后将g应用于初始值和列表中的第一个元素,结果被放置于中间变量中。中间变量和第二个元素将作为g函数下一次应用时的参数,而后如此操作直至将列表中的所有元素处理完毕后,fold会将最终处理结果保存至一个中间变量中。
于是,基于上述过程,我们可以把map视作利用f函数将给定数据集完成形式转换的操作,同样地,fold就可以被看作利用g函数完成数据聚合的操作。我们就可以由此得知,各函数式程序在运行时彼此间是隔离的,因此,在map中将f函数应用于列表中每一个元素的操作可以并行进行,进一步地讲,它们可以分布于集群中的不同节点上并行执行。然而,受限于数据的本地性,fold操作需要等到列表中的每一个元素都准备停当之后才能进行。幸运地是,现实生活中的应用程序并不要求g函数应用于列表中的所有元素,因此,列表中元素可以被分为多个逻辑组,并将fold操作并行地应用在这些逻辑组上即可。由此,fold操作也可以以并行的方式高效完成。
MapReduce有两个常见地内置高阶函数map和reduce,其map就类似于上述过程中的map操作,reduce对应于上述过程中的fold操作。只不过,MapReduce的执行框架能自行协调map与reduce并将其应用于在商业服务器硬件平台上并行处理海量数据。
更为精确地说,MapReduce有三个相互关联却各不相同的概念。首先,MapReduce是一个如上所述的函数式编程语言。其次,MapReduce也是一个运行框架,它能够协调运行基于MapReduce思想开发的程序。最后,MapReduce还可以被看作编程模型和执行框架的软件实现,如Google的专有实现和另一个开源实现Hadoop等。
4 mapper和reducer
键值对儿(Key-value pair)是MapReduce的基础数据结构。Key和Value可以是基础类型数据,如整数、浮点数、字符串或未经加工的字节数据,也可以是任意形式的复杂数据类型。程序员可以自行定义所需的数据类型,也可借助于Protocol Buffer、Thrift或Avro提供的便捷方式完成此类工作。
MapReduce算法设计的工作之一就是在给定数据集上定义“键-值”数据结构,比如在搜索引擎搜集、存储网页类工作中,key可以使用URL来表示,而value则是网页的内容。而在有些算法中,Key也可以是没有任何实际意义的数据,其在数据处理过程中可被安全忽略。在MapReduce中,程序员需要基于如下方式定义mapper和reducer:
map: (k1,v1)--&[(k2,v20)]
reduce: (k2,[v2])--&[(k3,v3)]
其中[...]意味着其可以是一个列表。这些传递给MapReduce进行处理的数据存储于分布式文件上,mapper操作将应用于每一个传递过来的键-值对并生成一定数量的中间键值对(intermediate key-value),而后reduce操作将应用于这些中间键值对并输出最终的键值对。然而,mapper操作和reducer操作之间还隐含着一个应用于中间键值对的“分组”操作,同一个键的键值对需要被归类至同一组中并发送至同一个reducer,而传送给每个reducer的分组中的键值对是基于键进行排序后的列表。reducer生成的结果将会保存至分布式文件系统,并存储为一个或多个以r(即reducer号码)结尾的文件,但mapper生成的中间键值对数据则不会被保存。如下为MapReduce作业过程:
在Hadoop中,mapper和reducer是分别由MAP和REDUCE方法实现的对象。每个map任务(接收一个称作input split的键值对列表)都被初始化一个mapper对象,并会由执行框架为每个输入的键值对调用一次其map方法。程序员可以配置启动的map任务个数,但其真正启动的数目则由执行框架根据数据的物理分布最终给定。类似地,每个reduce任务由REDUCE方法初始化为一个reduce对象,并会由执行框架为其接受的每个中间键值对调用一次REDUCE方法,所不同的是,程序员可以明确限定启动的reduce任务的个数。
mapper和reducer可以直接在各自接收的数据上执行所需要的操作,然而,当使用到外部资源时,多个mapper或reducer之间可能会产生资源竞争,这势必导致其性能下降,因此,程序员必须关注其所用资源的竞争条件并加入适当处理。其次,mapper输出的中间键值对与接受的键值对可以是不同的数据类型,类似地,reducer输出的键值对与其接收的中间键值对也可以是不同的数据类型,这可能会给编程过程及程序运行中的故障排除带来困难,但这也正是MapReduce强大功能的体现之一。
除了常规的两阶段MapReduce处理流外,其还有一些变化形式。比如将mapper输出的结果直接保存至磁盘中(每个mapper对应一个文件)的没有reducer的MapReduce作业(如下图),不过仅有reducer而没有mapper的作业是不允许的。不过,就算用不着reducer处理具体的操作,利用reducer将mapper的输出结果进行重新分组和排序后进行输出也能以另一种形式提供的完整MapReduce模式。
MapReduce作业一般是通过HDFS读取和保存数据,但它也可以使用其它满足MapReduce应用的数据源或数据存储,比如Google的MapReduce实现中使用了Bigtable来完成数据的读入或输出。BigTable属于非关系的数据库,它是一个稀疏的、分布式的、持久化存储的多维度排序Map,其设计目的是可靠的处理PB级别的数据,并且能够部署到上千台机器上。在Hadoop中有一个类似的实现HBase可用于为MapReduce提供数据源和数据存储。
5 Hadoop运行框架
MapReduce程序也称作为MapReduce作业,一般由mapper代码、reducer代码以及其配置参数(如从哪儿读入数据,以及输出数据的保存位置)组成。准备好的作业可通过JobTracker(作业提交节点)进行提交,然后由运行框架负责完成后续的其它任务。这些任务主要包括如下几个方面。
每个MapReduce作业都会划分为多个称作任务(task)的较小单元,而较大的作业划分的任务数量也可能会超出整个集群可运行的任务数,此时就需要调度器程序维护一个任务队列并能够追踪正在运行态任务的相关进程,以便让队列中处于等待状态的任务派送至某转为可用状态的节点运行。此外,调度器还要负责分属于不同作业的任务协调工作。
对于一个运行中的作业来说,只有所用的map任务都完成以后才能将中间数据分组、排序后发往reduce作业,因此,map阶段的完成时间取决于其最慢的一个作业的完成时间。类似的,reduce阶段的最后一个任务执行结束,其最终结果才为可用。因此,MapReduce作业完成速度则由两个阶段各自任务中的掉队者决定,最坏的情况下,这可能会导致作业长时间得不到完成。出于优化执行的角度,Hadoop和Google MapReduce实现了推测执行(Speculative execution)机制,即同一个任务会在不同的主机上启动多个执行副本,运行框架从其最快执行的任务中取得返回结果。不过,推测执行并不能消除其它的滞后场景,比如中间键值对数据的分发速度等。
5.2 数据和代码的协同工作(data/code co-location)
术语“数据分布”可能会带来误导,因为MapReduce尽力保证的机制是将要执行的代码送至数据所在的节点执行,因为代码的数据量通常要远小于要处理的数据本身。当然,MapReduce并不能消除数据传送,比如在某任务要处理的数据所在的节点已经启动很多任务时,此任务将不得不在其它可用节点运行。此时,考虑到同一个机架内的服务器有着较充裕的网络带宽,一个较优选择是从数据节点同一个机架内挑选一个节点来执行此任务。
5.3 同步(Synchronization)
异步环境下的一组并发进程因直接制约而互相发送消息而进行互相合作、互相等待,使得各进程按一定的速度执行的过程称为进程间同步,其可分为进程同步(或者线程同步)和数据同步。就编程方法来说,保持进程间同步的主要方法有内存屏障(Memory barrier),互斥锁(Mutex),信号量(Semaphore)和锁(Lock),管程(Monitor),消息(Message),管道(Pipe)等。MapReduce是通过在map阶段的进程与reduce阶段的进程之间实施隔离来完成进程同步的,即map阶段的所有任务都完成后对其产生的中间键值对根据键完成分组、排序后通过网络发往各reducer方可开始reduce阶段的任务,因此这个过程也称为“shuffle and sort”。
5.4 错误和故障处理(Error and fault handling)
MapReduce运行框架本身就是设计用来容易发生故障的商用服务器上了,因此,其必须有着良好的容错能力。在任何类别的硬件故障发生时,MapReduce运行框架均可自行将运行在相关节点的任务在一个新挑选出的节点上重新启动。同样,在任何程序发生故障时,运行框架也要能够捕获异常、记录异常并自动完成从异常中恢复。另外,在一个较大规模的集群中,其它任何超出程序员理解能力的故障发生时,MapReduce运行框架也要能够安全挺过。
6 partitioner和combiner
除了前述的内容中的组成部分,MapReduce还有着另外两个组件:partiontioner和combiner。
Partitioner负责分割中间键值对数据的键空间(即前面所谓的“分组”),并将中间分割后的中间键值对发往对应的reducer,也即partitioner负责完成为一个中间键值对指派一个reducer。最简单的partitioner实现是将键的hash码对reducer进行取余计算,并将其发往余数对应编号的reducer,这可以尽力保证每个reducer得到的键值对数目大体上是相同的。不过,由于partitioner仅考虑键而不考虑“值”,因此,发往每个reducer的键值对在键数目上的近似未必意味着数据量的近似。
Combiner是MapReduce的一种优化机制,它的主要功能是在“shuffle and sort”之前先在本地将中间键值对进行聚合,以减少在网络上发送的中间键值对数据量。因此可以把combiner视作在“shuffle and sort”阶段之前对mapper的输出结果所进行聚合操作的“mini-reducer”。在实现中,各combiner之间的操作是隔离的,因此,它不会涉及到其它mapper的数据结果。需要注意的是,就算是某combiner可以有机会处理某键相关的所有中间数据,也不能将其视作reducer的替代品,因为combiner输出的键值对类型必须要与mapper输出的键值对类型相同。无论如何,combiner的恰当应用将有机会有效提高作业的性能。
【上篇】【下篇】
您可能还会对这些文章感兴趣!
2017年三月
6789101112
13141516171819
20212223242526
2728293031
同分类最新文章
日志:437篇
评论:296条
分类:44个
标签:268个
链接:10个
网站运行:1530天
最后更新:日mapreduce-combiner函数使用例子代码
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import com.MaxTemperatureM
import com.MaxTemperatureR
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.T
//本程序是使用combiner函数的例子
public class MaxTemperatureWithCombiner {
* @param args
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
if(args.length != 2){
System.out.println(&Usage: MaxTemperatureWithCombiner输入路径& + &输出路径&);
System.exit(-1);
Job job = new Job();
job.setJarByClass(MaxTemperatureWithCombiner.class);
job.setJobName(&Max temperature&);
FileInputFormat.addInputPath(job, new Path(&&));
FileOutputFormat.setOutputPath(job, new Path(&&));
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
System.out.println(job.waitForCompletion(true) ? 0 : 1);
随机推荐程序问答结果
如对文章有任何疑问请提交到,或者您对内容不满意,请您反馈给我们发贴求解。
,机器学习分类整理更新日期:: 15:57:14
如需转载,请注明文章出处和来源网址:
本文WWW.DOC100.NET DOC100.NET版权所有。【Hadoop】一:MapReduce初探 - 简书
【Hadoop】一:MapReduce初探
在python和swift中,map,reduce都是一种高阶函数(还有filter),那么什么是高阶函数呢?这里引用一下廖雪峰大神的总结,高阶函数就是指函数参数可以接收其他函数,还有一种函数叫作偏函数,就是指函数的返回值是一个函数。高阶函数和偏函数的概念可以阅读函数式编程一书得到参考java8的lambda表达式就是一种高阶函数的实现。
好吧,回归正题,那么什么是map,什么又是reduce呢?map:map就是指把输入的数据集中的每一个元素进行处理后输出,这种输入输出通常是键值对形式的。reduce:reduce的英文释义是规约,也就是说reduce函数是将一定的数据集进行循环的处理得到最终的结果。比如找出每个数据集中的最大值。
Mapper和Reducer
Mapper是泛型类型,分别是map函数的输入键,输入值,输出建和输出值,hadoop在java自带的基本类型之上还封装了一套适用于网络序列化传输的基本类型,这些类型位于or.apache.hadoop.io包中,例如Text类型相当于java的String类型。Reducer也是泛型类型,类型参数和Mapper一样,Reducer提供了强大的数据比较和抽取能力。Job是用来指定作业执行规范的,主要包括如下几步:
设置作业完整的类名
设置作业名称
设置输入和输入源(可以来自网络,可以来自文件系统)
设置map(即对数据进行处理的Mapper实现类)和设置reduce(Reducer的实现类)。
新旧api差别
MapReduce的横向(水平)扩展
为了实现横向扩展,需要将数据存储在类似于HDFS的分布式文件系统中。
在MapReduce的作业中,有两类节点控制着作业的执行,一类是jobtracker(作业节点)和一系列的tasktracker(任务节点),一个作业是由多个任务(map任务和reduce任务)组成的。
Hadoop将数据分为大小相等的数据块,每个数据块就是一个分片,并且为每个分片指定一个map任务,由该任务来运行用户自定义的map函数从而处理分片中的每条记录。
分片的大小决定了程序的效率和资源的利用率,如果分片太小,那么管理分片的总时间和构建map任务的时间将会决定整个处理时间,如果分片太大,又没有充分利用map函数提供的处理能力,所以合适的分片大小至关重要。一个合理的分片大小大概是HDFS的一个块的大小。默认是64MB。如果超过了该大小,那么输入源的数据就会存储在两个机架上面,这样就会产生网络传输,降低了效率。
对于map任务,一般运行在输入数据存储的机器上,这样不需要使用网络带宽资源,提高了IO效率。如果该map任务在处理其他的任务,则需要协调其他机架上面的map任务来进行处理,这样还是会产生网络传输。但是这么做,会大大的减少这种情况的发生。
map任务的输出一般都是存储在任务执行的机器上面,因为map输出的数据大多是一个中间数据,只需要临时存储,在传给reduce任务处理之后,就可以删除该中间结果。如果中间数据传输给reduce任务失败,则会在另一个节点上重新运行map任务重新传输从而避免再次失败。
reduce任务可以接收来自多个map任务的输出,这之间复杂的数据传输流称为shuffle。shuffle就是怎样把map task的输出结果有效地传送到reduce端,也可以理解为 Shuffle描述着数据从map task输出到reduce task输入的这段过程。下图是官方的shuffle解释图
shuffle官网解释图
为了减少map和reduce之间的数据传输量,充分利用集群之间的带宽,可以对map的输出执行一次或者多次的combiner函数,相当于对于单个map进行的规约操作(reduce是对于多个map任务的规约).但是combiner的适用场景有限,例如平均数的计算可能就不适用-((x+y+z+a)/4+(b+c)/2)/2!=(a+b+c+x+y+z)/6,所以该函数的适用范围为对分片数据规约不会影响整个结果的场景。
Streaming API
Streaming API 使得我们可以用其他编程语言实现自己的map和reduce函数,例如用c++实现,提高程序的处理效率。该API使用标准输入输出流作为hadoop和不同语言编写的map和reduce任务之间的接口。当使用Streaming时需要在hadoop的运行命令里面指定streaming.jar文件的位置从而让hadoop支持其他语言。
一只欢快的程序猿}

我要回帖

更多关于 mapreduce 任务提交 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信