移动计算,而不是移动数据
拿到原始数据;按照空格切分;key-value化;按照key分组,组间排序;把Key相同的,value累加;最后合并
Input:
全名是 input formater;过来的数据应该用什么程序解析
如 textInputFormat、tableInputFormat、SequenceFileInputFormat
Splitting
按块切分; 因为数据按块存储;这样才能做到移动计算而不移动数据,把计算程序复制到DN上
在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。(Key:偏移量,不是行数)
Map和Reduce的输入数据需要 Key-value 样式,所以,需要RR
RecordReader(RR)
XML需要自己动定义key-value转换规则
TXT:key,偏移量; value是偏移量对应的一整行
Mapping
Combiner
Partitoner & Shuffle & Sort
shuffling
按照Key分组;
再按照key进行排序
看后边的图
Reducing
Final Result (OutputFormat)
Shuffle的正常意思是洗牌或弄乱,如果你不知道MapReduce里Shuffle是什么,那么请看这张图:
每个map task都有一个内存缓冲区(buffer in memory),存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘(partition, srot and spill to disk),当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并(merge on disk),生成最终的正式输出文件,然后等待reduce task来拉数据。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力。
这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill(溢写)。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
Map输出小于100M,直接入内存;大于100,则进入partition sort and spill to disk, 最后落地成为本地文件,用完会自动删除。不能落地到HDFS,因为存在HDFS上要存多份,且要中间结果要手动删除,麻烦。【本地文件是除了HDFS管理外的文件】
为什么先partiton 再sort?
因为分区内排序,排序量小。 默认partition按照 hash 规则分区;
merge on disk 后,再按块的顺序去处理,分发到多个reduce(merge 后有reduce聚合)
https://blog.csdn.net/bingduanlbd/article/details/51933914
从最基本的要求来说,我们对Shuffle过程的期望可以有:
完整地从map task端拉取数据到reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
减少磁盘IO对task执行的影响。
Shuffle过程 补充
map过程
map是一个Java程序,或者其他计算机程序,map得到的结果在内存中, 内存有一定的阈值,当到达一定的阈值后,就需要写入到磁盘中, 这个过程叫做溢写(spill)。
在写入之前要做几件事情,partion和sort。把当前map的数据进行输出和排序,然后再溢写到磁盘。放在磁盘中的数据是已经排好序的数据。
什么是Partition?
partition分区的数据是为以后产生作用的。把map的输出数据分成一个一个区,partition可以由程序员自定义编写代码。如果不写按照默认的分区模式进行分区,默认的分区模式是什么呢? Hash模运算
1、获取hash值,每个对象的hash值得到一个整数,把这个整数模reduce的个数,得到一个结果。假如:hash模2,只有两种结果0或者1,然后map的结果分成了两个部分,0区或者1区。
分区有什么作用??? 这时mapredure如何解决负载均衡和数据倾斜?
分区是为了把map的输出进行负载均衡,解决数据倾斜的问题。这样后期通过reduce进行计算的数据就不会有数据倾斜问题,一个计算的过多,一个计算的过少。这就是分区的作用。
map为什么不会数据倾斜???
map的数据是切片,每一片的大小相等,所以map的输入数据不会出现数据倾斜的问题。
sort
sort也有默认的的sort。默认的sort是按照ascii进行排序的。(即按照字典进行排序的)
merge on disk什么意思???
每次都会溢写到磁盘中,溢写会按照hash值进行合并,相同hash值放到一起。每一次溢写就会生成一个文件,溢写的次数越多,生成的文件越多,这些文件需要进行合并为一个大的文件,如何合并,就是按照hash值进行合并,这是默认的规则。
合并的目的???
减少map的输出,因为后期reduce会移动map的输出到reduce服务器上,减少map的输出,就会减少网络io的操作,所以很有必要的。
reduce过程
之前分区是按照reduce去模后的值进行分区的,分区以后的数据在copy到reduce所在的机器上,同时事进行merge,便于下一步reduce进行处理。
分区就是在把数据拷贝到reduce的时候起作用的
为什么reduce task开始也有merge操作???
因为reduce会从很多的map输出结果中拷贝数据,这样会有很多个小文件,reduce为了计算,就需要对这些小文件进行merge。这就是为什么reduce开始时要进行merge操作。这个合并不是人为控制的,这个会根据key相同的进行合并,是hadoop自有的功能。合并完成以后相同key的数据会进行merge。
两个reduce是可以并发的。
抓重点:
这个过程比较虽然比较复杂,但是有很多部分是hadoop框架已经做的工作,只有一部分需要我们编程实现!!!
哪些部分需要我们编程实现???
1、combiner
2、partition(如何分区) -----> 默认hash
3、sort 比较(如何比较)------> 默认按照字典顺序排序
map 具备本地优势;在本地执行
reduce选择机器上资源比较宽裕的机器; reduce不具备本地优势;
IDEA
run -->edit configuration --->application--->
传入的参数空格隔开; 所有在大数据领域中,数据默认来源于HDFS(即直接写 / ),要从本地获取数据需要些 file:/// (第三个 / 表示本地文件系统的根路径, 前边两个是协议)
reduce 在写时,必须要保证指定的目录不存在,否则,报错
Python 写MapReduce
功能:统计文本文件中所有单词出现的频率功能。
【input.txt】
foo foo quux labs foo bar quux abc bar see you by test welcome test
abc labs foo me python hadoop ab ac bc bec python
【Mapper.py】
!/usr/bin/env python
-- coding:utf-8 --
import sys
for line in sys.stdin:
word_list = line.strip().split(' ')
for word in word_list:
print('\t'.join([word.strip(), str(1)]))
【reducer.py】
!/usr/bin/env python
-- coding:utf-8 --
import sys
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) < 2:
continue
word = ss[0].strip()
count = ss[1].strip()
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(count)
print('\t'.join([cur_word, str(sum)]))
sum = 0
测试代码
在Hadoop平台运行前进行本地测试
cat input.txt | ./mapper.py
cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py
Hadoop平台运行
将输入文件上传到HDFS,本例中是hdfs://127.0.0.1:9000/input.txt
hadoop fs -put /root/python_mr/input.txt hdfs://127.0.0.1:9000/
执行MapReduce任务,输出结果文件制定为/output/word
[root@ivan bin]# ./hadoop jar /root/soft/hadoop-2.7.2/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-file /root/python_mr/mapper.py -mapper /root/python_mr/mapper.py \
-file /root/python_mr/reducer.py -reducer /root/python_mr/reducer.py \
-input hdfs://127.0.0.1:9000/input.txt \
-output hdfs://127.0.0.1:9000/py_mr/word
18/04/20 01:58:01 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/root/python_mr/mapper.py, /root/python_mr/reducer.py, /tmp/hadoop-unjar287740249112725245/] [] /tmp/streamjob7707054855503715306.jar tmpDir=null
18/04/20 01:58:02 INFO client.RMProxy: Connecting to ResourceManager at ivan.local/127.0.0.1:8032
18/04/20 01:58:02 INFO client.RMProxy: Connecting to ResourceManager at ivan.local/127.0.0.1:8032
18/04/20 01:58:04 INFO mapred.FileInputFormat: Total input paths to process : 1
18/04/20 01:58:04 INFO mapreduce.JobSubmitter: number of splits:2
18/04/20 01:58:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1524145665006_0003
18/04/20 01:58:05 INFO impl.YarnClientImpl: Submitted application application_1524145665006_0003
18/04/20 01:58:05 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1524145665006_0003/
18/04/20 01:58:05 INFO mapreduce.Job: Running job: job_1524145665006_0003
18/04/20 01:58:12 INFO mapreduce.Job: Job job_1524145665006_0003 running in uber mode : false
18/04/20 01:58:12 INFO mapreduce.Job: map 0% reduce 0%
18/04/20 01:58:19 INFO mapreduce.Job: map 100% reduce 0%
18/04/20 01:58:24 INFO mapreduce.Job: map 100% reduce 100%
18/04/20 01:58:25 INFO mapreduce.Job: Job job_1524145665006_0003 completed successfully
18/04/20 01:58:25 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=228
FILE: Number of bytes written=362945
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=343
HDFS: Number of bytes written=110
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=9963
Total time spent by all reduces in occupied slots (ms)=3216
Total time spent by all map tasks (ms)=9963
Total time spent by all reduce tasks (ms)=3216
Total vcore-milliseconds taken by all map tasks=9963
Total vcore-milliseconds taken by all reduce tasks=3216
Total megabyte-milliseconds taken by all map tasks=10202112
Total megabyte-milliseconds taken by all reduce tasks=3293184
Map-Reduce Framework
Map input records=2
Map output records=26
Map output bytes=170
Map output materialized bytes=234
Input split bytes=166
Combine input records=0
Combine output records=0
Reduce input groups=17
Reduce shuffle bytes=234
Reduce input records=26
Reduce output records=17
Spilled Records=52
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=291
CPU time spent (ms)=3560
Physical memory (bytes) snapshot=701444096
Virtual memory (bytes) snapshot=6329454592
Total committed heap usage (bytes)=495976448
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=177
File Output Format Counters
Bytes Written=110
18/04/20 01:58:25 INFO streaming.StreamJob: Output directory: hdfs://127.0.0.1:9000/py_mr/word
[root@ivan bin]#
[root@ivan bin]#
[root@ivan bin]#
http://localhost:8088/cluster