01 引言
MapReduce是Hadoop生态圈的一部分,也是最核心的一部分,本文来讲解下。
02 MapReduce 概述
2.1 MapReduce 定义
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
2.2 MapReduce 产生缘由
为什么需要MapReduce?
- 海量数据在单机上处理因为硬件资源限制,无法胜任。
- 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
- 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
设想一个海量数据场景下的wordcount需求:
- 单机版:内存受限,磁盘受限,运算能力受限
- 分布式:文件分布式存储(HDFS)、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)、运算程序如何分发、程序如何分配运算任务(切片)、两阶段的程序如何启动?如何协调?、整个程序运行过程中的监控?容错?重试?
可见在程序由单机版扩成分布式时,会引入大量的复杂工作。
2.3 MapReduce与Yarn的关系
Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台。而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
YARN的重要概念:
- yarn并不清楚用户提交的程序的运行机制;
- yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源);
- yarn中的主管角色叫ResourceManager;
- yarn中具体提供运算资源的角色叫NodeManager;
- 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(MapReduce只是其中的一种),比如MapReduce、storm程序,spark程序,tez……;
- 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可;
- Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。
2.4 MapReduce 中的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系。。。。),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制Writable,精简,高效。
简单代码验证两种序列化机制的差别:
public class TestSeri { public static void main(String[] args) throws Exception { //定义两个ByteArrayOutputStream,用来接收不同序列化机制的序列化结果 ByteArrayOutputStream ba = new ByteArrayOutputStream(); ByteArrayOutputStream ba2 = new ByteArrayOutputStream(); //定义两个DataOutputStream,用于将普通对象进行jdk标准序列化 DataOutputStream dout = new DataOutputStream(ba); DataOutputStream dout2 = new DataOutputStream(ba2); ObjectOutputStream obout = new ObjectOutputStream(dout2); //定义两个bean,作为序列化的源对象 ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f); ItemBean itemBean = new ItemBean(1000L, 89.9f); //用于比较String类型和Text类型的序列化差别 Text atext = new Text("a"); // atext.write(dout); itemBean.write(dout); byte[] byteArray = ba.toByteArray(); //比较序列化结果 System.out.println(byteArray.length); for (byte b : byteArray) { System.out.print(b); System.out.print(":"); } System.out.println("-----------------------"); String astr = "a"; // dout2.writeUTF(astr); obout.writeObject(itemBeanSer); byte[] byteArray2 = ba2.toByteArray(); System.out.println(byteArray2.length); for (byte b : byteArray2) { System.out.print(b); System.out.print(":"); } } }
如何自定义对象实现MR中的序列化接口?(里面有很多mr的概念,可以阅读完本文再看这里)
- 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序,此时,自定义的bean实现的接口应该是:
public class FlowBean implements WritableComparable<FlowBean>
需要自己实现的方法是:
/** * 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致 */ @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); dflow = in.readLong(); sumflow = in.readLong(); } /** * 序列化的方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upflow); out.writeLong(dflow); //可以考虑不序列化总流量,因为总流量是可以通过上行流量和下行流量计算出来的 out.writeLong(sumflow); } @Override public int compareTo(FlowBean o) { //实现按照sumflow的大小倒序排序 return sumflow>o.getSumflow()?-1:1; }
03 MapReduce 工作原理
3.1 MapReduce 进程
为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
而MapReduce就是这样一个分布式程序的通用框架,整体结构如下(在分布式运行时有三类实例进程):
- MRAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
3.2 MapReduce 运行机制
流程描述如下:
① 一个MR程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器启动相应数量的MapTask进程;
② MapTask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
- 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对;
- 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存;
- 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件。
③ MRAppMaster监控到所有MapTask进程任务完成之后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区);
④ ReduceTask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
3.2.1 MapTask 并行度决定机制
从上面的MapReduce运行流程可以知道,一个job的map阶段的并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:
- 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split)
- 然后每一个split分配一个mapTask并行实例处理;
- 这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成。
3.2.1.1 FileInputFormat切片机制
FileInputFormat切片机制原理如下图:
3.2.1.2 FileInputFormat切片步骤
Step1: 切片定义在InputFormat类中的getSplit()方法;
Step2: FileInputFormat中默认的切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:file1.txt(320M)、file2.txt (10M),经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
Step3: FileInputFormat中切片的大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));切片主要由这几个值来运算决定:
- minsize(切片最小值,默认值:1 ):参数调的比blockSize大,则可以让切片变得比blocksize还大,配置参数: mapreduce.input.fileinputformat.split.minsize
- maxsize(切片最大值,默认值:Long.MAXValue ):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值,配置参数:mapreduce.input.fileinputformat.split.maxsize
- blocksize:切片大小
3.2.2 MapTask 并行度调优
选择并发数的影响因素:
- 运算节点的硬件配置
- 运算任务的类型:CPU密集型还是IO密集型
- 运算任务的数据量
举例:
- 如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。
- 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。
配置task的jvm重用可以改善该问题:
- mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1,也就是说一个task启一个JVM);
- 如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB
3.2.3 ReduceTask 并行度决定机制
ReduceTask的并行度同样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4 job.setNumReduceTasks(4);
注意:
- 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
- ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
- 尽量不要运行太多的ReduceTask,对大多数job来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小 ,这个对于小集群而言,尤其重要。
3.2.4 Shuffle机制
Shuffle机制:指的是map阶段处理的数据传递给reduce的一个流程。核心是数据分区、排序及缓存,就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;
Shuffle分为3个步骤操作:
- 分区partition
- Sort根据key排序
- Combiner进行局部value的合并
详细流程:
- MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中;
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件;
- 多个溢出文件会被合并成大的溢出文件;
- 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序;
- ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据;
- ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序);
- 合并成大文件后,shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)。
Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 。
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M。
3.2.5 Combiner使用原则
Combiner的使用要非常谨慎,因为Combiner在MapReduce过程中可能调用也肯能不调用,可能调一次也可能调多次。
所以Combiner使用的原则是,有或没有都不能影响业务逻辑。
- combiner是MR程序中Mapper和Reducer之外的一种组件。
- combiner组件的父类就是Reducer
- combiner和reducer的区别在于运行的位置:
|__3.1、Combiner是在每一个maptask所在的节点运行
|__3.2、Reducer是接收全局所有Mapper的输出结果;
- combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量,具体实现步骤:
|___4.1、自定义一个combiner继承Reducer,重写reduce方法
|___4.2 、在job中设置:job.setCombinerClass(CustomCombiner.class)
- combiner能够应用的前提是不能影响最终的业务逻辑,而且combiner的输出kv应该跟reducer的输入kv类型要对应起来。
04 MapReduce 编程
4.1 MapReduce 编程规范
作为程序员,对MapReduce这一块,用户编写的程序分成三个部分:Mapper,Reducer,Driver,提交运行MR程序客户端。
Mapper部分:
- Mapper的输入数据是KV对的形式(KV的类型可自定义);
- Mapper的输出数据是KV对的形式(KV的类型可自定义);
- Mapper中的业务逻辑写在map()方法中;
- map()方法(MapTask进程)对每一个调用一次。
Reducer部分:
- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV;
- Reducer的业务逻辑写在reduce()方法中;
- Reducetask进程对每一组相同k的组调用一次reduce()方法;
- 用户自定义的Mapper和Reducer都要继承各自的父类。
Drvier部分: 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象。
4.2 WordCount示例
需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数。
Step1:定义一个mapper类
//首先要定义四个泛型的类型 //key in: LongWritable value in: Text //key out: Text value out:IntWritable public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //map方法的生命周期: 框架每传一行数据就被调用一次 //key : 这一行的起始点在文件中的偏移量 //value: 这一行的内容 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行数据转换为string String line = value.toString(); //将这一行切分出各个单词 String[] words = line.split(" "); //遍历数组,输出<单词,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
Step2:定义一个Reducer类
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义一个计数器 int count = 0; //遍历这一组kv的所有v,累加到count中 for(IntWritable value:values){ count += value.get(); } context.write(key, new IntWritable(count)); } }
Step3:定义一个主类,用来描述job并提交job
public class WordCountRunner { //把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象 //把这个描述好的job提交给集群去运行 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //指定我这个job所在的jar包 // wcjob.setJar("/home/hadoop/wordcount.jar"); wcjob.setJarByClass(WordCountRunner.class); wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); //设置我们的业务逻辑Mapper类的输出key和value的数据类型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //设置我们的业务逻辑Reducer类的输出key和value的数据类型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定要处理的数据所在的位置 FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt"); //指定处理完成之后的结果所保存的位置 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/")); //向yarn集群提交这个job boolean res = wcjob.waitForCompletion(true); System.exit(res?0:1); }
05 MapReduce 运行模式
5.1 本地运行模式
MapReduce本地运行模式如下:
- MapReduce程序是被提交给LocalJobRunner在本地以单进程的形式运行;
- 处理的数据及输出结果可以在本地文件系统,也可以在HDFS上;
- 如果要在本地运行,写一个程序,不要带集群的配置文件(本质是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数);
- 本地模式非常便于进行业务逻辑的debug,只要在开发环境中打断点即可。
如果在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量,并且要将d:/hadoop-2.6.1的lib和bin目录替换成windows平台编译的版本:
%HADOOP_HOME% = d:/hadoop-2.6.1 %PATH% = %HADOOP_HOME%\bin
5.2 集群运行模式
Step1:将MapReduce程序提交给yarn集群ResourceManager,分发到很多的节点上并发执行;
Step2:处理的数据和输出结果应该位于HDFS文件系统;
Step3:提交集群的实现步骤
- 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动(命令: $ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath
);
- 直接运行main方法(项目中要带参数:mapreduce.framework.name=yarn以及yarn的两个基本配置);
- 如果要在开发环境提交job给集群,则要修改YarnRunner类。
06 MapReduce 参数优化
6.1 资源相关参数
- mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
- mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
- mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”
- mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默认值: “”
- mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1
6.2 容错相关参数
- mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
- mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
- mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。
- mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.
- mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
6.3 本地运行参数
设置以下几个参数:
mapreduce.framework.name=local mapreduce.jobtracker.address=local fs.defaultFS=local
6.4 效率和稳定性相关参数
- mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false
- mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false
- mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。
- mapreduce.input.fileinputformat.split.minsize: 每个Map Task处理的数据量(仅针对基于文件的Inputformat有效,比如TextInputFormat,SequenceFileInputFormat),默认为一个block大小,即 134217728。
07 MapReduce应用案例
之前写过MapReduce的应用案例,有兴趣的同学可以参阅下:
- 《MapReduce中的排序初步》
- 《MapReduce中的分区Partitioner》
- 《MapReduce数据压缩》
- 《MapReduce的reduce端join算法实现》
- 《MapReduce的map端join算法实现》
- 《MapReduce的 web日志预处理》
- 《MapReduce自定义inputFormat》
- 《MapReduce自定义outputFormat》
- 《MapReduce自定义GroupingComparator》
- 《MapReduce中的DistributedCache应用》
- 《MapReduce中的其他应用》
08 文末
本文主要讲解了MapReduce的基本概念,谢谢大家的阅读,本文完!