其实,使用MapReduce计算最大值的问题,和Hadoop自带的WordCount的程序没什么区别,不过在Reducer中一个是求最大值,一个是做累加,本质一样,比较简单。下面我们结合一个例子来实现。
测试数据
我们通过自己的模拟程序,生成了一组简单的测试样本数据。输入数据的格式,截取一个片段,如下所示:
01 |
SG 253654006139495 253654006164392 619850464 |
02 |
KG 253654006225166 253654006252433 743485698 |
03 |
UZ 253654006248058 253654006271941 570409379 |
04 |
TT 253654006282019 253654006286839 23236775 |
05 |
BE 253654006276984 253654006301435 597874033 |
06 |
BO 253654006293624 253654006315946 498265375 |
07 |
SR 253654006308428 253654006330442 484613339 |
08 |
SV 253654006320312 253654006345405 629640166 |
09 |
LV 253654006330384 253654006359891 870680704 |
10 |
FJ 253654006351709 253654006374468 517965666 |
上面文本数据一行一行存储,一行包含4部分,分别表示:
- 国家代码
- 起始时间
- 截止时间
- 随机成本/权重估值
各个字段之间以空格分隔。我们要计算的结果是,求各个国家(以国家代码标识)的成本估值的最大值。
编程实现
因为比较简单,直接看实际的代码。代码分为三个部分,当然是Mapper、Reducer、Driver。Mapper实现类为GlobalCostMapper,实现代码如下所示:
01 |
package org.shirdrn.kodz.inaction.hadoop.extremum.max; |
02 |
03 |
import java.io.IOException; |
04 |
05 |
import org.apache.hadoop.io.LongWritable; |
06 |
import org.apache.hadoop.io.Text; |
07 |
import org.apache.hadoop.mapreduce.Mapper; |
08 |
09 |
public class GlobalCostMapper extends |
10 |
Mapper<LongWritable, Text, Text, LongWritable> { |
11 |
12 |
private final static LongWritable costValue = new LongWritable( 0 ); |
13 |
private Text code = new Text(); |
14 |
15 |
@Override |
16 |
protected void map(LongWritable key, Text value, Context context) |
17 |
throws IOException, InterruptedException { |
18 |
// a line, such as 'SG 253654006139495 253654006164392 619850464' |
19 |
String line = value.toString(); |
20 |
String[] array = line.split( "\\s" ); |
21 |
if (array.length == 4 ) { |
22 |
String countryCode = array[ 0 ]; |
23 |
String strCost = array[ 3 ]; |
24 |
long cost = 0L; |
25 |
try { |
26 |
cost = Long.parseLong(strCost); |
27 |
} catch (NumberFormatException e) { |
28 |
cost = 0L; |
29 |
} |
30 |
if (cost != 0 ) { |
31 |
code.set(countryCode); |
32 |
costValue.set(cost); |
33 |
context.write(code, costValue); |
34 |
} |
35 |
} |
36 |
} |
37 |
} |
上面实现逻辑非常简单,就是根据空格分隔符,将各个字段的值分离出来,最后输出键值对。
接着,Mapper输出的键值对列表,需要在Reducer中进行合并化简。Reducer的实现类为GlobalCostReducer,实现代码如下所示:
01 |
package org.shirdrn.kodz.inaction.hadoop.extremum.max; |
02 |
03 |
import java.io.IOException; |
04 |
import java.util.Iterator; |
05 |
06 |
import org.apache.hadoop.io.LongWritable; |
07 |
import org.apache.hadoop.io.Text; |
08 |
import org.apache.hadoop.mapreduce.Reducer; |
09 |
10 |
public class GlobalCostReducer extends |
11 |
Reducer<Text, LongWritable, Text, LongWritable> { |
12 |
13 |
@Override |
14 |
protected void reduce(Text key, Iterable<LongWritable> values, |
15 |
Context context) throws IOException, InterruptedException { |
16 |
long max = 0L; |
17 |
Iterator<LongWritable> iter = values.iterator(); |
18 |
while (iter.hasNext()) { |
19 |
LongWritable current = iter.next(); |
20 |
if (current.get() > max) { |
21 |
max = current.get(); |
22 |
} |
23 |
} |
24 |
context.write(key, new LongWritable(max)); |
25 |
} |
26 |
} |
上面的Reducer计算一组键值对列表中成本估值的最大值,逻辑比较简单。由于求最大值满足交换律和结合律,先对部分数据求最大值不会影响最终结果,因此在Map输出以后,可以直接使用该Reducer进行合并操作,即作为Combiner,减少从Mapper到Reducer的数据传输量,在配置Job的时候可以指定。
下面看,如何来配置和运行一个Job,实现类为GlobalMaxCostDriver,实现代码如下所示:
01 |
package org.shirdrn.kodz.inaction.hadoop.extremum.max; |
02 |
03 |
import java.io.IOException; |
04 |
05 |
import org.apache.hadoop.conf.Configuration; |
06 |
import org.apache.hadoop.fs.Path; |
07 |
import org.apache.hadoop.io.LongWritable; |
08 |
import org.apache.hadoop.io.Text; |
09 |
import org.apache.hadoop.mapreduce.Job; |
10 |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
11 |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
12 |
import org.apache.hadoop.util.GenericOptionsParser; |
13 |
14 |
public class GlobalMaxCostDriver { |
15 |
16 |
public static void main(String[] args) throws IOException, |
17 |
InterruptedException, ClassNotFoundException { |
18 |
19 |
Configuration conf = new Configuration(); |
20 |
String[] otherArgs = new GenericOptionsParser(conf, args) |
21 |
.getRemainingArgs(); |
22 |
if (otherArgs.length != 2 ) { |
23 |
System.err.println( "Usage: maxcost <in> <out>" ); |
24 |
System.exit( 2 ); |
25 |
} |
26 |
27 |
Job job = new Job(conf, "max cost" ); |
28 |
29 |
job.setJarByClass(GlobalMaxCostDriver. class ); |
30 |
job.setMapperClass(GlobalCostMapper. class ); |
31 |
job.setCombinerClass(GlobalCostReducer. class ); |
32 |
job.setReducerClass(GlobalCostReducer. class ); |
33 |
34 |
job.setOutputKeyClass(Text. class ); |
35 |
job.setOutputValueClass(LongWritable. class ); |
36 |
37 |
FileInputFormat.addInputPath(job, new Path(otherArgs[ 0 ])); |
38 |
FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 1 ])); |
39 |
40 |
int exitFlag = job.waitForCompletion( true ) ? 0 : 1 ; |
41 |
System.exit(exitFlag); |
42 |
} |
43 |
} |
运行程序
首先,需要保证Hadoop集群正常运行,我这里NameNode是主机ubuntu3。下面看运行程序的过程:
- 编译代码(我直接使用Maven进行),打成jar文件
1 |
shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org |
- 拷贝上面生成的jar文件,到NameNode环境中
1 |
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./ |
2 |
global-max-cost.jar |
- 上传待处理的数据文件
1 |
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/ |
- 运行我们编写的MapReduce任务,计算最大值
1 |
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost |
运行过程控制台输出内容,大概如下所示:
01 |
13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1 |
02 |
13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library |
03 |
13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded |
04 |
13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004 |
05 |
13/03/22 16:30:17 INFO mapred.JobClient: map 0% reduce 0% |
06 |
13/03/22 16:30:33 INFO mapred.JobClient: map 22% reduce 0% |
07 |
13/03/22 16:30:36 INFO mapred.JobClient: map 28% reduce 0% |
08 |
13/03/22 16:30:45 INFO mapred.JobClient: map 52% reduce 9% |
09 |
13/03/22 16:30:48 INFO mapred.JobClient: map 57% reduce 9% |
10 |
13/03/22 16:30:57 INFO mapred.JobClient: map 80% reduce 9% |
11 |
13/03/22 16:31:00 INFO mapred.JobClient: map 85% reduce 19% |
12 |
13/03/22 16:31:10 INFO mapred.JobClient: map 100% reduce 28% |
13 |
13/03/22 16:31:19 INFO mapred.JobClient: map 100% reduce 100% |
14 |
13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004 |
15 |
13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29 |
16 |
13/03/22 16:31:24 INFO mapred.JobClient: Job Counters |
17 |
13/03/22 16:31:24 INFO mapred.JobClient: Launched reduce tasks=1 |
18 |
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=76773 |
19 |
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 |
20 |
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 |
21 |
13/03/22 16:31:24 INFO mapred.JobClient: Launched map tasks=7 |
22 |
13/03/22 16:31:24 INFO mapred.JobClient: Data-local map tasks=7 |
23 |
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=40497 |
24 |
13/03/22 16:31:24 INFO mapred.JobClient: File Output Format Counters |
25 |
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Written=3029 |
26 |
13/03/22 16:31:24 INFO mapred.JobClient: FileSystemCounters |
27 |
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_READ=142609 |
28 |
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_READ=448913653 |
29 |
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_WRITTEN=338151 |
30 |
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=3029 |
31 |
13/03/22 16:31:24 INFO mapred.JobClient: File Input Format Counters |
32 |
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Read=448912799 |
33 |
13/03/22 16:31:24 INFO mapred.JobClient: Map-Reduce Framework |
34 |
13/03/22 16:31:24 INFO mapred.JobClient: Map output materialized bytes=21245 |
35 |
13/03/22 16:31:24 INFO mapred.JobClient: Map input records=10000000 |
36 |
13/03/22 16:31:24 INFO mapred.JobClient: Reduce shuffle bytes=18210 |
37 |
13/03/22 16:31:24 INFO mapred.JobClient: Spilled Records=12582 |
38 |
13/03/22 16:31:24 INFO mapred.JobClient: Map output bytes=110000000 |
39 |
13/03/22 16:31:24 INFO mapred.JobClient: CPU time spent (ms)=80320 |
40 |
13/03/22 16:31:24 INFO mapred.JobClient: Total committed heap usage (bytes)=1535639552 |
41 |
13/03/22 16:31:24 INFO mapred.JobClient: Combine input records=10009320 |
42 |
13/03/22 16:31:24 INFO mapred.JobClient: SPLIT_RAW_BYTES=854 |
43 |
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input records=1631 |
44 |
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input groups=233 |
45 |
13/03/22 16:31:24 INFO mapred.JobClient: Combine output records=10951 |
46 |
13/03/22 16:31:24 INFO mapred.JobClient: Physical memory (bytes) snapshot=1706708992 |
47 |
13/03/22 16:31:24 INFO mapred.JobClient: Reduce output records=233 |
48 |
13/03/22 16:31:24 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4316872704 |
49 |
13/03/22 16:31:24 INFO mapred.JobClient: Map output records=10000000 |
- 验证Job结果输出
001 |
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000 |
002 |
AD 999974516 |
003 |
AE 999938630 |
004 |
AF 999996180 |
005 |
AG 999991085 |
006 |
AI 999989595 |
007 |
AL 999998489 |
008 |
AM 999976746 |
009 |
AO 999989628 |
010 |
AQ 999995031 |
011 |
AR 999953989 |
012 |
AS 999935982 |
013 |
AT 999999909 |
014 |
AU 999937089 |
015 |
AW 999965784 |
016 |
AZ 999996557 |
017 |
BA 999949773 |
018 |
BB 999987345 |
019 |
BD 999992272 |
020 |
BE 999925057 |
021 |
BF 999999220 |
022 |
BG 999971528 |
023 |
BH 999994900 |
024 |
BI 999978516 |
025 |
BJ 999977886 |
026 |
BM 999991925 |
027 |
BN 999986630 |
028 |
BO 999995482 |
029 |
BR 999989947 |
030 |
BS 999980931 |
031 |
BT 999977488 |
032 |
BW 999935985 |
033 |
BY 999998496 |
034 |
BZ 999975972 |
035 |
CA 999978275 |
036 |
CC 999968311 |
037 |
CD 999978139 |
038 |
CF 999995342 |
039 |
CG 999788112 |
040 |
CH 999997524 |
041 |
CI 999998864 |
042 |
CK 999968719 |
043 |
CL 999967083 |
044 |
CM 999998369 |
045 |
CN 999975367 |
046 |
CO 999999167 |
047 |
CR 999971685 |
048 |
CU 999976352 |
049 |
CV 999990543 |
050 |
CW 999987713 |
051 |
CX 999987579 |
052 |
CY 999982925 |
053 |
CZ 999993908 |
054 |
DE 999985416 |
055 |
DJ 999997438 |
056 |
DK 999963312 |
057 |
DM 999941706 |
058 |
DO 999945597 |
059 |
DZ 999973610 |
060 |
EC 999920447 |
061 |
EE 999949534 |
062 |
EG 999980522 |
063 |
ER 999980425 |
064 |
ES 999949155 |
065 |
ET 999987033 |
066 |
FI 999966243 |
067 |
FJ 999990686 |
068 |
FK 999966573 |
069 |
FM 999972146 |
070 |
FO 999988472 |
071 |
FR 999988342 |
072 |
GA 999982099 |
073 |
GB 999970658 |
074 |
GD 999996318 |
075 |
GE 999991970 |
076 |
GF 999982024 |
077 |
GH 999941039 |
078 |
GI 999995295 |
079 |
GL 999948726 |
080 |
GM 999967823 |
081 |
GN 999951804 |
082 |
GP 999904645 |
083 |
GQ 999988635 |
084 |
GR 999999672 |
085 |
GT 999972984 |
086 |
GU 999919056 |
087 |
GW 999962551 |
088 |
GY 999999881 |
089 |
HK 999970084 |
090 |
HN 999972628 |
091 |
HR 999986688 |
092 |
HT 999970913 |
093 |
HU 999997568 |
094 |
ID 999994762 |
095 |
IE 999996686 |
096 |
IL 999982184 |
097 |
IM 999987831 |
098 |
IN 999914991 |
099 |
IO 999968575 |
100 |
IQ 999990126 |
101 |
IR 999986780 |
102 |
IS 999973585 |
103 |
IT 999997239 |
104 |
JM 999982209 |
105 |
JO 999977276 |
106 |
JP 999983684 |
107 |
KE 999996012 |
108 |
KG 999991556 |
109 |
KH 999975644 |
110 |
KI 999994328 |
111 |
KM 999989895 |
112 |
KN 999991068 |
113 |
KP 999967939 |
114 |
KR 999992162 |
115 |
KW 999924295 |
116 |
KY 999977105 |
117 |
KZ 999992835 |
118 |
LA 999989151 |
119 |
LB 999963014 |
120 |
LC 999962233 |
121 |
LI 999986863 |
122 |
LK 999989876 |
123 |
LR 999897202 |
124 |
LS 999957706 |
125 |
LT 999999688 |
126 |
LU 999999823 |
127 |
LV 999945411 |
128 |
LY 999992365 |
129 |
MA 999922726 |
130 |
MC 999978886 |
131 |
MD 999996042 |
132 |
MG 999996602 |
133 |
MH 999989668 |
134 |
MK 999968900 |
135 |
ML 999990079 |
136 |
MM 999987977 |
137 |
MN 999969051 |
138 |
MO 999977975 |
139 |
MP 999995234 |
140 |
MQ 999913110 |
141 |
MR 999982303 |
142 |
MS 999974690 |
143 |
MT 999982604 |
144 |
MU 999988632 |
145 |
MV 999961206 |
146 |
MW 999991903 |
147 |
MX 999978066 |
148 |
MY 999995010 |
149 |
MZ 999981189 |
150 |
NA 999961177 |
151 |
NC 999961053 |
152 |
NE 999990091 |
153 |
NF 999989399 |
154 |
NG 999985037 |
155 |
NI 999965733 |
156 |
NL 999949789 |
157 |
NO 999993122 |
158 |
NP 999972410 |
159 |
NR 999956464 |
160 |
NU 999987046 |
161 |
NZ 999998214 |
162 |
OM 999967428 |
163 |
PA 999924435 |
164 |
PE 999981176 |
165 |
PF 999959978 |
166 |
PG 999987347 |
167 |
PH 999981534 |
168 |
PK 999954268 |
169 |
PL 999996619 |
170 |
PM 999998975 |
171 |
PR 999906386 |
172 |
PT 999993404 |
173 |
PW 999991278 |
174 |
PY 999985509 |
175 |
QA 999995061 |
176 |
RE 999952291 |
177 |
RO 999994148 |
178 |
RS 999999923 |
179 |
RU 999894985 |
180 |
RW 999980184 |
181 |
SA 999973822 |
182 |
SB 999972832 |
183 |
SC 999973271 |
184 |
SD 999963744 |
185 |
SE 999972256 |
186 |
SG 999977637 |
187 |
SH 999983638 |
188 |
SI 999980580 |
189 |
SK 999998152 |
190 |
SL 999999269 |
191 |
SM 999941188 |
192 |
SN 999990278 |
193 |
SO 999973175 |
194 |
SR 999975964 |
195 |
ST 999980447 |
196 |
SV 999999945 |
197 |
SX 999903445 |
198 |
SY 999988858 |
199 |
SZ 999992537 |
200 |
TC 999969540 |
201 |
TD 999999303 |
202 |
TG 999977640 |
203 |
TH 999968746 |
204 |
TJ 999983666 |
205 |
TK 999971131 |
206 |
TM 999958998 |
207 |
TN 999963035 |
208 |
TO 999947915 |
209 |
TP 999986796 |
210 |
TR 999995112 |
211 |
TT 999984435 |
212 |
TV 999971989 |
213 |
TW 999975092 |
214 |
TZ 999992734 |
215 |
UA 999970993 |
216 |
UG 999976267 |
217 |
UM 999998377 |
218 |
US 999912229 |
219 |
UY 999989662 |
220 |
UZ 999982762 |
221 |
VA 999975548 |
222 |
VC 999991495 |
223 |
VE 999997971 |
224 |
VG 999949690 |
225 |
VI 999990063 |
226 |
VN 999974393 |
227 |
VU 999953162 |
228 |
WF 999947666 |
229 |
WS 999970242 |
230 |
YE 999984650 |
231 |
YT 999994707 |
232 |
ZA 999998692 |
233 |
ZM 999973392 |
234 |
ZW 999928087 |
可见,结果是我们所期望的。