TopN 案例|学习笔记

简介: 快速学习 TopN 案例

开发者学堂课程【Hadoop 企业优化及扩展案例:TopN 案例】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/96/detail/1575


TopN 案例


1.需求

对需求 2.4 输出结果进行加工,输出流量使用量在前 10 的用户信息。
(1)输入数据top10input.xt
(2)输出数据part-r-00000.txt


2.需求分析

图片24.png

3、实现代码

(1) 编写 FlowBean 类

package com. atguigu.mr. top;.
import java. io. DataInput;
import java. io. Dataoutput;+import java. io. IOException;.
import org. apache . hadoop. io. WritableComparable;.
public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
Private  long  downFlow;'
private  long sumFlow;,
public  FlowBean() 
super ();
public FlowBean (long upFlow, long downFlow) {
super ();.
this.upFlow=upFlow;
this.downFlow=downFlow;
}
@override.
public void write(DataOutput out)throws IOException {
out. writeLong (upFlow);.
out. writeLong (downFlow);.
out. writeLong (sumFlow);
}.
@override,
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong ();
downFlow = in.readLong();,
sumFlow = in. readLong();
}
public long getUpFlow() {
return upFlow;,
public void setUpFlow (long upFlow) {
this. upFlow = upFlow;.
public long getDownFlow() {
return downFlow;,
public void setDownFlow (long downFlow) {
this.downFloW = downFloW;
}
public long getSumFlow() 
return sumFlow;
}
public void setSumFlow (long sumFlow) {
this.sumFlow = sumFlow;·
@override.
public string tostring(){
return upFlow + "\t" + downFlow + "\t" + sumFlow;
public void set(long downFlow2, long upFlow2){
downFlow = downFlow2;
upFlow = upFlow2;·
sumFlow = downFlow2 + upFlow2;
}
@override.
public int compareTo (FlowBean Bean) {
int  result; 
if (this.sumFlow>bean.getSumFlow()){
result = -1 ;,
}else if (this.sumFlow < bean.getSumFlow() ){
result = 1;
}else {
Return result; 
}

(2) 编写 TopNMapper 类

(2)package com.atguigu.mr.top;.
import java.io. IOException;.
import  java.util. Iterator;.
import java.util. TreeMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public  class  TopNMapper extends Mapper <LongWritable,Text,
FlowBean,Text>{
//定义一个TreeMap作为存储数据的容器(天然按key排序)
Private TreeMap<FlowBean,  Text>  flowMap  = new
TreeMap<FlowBean,Text>();
private FlowBean kBean; ,
@override.
Protected  void map(LongWritable key,Text value  Context,context) throws IOException, InterruptedException {
kBean =newFlowBean();
Text V = newText();,
//  1 获取一行 
String line = value.tostring();
//  2 切割
string []fields = line.split ("\t");
//  3 封装数据
String phoneNum = fields[O];,
long upFlow = Long.parseLong (fields[1]);
Long  downFlow = Long. parseLong(fields[2] ) ;
long sumFlow =  Long.parseLong (fields[3]);
kBean.set.DownFlow(downFlow);
kBean.set.UpFlow(upFlow);.
kBean.set.SumFlow(sumFlow) ;,
V.set(phoneNum);
//4向TreeMap中添加数据
flowMap.put (kBean,v) ;·
//5限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据if (flowMap.size() >10){
flowMap.remove(flowMap. firstKey());
flowMap.remove(flowMap. lastKey());
}
@override.
protected void cleanup (context  context) throws IOException,
Inter ruptedException {
//6遍历treeMap集合,输出数据.
Iterator<FlowBean> bean = flowMap.keySet().iterator();
while (bean.hasNext()){
FlowBean k = bean.next () ;
context.write (k,flowMap.get(k));
}

(3)编写 TopNReducer 类 

package com.atguigu.mr.top;.
import java.io.IOException;
importjava.util.Iterator;
importjava.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;.
public class TopNReducer extends Reducer<FlowBean,Text,Text,FlowBean> 
//定义一个TreeMap作为存储数据的容器(天然按key排序)
TreeMap<FlowBean,Text>FlowMap=newTreeMap<FlowBean,Text>();,
@override.
protected void reduce (FlowBean key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
for  (Text value:values) {
FlowBean bean = new FlowBean();
bean.set (key.getDownFlow(),key.getUpFlow());
//1向treeMap集合中添加数据。
flowMap.put (bean, newText(value));.
//2限制TreeMap数据量,超过10条就删除掉流量最小的一条数据。if(flowMap.size () > 10) {
// flowMap. remove (flowMap. firstKey());
flowMap. remove ( flowMap. lastKey() ) ;,
@override
protected  void  cleanup (Reducer<FlowBean,Text,Text,
FlowBean> . Context context )
throws   IOException,InterruptedException{
//3遍历集合,输出数据
Iterator<FlowBean> it = flowMap.keySet().iterator();
while (it.hasNext()) {
context.write (new Text (flowMab.get (v) );

(4)编写TopNDriver类

package com. atguigu.mr.top;'
import org.apache .hadoop . conf . Configuration;.
import  org.apache .hadoop. fs. Path;.
import org.apache .hadoop. io. Text;.
import org.apache .hadoop .mapreduce .Job;+
import org.apache .hadoop. mapreduce.lib. input. File InputFormat;
Impor
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;.
public class TopNDriver {
public static void main(String[] args) throws Exception i.
args=new string[]("e: /output1", "e: /output3")/
//1获取配置信息,或者job对象实例。
//6指定本程序的jar包所在的本地路径job.setJarByClass (TopNDriver.class) ;
//2指定本业务job要使用的mapper/Reducer业务类。job. setMappe rClass (TopNMapper .class) ;
job. setReducerClass (TopNReducer .class) ;
//3指定mappe r: 输出数据的kv类型。
job. setMapOutputKeyClass (FlowBean.class);
job. setMapOutputValueClass (Text.class) ;.
//4指定最终输出的数据的kv类型。
job. setoutputKeyClass (Text.class) ;
job. setoutputValueClass (FlowBean.class) ;
//5指定job的输入原始文件所在目录
FileInputFormat. setInputPaths (job,new Path(args[0]) ) ;.
FileOutputFormat. setoutputPath(job, newPath(args[1]) ) ;
//7将job中配置的相关参数,以及job所用的java类所在的jar包,
提交给yarn去运行
boolean result=job. waitForCompletion (true);
System.exit (result ? 0 : 1) ;.
Configuration configuration = new Configuration() ;
Job job = Job.getInsance(configuration);.
相关文章
|
6月前
|
SQL 大数据 HIVE
每天一道大厂SQL题【Day10】电商分组TopK实战
每天一道大厂SQL题【Day10】电商分组TopK实战
67 0
|
5月前
|
缓存 分布式计算 大数据
MaxCompute产品使用合集之行转列的函数如何与group by和聚合函数一起使用
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
JSON 安全 搜索推荐
白日梦的Elasticsearch实战笔记,32个查询案例、15个聚合案例、7个查询优化技巧(一)
白日梦的Elasticsearch实战笔记,32个查询案例、15个聚合案例、7个查询优化技巧(一)
978 0
|
分布式计算 Spark
|
数据挖掘
白话Elasticsearch42-深入聚合数据分析之案例实战__bucket filter:统计牌品最近一个月的平均价格(Filter Aggregation)
白话Elasticsearch42-深入聚合数据分析之案例实战__bucket filter:统计牌品最近一个月的平均价格(Filter Aggregation)
137 0
|
数据挖掘
白话Elasticsearch37-深入聚合数据分析之案例实战Date Histogram Aggregation:统计每月电视销量
白话Elasticsearch37-深入聚合数据分析之案例实战Date Histogram Aggregation:统计每月电视销量
95 0
|
流计算
Flink应用简单案例-统计TopN
Flink应用简单案例-统计TopN
Flink应用简单案例-统计TopN
|
存储 缓存 流计算
Flink应用案例统计实现TopN的两种方式
在窗口中可以用一个 HashMap 来保存每个 url 的访问次数,只要遍历窗口中的所有数据,自然就能得到所有 url 的热门度。最后把 HashMap 转成一个列表 ArrayList,然后进行排序、取出前两名输出就可以了。
617 0
Flink应用案例统计实现TopN的两种方式
|
分布式计算 大数据 Spark
聚合操作_多维聚合_rollup 案例 | 学习笔记
快速学习聚合操作_多维聚合_rollup 案例
聚合操作_多维聚合_rollup 案例 | 学习笔记
|
存储
Elastic实战: 通过bucket_sort针对聚合后结果实现分页、排序
elaticsearch中实现聚合操作十分常见,同时es本身存储的数据量一般都比较大,因此聚合结果数量通常都比较多,所以针对聚合结果进行分页,也是非常常见的需求
621 0
Elastic实战: 通过bucket_sort针对聚合后结果实现分页、排序