Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和Reducer,从而充分利用Hadoop并行计算框架的优势和能力来处理大数据。需要注意的是,Streaming方式是基于Unix系统的标准输入输出来运行MapReduce Job的,它区别于Pipes的地方主要是通信协议:Pipes面向使用C++语言实现的MapReduce Job,通过Socket与Hadoop平台通信来完成Job的执行。任何支持标准输入输出特性的编程语言都可以使用Streaming方式来实现MapReduce Job,基本原理就是从Unix系统标准输入读取数据,再把结果写到Unix系统的标准输出。
Hadoop是使用Java语言编写的,所以最直接的方式就是使用Java语言来实现Mapper和Reducer,然后配置MapReduce Job,提交到集群计算环境来完成计算。但是很多开发者可能对Java并不熟悉,而是对C++、Shell、Python、Ruby、PHP、Perl等其他编程语言或脚本语言有实际开发经验,Hadoop Streaming为这一类开发者提供了使用Hadoop集群来处理数据的工具,即工具包hadoop-streaming-*.jar。
Hadoop Streaming使用Unix的标准输入输出作为Hadoop和其他编程语言之间的开发接口,因此在用其他编程语言编写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。在标准输入输出中,Key和Value以Tab作为分隔符,并且在Reducer的标准输入中,Hadoop框架保证了输入的数据是按Key排序过的。
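例如,下面是一个最简单的Streaming作业示意,直接使用Unix自带的cat和wc命令分别作为Mapper和Reducer,不需要编写任何代码(其中输入输出路径仅为示意,需要替换为实际存在的HDFS路径):

bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar \
    -input /user/xiaoxiang/example/input \
    -output /user/xiaoxiang/example/output \
    -mapper /bin/cat \
    -reducer /usr/bin/wc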
如何使用Hadoop Streaming工具呢?可以通过命令行查看该工具的使用说明,如下所示:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -info
Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar [options]
  -input    <path>                   DFS input file(s) for the Map step
  -output   <path>                   DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName>      The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file     <file>                   File/dir to be shipped in the Job jar file
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner JavaClassName         Optional.
  -numReduceTasks <num>              Optional.
  -inputreader <spec>                Optional.
  -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands
  -mapdebug <path>                   Optional. To run this script when a map task fails
  -reducedebug <path>                Optional. To run this script when a reduce task fails
  -io <identifier>                   Optional.

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

In -input: globbing on <path> is supported and can have multiple -input
Default Map input format: a line is a record in UTF-8
  the key part ends at first TAB, the rest of the line is the value
Custom input format: -inputformat package.MyInputFormat
Map output format, reduce input/output format:
  Format defined by what the mapper command outputs. Line-oriented

The files named in the -file argument[s] end up in the
  working directory when the mapper and reducer are run.
  The location of this working directory is unspecified.

To set the number of reduce tasks (num. of output files):
  -D mapred.reduce.tasks=10
To skip the sort/combine/shuffle/sort/reduce step:
  A Task's Map output then becomes a 'side-effect output' rather than a reduce input
  This speeds up processing, This also feels more like "in-place" processing
  because the input filename and the map input order are preserved
  This equivalent -reducer NONE

To speed up the last maps:
  -D mapred.map.tasks.speculative.execution=true
To speed up the last reduces:
  -D mapred.reduce.tasks.speculative.execution=true
To name the job (appears in the JobTracker Web UI):
  -D mapred.job.name='My Job'
To change the local temp directory:
  -D dfs.data.dir=/tmp/dfs
  -D stream.tmpdir=/tmp/streaming
Additional local temp directories with -cluster local:
  -D mapred.local.dir=/tmp/local
  -D mapred.system.dir=/tmp/system
  -D mapred.temp.dir=/tmp/temp
To treat tasks with non-zero exit status as SUCCEDED:
  -D stream.non.zero.exit.is.failure=false
Use a custom hadoopStreaming build along a standard hadoop install:
  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...] \
    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar
For more details about jobconf parameters see:

To set an environement variable in a streaming command:
  -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

  setenv HSTREAMING "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar"

Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"
  -file /local/filter.pl -input "/logs/0604*/*" [...]
  Ships a script, invokes the non-shipped perl interpreter
  Shipped files go to the working directory so filter.pl is found by perl
  Input files are all the daily logs for days in month 2006-04
下面,我们选择一个可以使用Hadoop Streaming工具来进行计算的例子,即对单词词频进行统计计算的WordCount功能。
首先,我们准备测试使用的数据集,如下所示:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/dataset/join/
-rw-r--r-- 3 xiaoxiang supergroup 391103029 2013-03-26 12:19 /user/xiaoxiang/dataset/join/irc_basic_info.ds
-rw-r--r-- 3 xiaoxiang supergroup 11577164 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_net_block.ds
-rw-r--r-- 3 xiaoxiang supergroup 8335235 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_org_info.ds
一共有3个数据文件,总大小将近400MB。
下面,选择Python语言来实现并运行这个MapReduce Job。
使用Python实现Mapper,代码文件为word_count_mapper.py,代码如下所示:
#!/usr/bin/env python

import sys

for line in sys.stdin:
    # 按空白字符切分出一行中的单词,并过滤掉空串
    words = filter(lambda word: word, line.split())
    for word in words:
        # 输出<单词, 1>,Key与Value之间以Tab分隔
        print '%s\t%s' % (word, 1)
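可以先在本地用管道简单验证一下Mapper的逻辑(这里假设本机已安装Python,测试输入为任意一行文本,仅为示例):

echo "hello world hello hadoop" | python word_count_mapper.py

正常情况下会输出4行,每行是一个单词,后面跟着Tab和计数1。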
使用Python实现Reducer,代码文件为word_count_reducer.py,代码如下所示:
#!/usr/bin/env python

import sys
from operator import itemgetter

# 保存<单词, 词频>的字典
wc_dict = {}

for line in sys.stdin:
    word, count = line.split()
    # count读入时是字符串,需要转换为整数再累加
    count = int(count)
    wc_dict[word] = wc_dict.get(word, 0) + count

# 按单词(Key)排序后输出
sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print '%s\t%s' % (word, count)
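在提交到集群之前,还可以用Unix管道在本地完整地模拟一遍Streaming的数据流,其中sort用来模拟框架按Key排序的过程(test.txt为假设的本地测试文件,仅为示例):

cat test.txt | python word_count_mapper.py | sort | python word_count_reducer.py

如果本地能够正确输出各单词的词频,再提交到集群运行。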
运行Python实现的MapReduce程序,如下所示:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -input /user/xiaoxiang/dataset/join/ -output /user/xiaoxiang/output/streaming/python -mapper word_count_mapper.py -reducer word_count_reducer.py -numReduceTasks 2 -file *.py
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /opt/stone/cloud/storage/tmp/hadoop-xiaoxiang/hadoop-unjar4066863202997744310/] [] /tmp/streamjob2336302975421423718.jar tmpDir=null
13/04/18 17:50:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/18 17:50:17 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/18 17:50:17 INFO mapred.FileInputFormat: Total input paths to process : 3
13/04/18 17:50:17 INFO streaming.StreamJob: getLocalDirs(): [/opt/stone/cloud/storage/mapred/local]
13/04/18 17:50:17 INFO streaming.StreamJob: Running job: job_201303302227_0047
13/04/18 17:50:17 INFO streaming.StreamJob: To kill this job, run:
13/04/18 17:50:17 INFO streaming.StreamJob: /opt/stone/cloud/hadoop-1.0.3/libexec/../bin/hadoop job -Dmapred.job.tracker=hdfs://ubuntu3:9001/ -kill job_201303302227_0047
13/04/18 17:50:18 INFO streaming.StreamJob: map 0% reduce 0%
13/04/18 17:50:33 INFO streaming.StreamJob: map 7% reduce 0%
13/04/18 17:50:36 INFO streaming.StreamJob: map 11% reduce 0%
13/04/18 17:50:39 INFO streaming.StreamJob: map 15% reduce 0%
13/04/18 17:50:42 INFO streaming.StreamJob: map 19% reduce 0%
13/04/18 17:50:45 INFO streaming.StreamJob: map 23% reduce 0%
13/04/18 17:50:48 INFO streaming.StreamJob: map 25% reduce 0%
13/04/18 17:51:09 INFO streaming.StreamJob: map 32% reduce 2%
13/04/18 17:51:12 INFO streaming.StreamJob: map 36% reduce 4%
13/04/18 17:51:15 INFO streaming.StreamJob: map 40% reduce 8%
13/04/18 17:51:18 INFO streaming.StreamJob: map 44% reduce 8%
13/04/18 17:51:21 INFO streaming.StreamJob: map 47% reduce 8%
13/04/18 17:51:24 INFO streaming.StreamJob: map 50% reduce 8%
13/04/18 17:51:45 INFO streaming.StreamJob: map 54% reduce 10%
13/04/18 17:51:48 INFO streaming.StreamJob: map 60% reduce 15%
13/04/18 17:51:51 INFO streaming.StreamJob: map 65% reduce 17%
13/04/18 17:51:55 INFO streaming.StreamJob: map 66% reduce 17%
13/04/18 17:51:58 INFO streaming.StreamJob: map 68% reduce 17%
13/04/18 17:52:01 INFO streaming.StreamJob: map 72% reduce 17%
13/04/18 17:52:04 INFO streaming.StreamJob: map 75% reduce 17%
13/04/18 17:52:19 INFO streaming.StreamJob: map 75% reduce 19%
13/04/18 17:52:22 INFO streaming.StreamJob: map 87% reduce 21%
13/04/18 17:52:25 INFO streaming.StreamJob: map 100% reduce 23%
13/04/18 17:52:28 INFO streaming.StreamJob: map 100% reduce 27%
13/04/18 17:52:31 INFO streaming.StreamJob: map 100% reduce 29%
13/04/18 17:52:37 INFO streaming.StreamJob: map 100% reduce 49%
13/04/18 17:52:40 INFO streaming.StreamJob: map 100% reduce 69%
13/04/18 17:52:43 INFO streaming.StreamJob: map 100% reduce 72%
13/04/18 17:52:46 INFO streaming.StreamJob: map 100% reduce 74%
13/04/18 17:52:49 INFO streaming.StreamJob: map 100% reduce 76%
13/04/18 17:52:52 INFO streaming.StreamJob: map 100% reduce 78%
13/04/18 17:52:55 INFO streaming.StreamJob: map 100% reduce 79%
13/04/18 17:52:58 INFO streaming.StreamJob: map 100% reduce 81%
13/04/18 17:53:01 INFO streaming.StreamJob: map 100% reduce 84%
13/04/18 17:53:04 INFO streaming.StreamJob: map 100% reduce 87%
13/04/18 17:53:07 INFO streaming.StreamJob: map 100% reduce 90%
13/04/18 17:53:10 INFO streaming.StreamJob: map 100% reduce 93%
13/04/18 17:53:13 INFO streaming.StreamJob: map 100% reduce 96%
13/04/18 17:53:16 INFO streaming.StreamJob: map 100% reduce 98%
13/04/18 17:53:19 INFO streaming.StreamJob: map 100% reduce 99%
13/04/18 17:53:22 INFO streaming.StreamJob: map 100% reduce 100%
13/04/18 17:54:10 INFO streaming.StreamJob: Job complete: job_201303302227_0047
13/04/18 17:54:10 INFO streaming.StreamJob: Output: /user/xiaoxiang/output/streaming/python
验证结果,如下所示:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/output/streaming/python
-rw-r--r-- 3 xiaoxiang supergroup 0 2013-04-18 17:54 /user/xiaoxiang/output/streaming/python/_SUCCESS
drwxr-xr-x - xiaoxiang supergroup 0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs
drwxr-xr-x - xiaoxiang supergroup 0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history
-rw-r--r-- 3 xiaoxiang supergroup 37646 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_1366278617842_xiaoxiang_streamjob2336302975421423718.jar
-rw-r--r-- 3 xiaoxiang supergroup 21656 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_conf.xml
-rw-r--r-- 3 xiaoxiang supergroup 91367389 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00000
-rw-r--r-- 3 xiaoxiang supergroup 91268074 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00001
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -cat /user/xiaoxiang/output/streaming/python/part-00000 | head -5
相关问题
在使用Python实现MapReduce时,总是执行失败?
查看TaskTracker结点的运行日志可以看到,总是找不到对应的Python脚本文件,错误示例如下:
xiaoxiang@ubuntu1:/opt/stone/cloud/storage/logs/hadoop/userlogs/job_201303302227_0045/attempt_201303302227_0045_m_000001_0$ cat stderr
java.io.IOException: Cannot run program "/user/xiaoxiang/streaming/python/word_count_mapper.py": java.io.IOException: error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:214)
    at org.apache.hadoop.streaming.PipeMapper.configure(PipeMapper.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
    at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.io.IOException: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:148)
    at java.lang.ProcessImpl.start(ProcessImpl.java:65)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:453)
解决办法是使用Streaming的-file选项把脚本文件打包进Job的Jar文件中,即上面运行的命令行中指定的"-file *.py";这样,实现的2个Python脚本文件就会被分发到运行Job的各个结点的任务工作目录下,能够被直接找到并执行。
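另外,如果脚本文件没有可执行权限,或者首行缺少解释器声明(shebang),也会出现类似的错误。一个常见的做法是在-mapper和-reducer参数中显式指定python解释器来执行脚本(下面的命令仅为示意,并非本文实际运行的命令):

bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar \
    -input /user/xiaoxiang/dataset/join/ \
    -output /user/xiaoxiang/output/streaming/python \
    -mapper "python word_count_mapper.py" \
    -reducer "python word_count_reducer.py" \
    -numReduceTasks 2 \
    -file word_count_mapper.py \
    -file word_count_reducer.py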