Hadoop MapReduce编程:计算最大值

简介:

其实,使用MapReduce计算最大值的问题,和Hadoop自带的WordCount的程序没什么区别,不过在Reducer中一个是求最大值,一个是做累加,本质一样,比较简单。下面我们结合一个例子来实现。

测试数据

我们通过自己的模拟程序,生成了一组简单的测试样本数据。输入数据的格式,截取一个片段,如下所示:

01 SG 253654006139495 253654006164392 619850464
02 KG 253654006225166 253654006252433 743485698
03 UZ 253654006248058 253654006271941 570409379
04 TT 253654006282019 253654006286839 23236775
05 BE 253654006276984 253654006301435 597874033
06 BO 253654006293624 253654006315946 498265375
07 SR 253654006308428 253654006330442 484613339
08 SV 253654006320312 253654006345405 629640166
09 LV 253654006330384 253654006359891 870680704
10 FJ 253654006351709 253654006374468 517965666

上面文本数据一行一行存储,一行包含4部分,分别表示:

  1. 国家代码
  2. 起始时间
  3. 截止时间
  4. 随机成本/权重估值

各个字段之间以空格号分隔。我们要计算的结果是,求各个国家(以国家代码标识)的成本估值的最大值。

编程实现

因为比较简单,直接看实际的代码。代码分为三个部分,当然是Mapper、Reducer、Driver。Mapper实现类为GlobalCostMapper,实现代码如下所示:

01 package org.shirdrn.kodz.inaction.hadoop.extremum.max;
02
03 import java.io.IOException;
04
05 import org.apache.hadoop.io.LongWritable;
06 import org.apache.hadoop.io.Text;
07 import org.apache.hadoop.mapreduce.Mapper;
08
09 public class GlobalCostMapper extends
10 Mapper<LongWritable, Text, Text, LongWritable> {
11
12 private final static LongWritable costValue = new LongWritable(0);
13 private Text code = new Text();
14
15 @Override
16 protected void map(LongWritable key, Text value, Context context)
17 throws IOException, InterruptedException {
18 // a line, such as 'SG 253654006139495 253654006164392 619850464'
19 String line = value.toString();
20 String[] array = line.split("\\s");
21 if (array.length == 4) {
22 String countryCode = array[0];
23 String strCost = array[3];
24 long cost = 0L;
25 try {
26 cost = Long.parseLong(strCost);
27 } catch (NumberFormatException e) {
28 cost = 0L;
29 }
30 if (cost != 0) {
31 code.set(countryCode);
32 costValue.set(cost);
33 context.write(code, costValue);
34 }
35 }
36 }
37 }

上面实现逻辑非常简单,就是根据空格分隔符,将各个字段的值分离出来,最后输出键值对。
接着,Mapper输出了的键值对列表,在Reducer中就需要进行合并化简,Reducer的实现类为GlobalCostReducer,实现代码如下所示:

01 package org.shirdrn.kodz.inaction.hadoop.extremum.max;
02
03 import java.io.IOException;
04 import java.util.Iterator;
05
06 import org.apache.hadoop.io.LongWritable;
07 import org.apache.hadoop.io.Text;
08 import org.apache.hadoop.mapreduce.Reducer;
09
10 public class GlobalCostReducer extends
11 Reducer<Text, LongWritable, Text, LongWritable> {
12
13 @Override
14 protected void reduce(Text key, Iterable<LongWritable> values,
15 Context context) throws IOException, InterruptedException {
16 long max = 0L;
17 Iterator<LongWritable> iter = values.iterator();
18 while (iter.hasNext()) {
19 LongWritable current = iter.next();
20 if (current.get() > max) {
21 max = current.get();
22 }
23 }
24 context.write(key, new LongWritable(max));
25 }
26 }

上面计算一组键值对列表中代价估值的最大值,逻辑比较简单。为了优化,在Map输出以后,可以使用该Reducer进行合并操作,即作为Combiner,减少从Mapper到Reducer的数据传输量,在配置Job的时候可以指定。
下面看,如何来配置和运行一个Job,实现类为GlobalMaxCostDriver,实现代码如下所示:

01 package org.shirdrn.kodz.inaction.hadoop.extremum.max;
02
03 import java.io.IOException;
04
05 import org.apache.hadoop.conf.Configuration;
06 import org.apache.hadoop.fs.Path;
07 import org.apache.hadoop.io.LongWritable;
08 import org.apache.hadoop.io.Text;
09 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 import org.apache.hadoop.util.GenericOptionsParser;
13
14 public class GlobalMaxCostDriver {
15
16 public static void main(String[] args) throws IOException,
17 InterruptedException, ClassNotFoundException {
18
19 Configuration conf = new Configuration();
20 String[] otherArgs = new GenericOptionsParser(conf, args)
21 .getRemainingArgs();
22 if (otherArgs.length != 2) {
23 System.err.println("Usage: maxcost <in> <out>");
24 System.exit(2);
25 }
26
27 Job job = new Job(conf, "max cost");
28
29 job.setJarByClass(GlobalMaxCostDriver.class);
30 job.setMapperClass(GlobalCostMapper.class);
31 job.setCombinerClass(GlobalCostReducer.class);
32 job.setReducerClass(GlobalCostReducer.class);
33
34 job.setOutputKeyClass(Text.class);
35 job.setOutputValueClass(LongWritable.class);
36
37 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
38 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
39
40 int exitFlag = job.waitForCompletion(true) ? 0 : 1;
41 System.exit(exitFlag);
42 }
43 }

运行程序

首先,需要保证Hadoop集群正常运行,我这里NameNode是主机ubuntu3。下面看运行程序的过程:

  • 编译代码(我直接使用Maven进行),打成jar文件
1 shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org
  • 拷贝上面生成的jar文件,到NameNode环境中
1 xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scpshirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./
2 global-max-cost.jar
  • 上传待处理的数据文件
1 xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/
  • 运行我们编写MapReduce任务,计算最大值
1 xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost

运行过程控制台输出内容,大概如下所示:

01 13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1
02 13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
03 13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded
04 13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004
05 13/03/22 16:30:17 INFO mapred.JobClient: map 0% reduce 0%
06 13/03/22 16:30:33 INFO mapred.JobClient: map 22% reduce 0%
07 13/03/22 16:30:36 INFO mapred.JobClient: map 28% reduce 0%
08 13/03/22 16:30:45 INFO mapred.JobClient: map 52% reduce 9%
09 13/03/22 16:30:48 INFO mapred.JobClient: map 57% reduce 9%
10 13/03/22 16:30:57 INFO mapred.JobClient: map 80% reduce 9%
11 13/03/22 16:31:00 INFO mapred.JobClient: map 85% reduce 19%
12 13/03/22 16:31:10 INFO mapred.JobClient: map 100% reduce 28%
13 13/03/22 16:31:19 INFO mapred.JobClient: map 100% reduce 100%
14 13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004
15 13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29
16 13/03/22 16:31:24 INFO mapred.JobClient: Job Counters
17 13/03/22 16:31:24 INFO mapred.JobClient: Launched reduce tasks=1
18 13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=76773
19 13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
20 13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
21 13/03/22 16:31:24 INFO mapred.JobClient: Launched map tasks=7
22 13/03/22 16:31:24 INFO mapred.JobClient: Data-local map tasks=7
23 13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=40497
24 13/03/22 16:31:24 INFO mapred.JobClient: File Output Format Counters
25 13/03/22 16:31:24 INFO mapred.JobClient: Bytes Written=3029
26 13/03/22 16:31:24 INFO mapred.JobClient: FileSystemCounters
27 13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_READ=142609
28 13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_READ=448913653
29 13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_WRITTEN=338151
30 13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=3029
31 13/03/22 16:31:24 INFO mapred.JobClient: File Input Format Counters
32 13/03/22 16:31:24 INFO mapred.JobClient: Bytes Read=448912799
33 13/03/22 16:31:24 INFO mapred.JobClient: Map-Reduce Framework
34 13/03/22 16:31:24 INFO mapred.JobClient: Map output materialized bytes=21245
35 13/03/22 16:31:24 INFO mapred.JobClient: Map input records=10000000
36 13/03/22 16:31:24 INFO mapred.JobClient: Reduce shuffle bytes=18210
37 13/03/22 16:31:24 INFO mapred.JobClient: Spilled Records=12582
38 13/03/22 16:31:24 INFO mapred.JobClient: Map output bytes=110000000
39 13/03/22 16:31:24 INFO mapred.JobClient: CPU time spent (ms)=80320
40 13/03/22 16:31:24 INFO mapred.JobClient: Total committed heap usage (bytes)=1535639552
41 13/03/22 16:31:24 INFO mapred.JobClient: Combine input records=10009320
42 13/03/22 16:31:24 INFO mapred.JobClient: SPLIT_RAW_BYTES=854
43 13/03/22 16:31:24 INFO mapred.JobClient: Reduce input records=1631
44 13/03/22 16:31:24 INFO mapred.JobClient: Reduce input groups=233
45 13/03/22 16:31:24 INFO mapred.JobClient: Combine output records=10951
46 13/03/22 16:31:24 INFO mapred.JobClient: Physical memory (bytes) snapshot=1706708992
47 13/03/22 16:31:24 INFO mapred.JobClient: Reduce output records=233
48 13/03/22 16:31:24 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4316872704
49 13/03/22 16:31:24 INFO mapred.JobClient: Map output records=10000000
  • 验证Job结果输出
001 xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat/user/xiaoxiang/output/cost/part-r-00000
002 AD 999974516
003 AE 999938630
004 AF 999996180
005 AG 999991085
006 AI 999989595
007 AL 999998489
008 AM 999976746
009 AO 999989628
010 AQ 999995031
011 AR 999953989
012 AS 999935982
013 AT 999999909
014 AU 999937089
015 AW 999965784
016 AZ 999996557
017 BA 999949773
018 BB 999987345
019 BD 999992272
020 BE 999925057
021 BF 999999220
022 BG 999971528
023 BH 999994900
024 BI 999978516
025 BJ 999977886
026 BM 999991925
027 BN 999986630
028 BO 999995482
029 BR 999989947
030 BS 999980931
031 BT 999977488
032 BW 999935985
033 BY 999998496
034 BZ 999975972
035 CA 999978275
036 CC 999968311
037 CD 999978139
038 CF 999995342
039 CG 999788112
040 CH 999997524
041 CI 999998864
042 CK 999968719
043 CL 999967083
044 CM 999998369
045 CN 999975367
046 CO 999999167
047 CR 999971685
048 CU 999976352
049 CV 999990543
050 CW 999987713
051 CX 999987579
052 CY 999982925
053 CZ 999993908
054 DE 999985416
055 DJ 999997438
056 DK 999963312
057 DM 999941706
058 DO 999945597
059 DZ 999973610
060 EC 999920447
061 EE 999949534
062 EG 999980522
063 ER 999980425
064 ES 999949155
065 ET 999987033
066 FI 999966243
067 FJ 999990686
068 FK 999966573
069 FM 999972146
070 FO 999988472
071 FR 999988342
072 GA 999982099
073 GB 999970658
074 GD 999996318
075 GE 999991970
076 GF 999982024
077 GH 999941039
078 GI 999995295
079 GL 999948726
080 GM 999967823
081 GN 999951804
082 GP 999904645
083 GQ 999988635
084 GR 999999672
085 GT 999972984
086 GU 999919056
087 GW 999962551
088 GY 999999881
089 HK 999970084
090 HN 999972628
091 HR 999986688
092 HT 999970913
093 HU 999997568
094 ID 999994762
095 IE 999996686
096 IL 999982184
097 IM 999987831
098 IN 999914991
099 IO 999968575
100 IQ 999990126
101 IR 999986780
102 IS 999973585
103 IT 999997239
104 JM 999982209
105 JO 999977276
106 JP 999983684
107 KE 999996012
108 KG 999991556
109 KH 999975644
110 KI 999994328
111 KM 999989895
112 KN 999991068
113 KP 999967939
114 KR 999992162
115 KW 999924295
116 KY 999977105
117 KZ 999992835
118 LA 999989151
119 LB 999963014
120 LC 999962233
121 LI 999986863
122 LK 999989876
123 LR 999897202
124 LS 999957706
125 LT 999999688
126 LU 999999823
127 LV 999945411
128 LY 999992365
129 MA 999922726
130 MC 999978886
131 MD 999996042
132 MG 999996602
133 MH 999989668
134 MK 999968900
135 ML 999990079
136 MM 999987977
137 MN 999969051
138 MO 999977975
139 MP 999995234
140 MQ 999913110
141 MR 999982303
142 MS 999974690
143 MT 999982604
144 MU 999988632
145 MV 999961206
146 MW 999991903
147 MX 999978066
148 MY 999995010
149 MZ 999981189
150 NA 999961177
151 NC 999961053
152 NE 999990091
153 NF 999989399
154 NG 999985037
155 NI 999965733
156 NL 999949789
157 NO 999993122
158 NP 999972410
159 NR 999956464
160 NU 999987046
161 NZ 999998214
162 OM 999967428
163 PA 999924435
164 PE 999981176
165 PF 999959978
166 PG 999987347
167 PH 999981534
168 PK 999954268
169 PL 999996619
170 PM 999998975
171 PR 999906386
172 PT 999993404
173 PW 999991278
174 PY 999985509
175 QA 999995061
176 RE 999952291
177 RO 999994148
178 RS 999999923
179 RU 999894985
180 RW 999980184
181 SA 999973822
182 SB 999972832
183 SC 999973271
184 SD 999963744
185 SE 999972256
186 SG 999977637
187 SH 999983638
188 SI 999980580
189 SK 999998152
190 SL 999999269
191 SM 999941188
192 SN 999990278
193 SO 999973175
194 SR 999975964
195 ST 999980447
196 SV 999999945
197 SX 999903445
198 SY 999988858
199 SZ 999992537
200 TC 999969540
201 TD 999999303
202 TG 999977640
203 TH 999968746
204 TJ 999983666
205 TK 999971131
206 TM 999958998
207 TN 999963035
208 TO 999947915
209 TP 999986796
210 TR 999995112
211 TT 999984435
212 TV 999971989
213 TW 999975092
214 TZ 999992734
215 UA 999970993
216 UG 999976267
217 UM 999998377
218 US 999912229
219 UY 999989662
220 UZ 999982762
221 VA 999975548
222 VC 999991495
223 VE 999997971
224 VG 999949690
225 VI 999990063
226 VN 999974393
227 VU 999953162
228 WF 999947666
229 WS 999970242
230 YE 999984650
231 YT 999994707
232 ZA 999998692
233 ZM 999973392
234 ZW 999928087

可见,结果是我们所期望的。

目录
相关文章
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
99 2
|
1月前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
145 3
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
152 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
66 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
86 0
|
8月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
92 1
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
74 1
|
7月前
|
数据采集 SQL 分布式计算
|
8月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
88 1

相关实验场景

更多