1、Mapper Class
* Maps input key/value pairs to a set of intermediate key/value pairs.
*
 * <p>Maps are the individual tasks which transform input records into
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.</p>
*
* <p>The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* <code>Mapper</code> implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
*
* <p>The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
* for each key/value pair in the <code>InputSplit</code>. Finally
* {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
*
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
*
* <p>The <code>Mapper</code> outputs are partitioned per
* <code>Reducer</code>. Users can control which keys (and hence records) go to
* which <code>Reducer</code> by implementing a custom {@link Partitioner}.
*
* <p>Users can optionally specify a <code>combiner</code>, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the <code>Mapper</code> to the <code>Reducer</code>.
*
* <p>Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the <code>Configuration</code>.</p>
*
* <p>If the job has zero
* reduces then the output of the <code>Mapper</code> is directly written
* to the {@link OutputFormat} without sorting by keys.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class TokenCounterMapper
* extends Mapper<Object, Text, Text, IntWritable>{
*
* private final static IntWritable one = new IntWritable(1);
* private Text word = new Text();
*
* public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
* StringTokenizer itr = new StringTokenizer(value.toString());
* while (itr.hasMoreTokens()) {
* word.set(itr.nextToken());
* context.write(word, one);
* }
* }
* }
* </pre></blockquote>
*
* <p>Applications may override the
* {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
* greater control on map processing e.g. multi-threaded <code>Mapper</code>s
* etc.</p>
Key points from the Javadoc above:

Mapper call flow: the framework first calls setup(), then map() once for each key/value pair in the InputSplit, and finally cleanup(). All intermediate values associated with a given output key are then grouped by the framework and passed to a Reducer; users control the sorting and grouping by specifying two RawComparator classes.

1、Partitioner: the Mapper outputs are partitioned per Reducer, and a custom Partitioner controls which keys (and hence which records) go to which Reducer.
2、Combiner (local pre-aggregation): an optional combiner, set via Job#setCombinerClass(Class), performs local aggregation of the intermediate outputs and cuts down the data transferred from the Mapper to the Reducer.
3、Compression: applications can specify whether and how the intermediate outputs are compressed, and which CompressionCodec to use, via the Configuration (a sketch follows below).
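As a concrete illustration of point 3, map-output compression can be enabled from the driver. This is only a sketch: the property names are the standard Hadoop 2.x keys, and SnappyCodec is just one possible codec (it assumes the Snappy native library is available on the cluster).

// inside the driver, before submitting the job
Configuration conf = job.getConfiguration();
// compress the intermediate (map) output
conf.setBoolean("mapreduce.map.output.compress", true);
// codec used for the compressed map output
conf.setClass("mapreduce.map.output.compress.codec",
        org.apache.hadoop.io.compress.SnappyCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);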
Mapper core call flow
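The default Mapper.run() that drives this setup → map → cleanup sequence is essentially the following (lightly reformatted from the Hadoop source):

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // one map() call per key/value pair delivered by the RecordReader
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}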
2、Reducer Class
* Reduces a set of intermediate values which share a key to a smaller set of
* values.
*
* <p><code>Reducer</code> implementations
* can access the {@link Configuration} for the job via the
* {@link JobContext#getConfiguration()} method.</p>
* <p><code>Reducer</code> has 3 primary phases:</p>
* <ol>
* <li>
*
* <b id="Shuffle">Shuffle</b>
*
* <p>The <code>Reducer</code> copies the sorted output from each
* {@link Mapper} using HTTP across the network.</p>
* </li>
*
* <li>
* <b id="Sort">Sort</b>
*
* <p>The framework merge sorts <code>Reducer</code> inputs by
* <code>key</code>s
* (since different <code>Mapper</code>s may have output the same key).</p>
*
* <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
* being fetched they are merged.</p>
*
* <b id="SecondarySort">SecondarySort</b>
*
* <p>To achieve a secondary sort on the values returned by the value
* iterator, the application should extend the key with the secondary
* key and define a grouping comparator. The keys will be sorted using the
* entire key, but will be grouped using the grouping comparator to decide
* which keys and values are sent in the same call to reduce.The grouping
* comparator is specified via
* {@link Job#setGroupingComparatorClass(Class)}. The sort order is
* controlled by
* {@link Job#setSortComparatorClass(Class)}.</p>
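As a minimal sketch of the wiring described above (the two comparator class names, CompositeKeySortComparator and NaturalKeyGroupingComparator, are hypothetical and not part of this post's job):

// sort by the full composite key (natural key + secondary key)
job.setSortComparatorClass(CompositeKeySortComparator.class);
// group by the natural key only, so a single reduce() call receives all values
// for that natural key, already ordered by the secondary key
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);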
Reducer core call flow
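For reference, the default Reducer.run() looks roughly like this (simplified: the real method also resets the value iterator's backup store after each reduce() call):

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // one reduce() call per distinct key after the shuffle/sort
        while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
        }
    } finally {
        cleanup(context);
    }
}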
Overall processing flow
Job submission and execution flow
Note: in the previous section we covered the job submission flow; this time we look at how the Map and Reduce tasks actually execute.
Our Driver class:
/**
* wordCount driver
*/
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // get a Job instance
        Job job = Job.getInstance();
        // set the driver (jar) class
        job.setJarByClass(WordCountDriver.class);
        // wire up the mapper and the reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // set the pre-aggregation combiner
        job.setCombinerClass(WordCountCombiner.class);
        // set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // set the partitioner class: custom rules decide which partition each record goes to
        job.setPartitionerClass(CustomPartitioner.class);
        // set the number of ReduceTasks
        job.setNumReduceTasks(4);
        // set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // set the input and output paths
        FileInputFormat.setInputPaths(job, "G:\\input\\wordCountInput");
        FileOutputFormat.setOutputPath(job, new Path("G:\\output\\wordOutput11"));
        // submit the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Partitioner: custom partitioner class
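The body of CustomPartitioner is not shown in this post, so the following is only a sketch of what a 4-way partitioner over the (Text, LongWritable) map output could look like; the bucketing rule (first letter of the word) is purely illustrative:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // illustrative rule: bucket words into 4 partitions by their first letter
        String word = key.toString();
        if (word.isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(word.charAt(0));
        if (first >= 'a' && first <= 'g') return 0;
        if (first >= 'h' && first <= 'n') return 1;
        if (first >= 'o' && first <= 't') return 2;
        return 3;
    }
}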
Combiner: pre-aggregation class
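A sketch of what WordCountCombiner could look like (the real class is not shown in the post): it reuses the Reducer API and sums counts locally, keeping the map output types (Text, LongWritable) unchanged so the Reducer sees the same types either way.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable result = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // locally sum the counts for this word before the spill
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}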
Mapper class
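A sketch of what WordCountMapper could look like, assuming the default TextInputFormat (the input key is the line's byte offset, the value is the line itself):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final static LongWritable one = new LongWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line into words and emit (word, 1) for each
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}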
Reducer class
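A sketch of what WordCountReducer could look like; it sums the per-word counts that the combiner already partially aggregated:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable result = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // final sum of the counts for this word across all map outputs
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}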
How does it execute?
Execution first reaches our custom Mapper.
Key point: from here we can see when the Partitioner runs: inside the MapTask, after context.write() and before the record is written into the collector's circular buffer; that is, each record's partition is decided before any ReduceTask runs.
Now we step into the circular buffer collector, MapOutputBuffer.collect():
public synchronized void collect(K key, V value, final int partition
) throws IOException {
reporter.progress();
// verify that the key class matches the job's configured map output key class
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
+ key.getClass().getName());
}
// verify that the value class matches the job's configured map output value class
if (value.getClass() != valClass) {
throw new IOException("Type mismatch in value from map: expected "
+ valClass.getName() + ", received "
+ value.getClass().getName());
}
// validate the partition number returned by the Partitioner
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
// check whether a previous spill hit an exception
checkSpillException();
bufferRemaining -= METASIZE;
// if the remaining buffer space is used up (soft limit, 80% of the buffer by default), start a spill
if (bufferRemaining <= 0) {
// start spill if the thread is not running and the soft limit has been reached
// (this section is guarded by the reentrant spillLock)
spillLock.lock();
try {
do {
if (!spillInProgress) {
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
// release the lock
spillLock.unlock();
}
}
try {
// serialize the key bytes into the buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize the value bytes into the buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
// perform the zero-length boundary write described above
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
1、bufferRemaining: the space left before the circular buffer's spill threshold; it starts at the soft limit, 80 MB by default (80% of the 100 MB buffer).
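Both numbers are configurable; a sketch of the driver-side settings, using the standard Hadoop 2.x property names:

Configuration conf = job.getConfiguration();
// total size of the circular in-memory buffer, in MB (default 100)
conf.setInt("mapreduce.task.io.sort.mb", 100);
// fraction of the buffer that triggers a spill (default 0.80, i.e. 80 MB)
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);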
After the MapTask has looped over all input records and written them into the circular buffer, and our custom map() has finished, the buffer is flushed and closed, which triggers the final sortAndSpill():
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = distanceTo(bufstart, bufend, bufvoid) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
FSDataOutputStream partitionOut = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int mstart = kvend / NMETA;
final int mend = 1 + // kvend is a valid record
(kvstart >= kvend
? kvstart
: kvmeta.capacity() + kvstart) / NMETA;
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
int spindex = mstart;
final IndexRecord rec = new IndexRecord();
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
// check whether a combiner is configured (we defined one in the driver)
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
// run our custom combiner over this partition's records
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close();
if (partitionOut != out) {
partitionOut.close();
partitionOut = null;
}
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
if (partitionOut != null) {
partitionOut.close();
}
}
}
Execution reaches our custom Combiner.
Key point: here we can see when the combiner runs: after the data in the circular buffer has been sorted by key and just before it is spilled to a local temporary file. In other words, the combiner pre-aggregates already-sorted records before they are written to disk.
Note: if you use a combiner, it must be compatible with your custom Reducer (same key/value types, and an aggregation that is safe to apply more than once); otherwise the job can end up producing no usable output.
Next, execution reaches our custom Reducer.
initialize
Phase 1: Copy
Phase 2: Sort
Phase 3: invoke reduce()
The JobRunner is the clientProtocol we identified earlier; it calls MapTask's run() and ReduceTask's run() respectively.
The final output:
We defined 4 partitions; each partition is handled by one ReduceTask, and each ReduceTask writes its results to one output file, so we end up with 4 files (part-r-00000 through part-r-00003).
Looking again at our input files, the job counts how many times each word appears.
The overall flow is as follows: