Shuffle过程详解
shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过程.
map阶段
最左边有一个inputsplit,最终会产生一个map任务,map任务在执行的时候会k1,v1转化为k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100M
(io.sort.mb属性),当达到内存缓冲区大小的80%(io.sort.spill.percent)也就是80M的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到map把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中,在这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也分为了3块
reduce阶段
最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的
这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点,图里面只显示了一个reduce节 点,下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并,合并 以后会执行reduce的功能,最终产生结果数据。
在这里shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到
reduce阶段进行统一聚合计算。
Hadoop中序列化机制
开发MapReduce程序的时候使用到了LongWritable和Text这些数据类型,这些数据类 的是Java中的Long和String,那MapReduce为什么不直接使用Java中的这些数据类型呢?
因为java的反序列化和序列化效能不如hadoop,所以hadoop重写了序列化相关代码.
mapreduce的过程中当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化,磁盘io这些步骤我们省略不了,但是我们可以从序列化和反序列化这一块来着手做一些优化
什么是序列化和反序列化?
把内存中的数据写入到文件中的时候,会对数据序列化,然后再写入,这个序列化其实就是把内存中的对象信息转成二进制的形式,方便存储到文件中,默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来,这样存储的信息太大了,就会导致写入文件的信息过大,这写入是会额外消耗性能的。
反序列化也是一样,reduce端想把文件中的对象信息加载到内存中,如果文件很大,在加载的时候也额外消耗很多性能,所以如果我们把对象存储的信息尽量精简,那么就可以提高数据写入和读取消耗的性能。
基于此, hadoop官方实现了自己的序列化和反序列化机制, 没有使用java中的序列化机制, 所
hadoop中的数据类型没有沿用java中的数据类型,而是自己单独设计了一些writable的实现了,例如 longwritable、text等
java序列化
public class JavaSerialize { public static void main(String[] args) throws Exception{ //创建Student对象,并设置id和name属性 StudentJava studentJava = new StudentJava(); studentJava.setId(1L); studentJava.setName("Hadoop"); //将Student对象的当前状态写入本地文件中 FileOutputStream fos = new FileOutputStream("D:\\student_java.txt"); ObjectOutputStream oos = new ObjectOutputStream(fos); oos.writeObject(studentJava); oos.close(); fos.close(); } } class StudentJava implements Serializable{ private static final long serialVersionUID = 1L; private Long id; private String name; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
hadoop序列化
public class HadoopSerialize { public static void main(String[] args) throws Exception{ //创建Student对象,并设置id和name属性 StudentWritable studentWritable = new StudentWritable(); studentWritable.setId(1L); studentWritable.setName("Hadoop"); //将Student对象的当前状态写入本地文件中 FileOutputStream fos = new FileOutputStream("D:\\student_hadoop.txt"); ObjectOutputStream oos = new ObjectOutputStream(fos); studentWritable.write(oos); oos.close(); fos.close(); } } class StudentWritable implements Writable{ private Long id; private String name; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.id); out.writeUTF(this.name); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readLong(); this.name = in.readUTF(); } }
这段代码执行会报错:
因为在pom中hadoop-client的scope 标签设定为provide代表只有编译的时候使用.
注释掉这个标签执行成功.
两种反序列化的比较
InputFormat分析
fileFormat源代码
public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start(); //获取InputSplit的size的最小值minSize和最大值maxSize /* getFormatMinSplitSize()=1 getMinSplitSize(job)=0 所以最终minSize=1 */ long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); /* getMaxSplitSize(job)=Long.MAX_VALUE 所以maxSize等于Long的最大值 */ long maxSize = getMaxSplitSize(job); // 创建List,准备保存生成的InputSplit List<InputSplit> splits = new ArrayList<InputSplit>(); //获取输入文件列表 List<FileStatus> files = listStatus(job); /* !getInputDirRecursive(job) = !false = true job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, 所以ignoreDirs=false */ boolean ignoreDirs = !getInputDirRecursive(job) && //迭代输入文件列表 for (FileStatus file: files) { //是否忽略子目录,默认不忽略 if (ignoreDirs && file.isDirectory()) { continue; } //获取 文件/目录 路径 Path path = file.getPath(); //获取 文件/目录 长度 long length = file.getLen(); if (length != 0) { //保存文件的Block块所在的位置 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } //判断文件是否支持切割,默认为true if (isSplitable(job, path)) { //获取文件的Block大小,默认128M long blockSize = file.getBlockSize(); //计算split的大小 splitSize = Math.max(1, Math.min(Long.MAX_VALUE, 128))=128M=134217728 所以我们说默认情况下split逻辑切片的大小和Block size相等 */ long splitSize = computeSplitSize(blockSize, minSize, maxSize); //还需要处理的文件剩余字节大小,其实就是这个文件的原始大小long bytesRemaining = length; // /* SPLIT_SLOP = 1.1 文件剩余字节大小/1134217728【128M】 > 1.1 意思就是当文件剩余大小bytesRemaining与splitSize的比值还大于1.1的时候,就继否则,剩下的直接作为一个InputSplit 敲黑板,划重点:只要bytesRemaining/splitSize<=1.1就会停止划分,将剩下的作 */ while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //组装InputSplit /* 生成InputSplit path:路径 length-bytesRemaining:起始位置splitSize:大小 blkLocations[blkIndex].getHosts()和blkLocations[blkIndex].getCached makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()) */ splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } //最后会把bytesRemaining/splitSize<=1.1的那一部分内容作为一个InputSplit if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable //如果文件不支持切割,执行这里if (LOG.isDebugEnabled()) { // Log only if the file is big enough to be splitted if (length > Math.min(file.getBlockSize(), minSize)) { LOG.debug("File is not splittable so no parallelization " + "is possible: " + file.getPath()); } } //把不支持切割的文件整个作为一个InputSplit splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts( blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop();
下面我们来看几个面试题?
1.一个1G的文件,会产生多少个map任务?
Block块默认是128M,所以1G的文件会产生8个Block块
默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务 所以:1024/128=8个map任务
2.1000个文件,每个文件100KB,会产生多少个map任务?
一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block, 那最终会产生1000个InputSplit,也就对应着会产生1000个map任务
3.一个140M的文件,会产生多少个map任务? 根据前面的分析
140M的文件会产生2个Block,那对应的就会产生2个InputSplit了? 注意:这个有点特殊,140M/128M=1.09375<1.1
所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map 任务。这个文件其实再稍微大1M就可以产生2个map 任务了。
createRecordReader分析
createRecordReader的具体实现是在TextInputFormat类中
试想一个问题: 如果逻辑切分InputSplit时,一行数据横跨两个InputSplit时该如何处理?
hadoop中是这样处理的:
如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)
//如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始 if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); }
源码解读
@Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { // 获取换行符,默认没有配置。 String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); // 创建一个行阅读器 return new LineRecordReader(recordDelimiterBytes); }
创建行阅读器之前会执行这个初始化方法。
//初始化方法 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { //获取传过来的InputSplit,将InputSplit转换成子类FileSplit FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); //MAX_LINE_LENGTH对应的参数默认没有设置,所以会取Integer.MAX_VALUE this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); //获取InputSplit的起始位置 start = split.getStart(); //获取InputSplit的结束位置 end = start + split.getLength(); //获取InputSplit的路径 final Path file = split.getPath(); // open the file and seek to the start of the split // 打开文件,并且跳到InputSplit的起始位置 final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); // 获取文件的压缩信息 CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { //如果文件是压缩文件,则执行if中的语句 isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { if (start != 0) { // So we have a split that is only part of a file stored using // a Compression codec that cannot be split. throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream"); } in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { //如果文件是未压缩文件(普通文件),则执行else中的语句 //跳转到文件中的指定位置 fileIn.seek(start); //针对未压缩文件,创建一个阅读器读取一行一行的数据 in = new UncompressedSplitLineReader( fileIn, job, this.recordDelimiterBytes, split.getLength()); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. /* 注意:如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行) 这就解释了这个问题:如果一行数据被拆分到了两个InputSplit中,会不会有问题? */ if (start != 0) { //如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始 start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; }
/* 这个方法是核心的方法,会被框架调用,每调用一次,就会读取一行数据,最终获取到我们之前 */ public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } // k1 就是每一行的起始位置 key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { //读取一行数据,赋值给value,也就是v1 newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
OutPutFormat分析
源代码位置
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } }
分析完源码能回答的问题
1.切分inputSplit 和block有什么联系?
2. 1个mapreduce 任务会生成几个map任务?
3. 针对一行文件被切分到多个inputSplit中的情况,mapreduce如何处理?