开发者学堂课程【Hadoop 企业优化及扩展案例:TopN 案例】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/96/detail/1575
TopN 案例
1.需求:
对需求 2.4 输出结果进行加工,输出流量使用量在前 10 的用户信息。
(1)输入数据:top10input.txt
(2)输出数据:part-r-00000.txt
2.需求分析
3、实现代码
(1) 编写 FlowBean 类
package com.atguigu.mr.top;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable bean holding one user's upstream, downstream and total traffic.
 * Implements WritableComparable so it can serve as a MapReduce key; ordering
 * is by total traffic DESCENDING, so a TreeMap keyed on FlowBean keeps the
 * largest-traffic entries first.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic (upFlow + downFlow)

    /** No-arg constructor required by Hadoop for reflective deserialization. */
    public FlowBean() {
        super();
    }

    /**
     * @param upFlow   upstream traffic
     * @param downFlow downstream traffic
     */
    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        // Keep sumFlow consistent with its components.
        this.sumFlow = upFlow + downFlow;
    }

    /** Serializes the three fields; order must match readFields(). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserializes in exactly the order written by write(). */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Tab-separated "up down sum" — the on-file output format. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Resets all three fields at once (note the down/up parameter order,
     * kept as-is because existing callers rely on it).
     */
    public void set(long downFlow2, long upFlow2) {
        downFlow = downFlow2;
        upFlow = upFlow2;
        sumFlow = downFlow2 + upFlow2;
    }

    /**
     * Descending order by total traffic: the bean with the LARGER sumFlow
     * compares as smaller, so it sorts first in a TreeMap.
     */
    @Override
    public int compareTo(FlowBean bean) {
        int result;
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            // Equal totals compare equal (original dropped this assignment).
            result = 0;
        }
        return result;
    }
}
(2) 编写 TopNMapper 类
package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that keeps a local top-10 (by total traffic) of the lines it sees.
 * Input line format: phone \t upFlow \t downFlow \t sumFlow.
 * Output: FlowBean key (sorted descending by sumFlow) -> phone number.
 */
public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // TreeMap sorts by its FlowBean key; FlowBean.compareTo is descending
    // on sumFlow, so firstKey() is the largest and lastKey() the smallest.
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    private FlowBean kBean;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // A fresh bean per record: each TreeMap entry must own its own key.
        kBean = new FlowBean();
        Text v = new Text();

        // 1 read one line
        String line = value.toString();

        // 2 split on tabs
        String[] fields = line.split("\t");

        // 3 populate the bean and the value
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);

        kBean.setDownFlow(downFlow);
        kBean.setUpFlow(upFlow);
        kBean.setSumFlow(sumFlow);

        v.set(phoneNum);

        // 4 add to the TreeMap
        flowMap.put(kBean, v);

        // 5 cap at 10 entries: evict the SMALLEST total traffic, which sits
        // at lastKey() because compareTo sorts descending. (The original text
        // also removed firstKey() — that would delete the largest entry.)
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // 6 emit the surviving (at most 10) records once all input is mapped
        Iterator<FlowBean> bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}
(3)编写 TopNReducer 类
package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that merges the per-mapper top-10 lists into the global top 10.
 * Input: FlowBean (descending by sumFlow) -> phone numbers.
 * Output: phone number -> FlowBean.
 */
public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // TreeMap sorted by FlowBean key (descending total traffic);
    // lastKey() is therefore the entry with the smallest sumFlow.
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            // Copy the key: Hadoop reuses the key object across iterations,
            // so each TreeMap entry needs its own FlowBean instance.
            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 add to the TreeMap (copy the value for the same reason)
            flowMap.put(bean, new Text(value));

            // 2 cap at 10 entries: drop the smallest total traffic
            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // 3 emit the global top 10 as phone -> FlowBean
        Iterator<FlowBean> it = flowMap.keySet().iterator();
        while (it.hasNext()) {
            FlowBean v = it.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}
(4)编写TopNDriver类
package com. atguigu.mr.top;' import org.apache .hadoop . conf . Configuration;. import org.apache .hadoop. fs. Path;. import org.apache .hadoop. io. Text;. import org.apache .hadoop .mapreduce .Job;+ import org.apache .hadoop. mapreduce.lib. input. File InputFormat; Impor org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;. public class TopNDriver { public static void main(String[] args) throws Exception i. args=new string[]("e: /output1", "e: /output3")/ //1获取配置信息,或者job对象实例。 //6指定本程序的jar包所在的本地路径job.setJarByClass (TopNDriver.class) ; //2指定本业务job要使用的mapper/Reducer业务类。job. setMappe rClass (TopNMapper .class) ; job. setReducerClass (TopNReducer .class) ; //3指定mappe r: 输出数据的kv类型。 job. setMapOutputKeyClass (FlowBean.class); job. setMapOutputValueClass (Text.class) ;. //4指定最终输出的数据的kv类型。 job. setoutputKeyClass (Text.class) ; job. setoutputValueClass (FlowBean.class) ; //5指定job的输入原始文件所在目录 FileInputFormat. setInputPaths (job,new Path(args[0]) ) ;. FileOutputFormat. setoutputPath(job, newPath(args[1]) ) ; //7将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result=job. waitForCompletion (true); System.exit (result ? 0 : 1) ;. Configuration configuration = new Configuration() ; Job job = Job.getInsance(configuration);.