假设这些信息都存储在一个文件里，每行字段依次为：
时间戳 省份 城市 用户 广告
如下所示:
（各字段之间使用空格隔开。）需求：统计每个省份点击次数最多的前三个广告。
import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.Rating; import com.clearspring.analytics.util.Lists; import scala.Tuple2; public class test4 { public static void main(String[] args) { // TODO Auto-generated method stub SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> filedata=sc.textFile("file:///home/gyq/eclipse-workspace/ALS/PeopleInfo.txt"); JavaRDD<String> data1=filedata.flatMap(f->Arrays.asList(f.split(" ")[1]).iterator()); JavaRDD<String> data2=filedata.flatMap(f->Arrays.asList(f.split(" ")[2]).iterator()); JavaPairRDD<Tuple2<String,String>,Integer> PAdata=filedata.mapToPair(s->{ return new Tuple2<Tuple2<String,String>,Integer>( new Tuple2<String,String>(s.split(" ")[1],s.split(" ")[4]),1); }); JavaPairRDD<Tuple2<String,String>,Integer> ggs=PAdata.reduceByKey((x,y)->x+y); // ggs.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f));// JavaRDD<String> gss1=ggs.map(f->{return f._1._1+"-"+f._1._2+"-"+f._2();}); //gss1.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f)); JavaRDD<String> gss2=gss1.sortBy(f->{return Integer.valueOf(f.split("-")[2]);}, false,1); JavaPairRDD<Integer,String> gss3=gss2.mapToPair(f-> {return new Tuple2<Integer,String>(Integer.valueOf(f.split("-")[0]),f.split("-")[1]+"-"+f.split("-")[2]);}); JavaPairRDD<Integer, Iterable<String>> gss4=gss3.groupByKey(); JavaPairRDD<Integer, Iterable<String>> gss5= gss4.mapValues(f->{ List<String> ls=Lists.newArrayList(f); String bb; List<String> top3=new ArrayList<>(); for(int i=0;i<3;i++) { bb=ls.get(i); top3.add(bb); } return top3; }); JavaPairRDD<Integer, 
Iterable<String>> gss6=gss5.sortByKey(); gss6.foreach(f->System.out.println(f)); } }
答案如下:
或者用下面这种方法（reduceByKey 只保留每个省份的前三名）也能得到答案：
package thisterm; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.Rating; import com.clearspring.analytics.util.Lists; import scala.Tuple2; public class test4 { public static void main(String[] args) { // TODO Auto-generated method stub SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> filedata=sc.textFile("file:///home/gyq/eclipse-workspace/ALS/PeopleInfo.txt"); JavaRDD<String> data1=filedata.flatMap(f->Arrays.asList(f.split(" ")[1]).iterator()); JavaRDD<String> data2=filedata.flatMap(f->Arrays.asList(f.split(" ")[2]).iterator()); JavaPairRDD<Tuple2<String,String>,Integer> PAdata=filedata.mapToPair(s->{ return new Tuple2<Tuple2<String,String>,Integer>( new Tuple2<String,String>(s.split(" ")[1],s.split(" ")[4]),1); }); JavaPairRDD<Tuple2<String,String>,Integer> ggs=PAdata.reduceByKey((x,y)->x+y); // ggs.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f));// JavaRDD<String> gss1=ggs.map(f->{return f._1._1+"-"+f._1._2+"-"+f._2();}); //gss1.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f)); JavaRDD<String> gss2=gss1.sortBy(f->{return Integer.valueOf(f.split("-")[2]);}, false,1); JavaPairRDD<Integer,String> gss3=gss2.mapToPair(f-> {return new Tuple2<Integer,String>(Integer.valueOf(f.split("-")[0]),f.split("-")[1]+"-"+f.split("-")[2]);}); JavaPairRDD<Integer, Iterable<String>> gss4=gss3.groupByKey(); JavaPairRDD<Integer,String> prdd3=gss3.reduceByKey((a,b)->{ if(a.split(";").length<3) { return a+";"+b; }else { return a; } }); prdd3.foreach(f->System.err.println(f)); // JavaPairRDD<Integer, Iterable<String>> gss6=gss5.sortByKey(); 
//gss6.foreach(f->System.out.println(f)); } }

