Topic: Advertisement click data statistics.
Filter clicks against a dynamically generated blacklist and report the top 3 advertisements by click volume in real time.
Background:
In an ad-click billing system, blacklisted clicks are filtered out online so that only valid ad clicks are billed, protecting the advertisers' interests. The same idea applies to anti-fraud rating (or traffic) systems, where invalid votes, ratings, or traffic are filtered out.
Implementation goals:
1. Implement a real-time dynamic blacklist: each day, any user who clicks the same advertisement more than N times is blacklisted (see the SQL sketch after this list).
2. Filter out illegitimate ad-click traffic based on that blacklist.
3. Compute real-time daily click statistics for each advertisement.
4. Report the top 3 most popular advertisements of each day.
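As a rough sketch of goal 1, the core detection step can be written as one Spark SQL query over the (userId, advertId) click records. The class and method names below are made up for illustration, and the view name words, the column names, and the threshold of 2 simply mirror the code later in this report; they are not prescribed by the assignment.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BlacklistQuerySketch {
    // Returns the ids of users who clicked the same ad more than 2 times,
    // assuming the click records are already registered as a temp view "words"
    // with columns userId and advertId (the names used by the code below).
    public static Dataset<Row> findBlacklistedUsers(SparkSession spark) {
        return spark.sql(
            "select userId from (" +
            " select userId, advertId, count(*) as cnt from words" +
            " group by userId, advertId having count(*) > 2" +
            ") t");
    }
}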
Write the Spark program and upload a lab report. The report should include the experiment steps, the code, screenshots of the runs, and so on.
Step 1 is to install MySQL on the virtual machine and then create the blacklist table inside MySQL.
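A minimal sketch of this step, assuming the studentInfo database and the hmd blacklist table used by the heimingdan class below; the host, credentials, table name, and column name all depend on your own MySQL installation and are only illustrative here. (Spark's JDBC writer in append mode can also create the table on first write, so this mainly makes the schema explicit.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateBlacklistTable {
    public static void main(String[] args) throws Exception {
        // Connection details are assumptions matching the code later in this report;
        // replace them with your own MySQL host, database, user, and password.
        String url = "jdbc:mysql://localhost:3306/studentInfo";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement()) {
            // One column is enough: the blacklist only needs the offending user id.
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS hmd (userid VARCHAR(50))");
        }
    }
}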
The test data is as follows (one click record per line):
3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2
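The streaming programs below read these records from a TCP socket via socketTextStream, so during the experiment the data is usually sent line by line with nc -lk <port> on the receiving host. Note that filterblock splits each record on a space, which matches the sample data above, while finaltest and heimingdan split on a comma and therefore expect records of the form userid,advertid. Purely as an illustration, here is a hypothetical stand-alone feeder that serves a few of the sample records over a socket; the class name, port 9999, and the records array are assumptions, not part of the assignment.

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class ClickFeeder {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample records; in the real experiment they come from the data above.
        String[] records = {"3333 flume", "4444 ooize", "5555 flume", "2222 hive"};
        try (ServerSocket server = new ServerSocket(9999);   // port is an assumption
             Socket client = server.accept();                // wait for the Spark receiver to connect
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String record : records) {
                out.println(record);        // one click record per line
                Thread.sleep(1000);         // pace the stream so several batches see data
            }
        }
    }
}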
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import org.apache.spark.streaming.Durations;

public class finaltest {
    public static void main(String[] args) throws InterruptedException {
        // 1. Receive the real-time click data.
        SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0],
                Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // 2. Parse the data and find users who click the same ad more than N (= 2) times per day.
        JavaPairDStream<String, String> data =
                lines.mapToPair(f -> new Tuple2<>(f.split(",")[0], f.split(",")[1]));
        data.foreachRDD(rdd -> {
            JavaRDD<Advert> adRDD = rdd.map(f -> {
                Advert ad = new Advert();
                ad.setUserId(f._1);
                ad.setAdvertId(f._2);
                return ad;
            });
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);
            words.createOrReplaceTempView("words");
            Dataset<Row> result = spark.sql(
                "select userId from (select userId,advertId,count(*) from words "
                + "group by userId,advertId having count(*) > 2) a");
            // 3. Append the newly detected blacklist entries to MySQL.
            result.write().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "lists")
                  .option("user", "debian-sys-maint")
                  .option("password", "6KCiLZuGt5t8EuZU")
                  .mode("append").save();
        });

        // 4. Read the blacklist back from MySQL for every batch.
        JavaPairDStream<String, Integer> data2 = data.transformToPair(rdd -> {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> jdbcDF = spark.read().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "lists")
                  .option("user", "debian-sys-maint")
                  .option("password", "6KCiLZuGt5t8EuZU")
                  .load();
            JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
            JavaRDD<String> rdd1 = stu.map(f -> f.getString(0));   // userId column as a plain string
            List<String> rdd2 = rdd1.distinct().collect();
            // 5. Filter the click data, dropping users that appear in the blacklist.
            JavaPairRDD<String, String> rdd3 = rdd.filter(f -> !(rdd2.contains(f._1)));
            // 6. Count clicks per advertisement in real time.  7. Keep the top 3 advertisements.
            JavaPairRDD<String, Integer> rdd4 =
                    rdd3.mapToPair(f -> new Tuple2<String, Integer>(f._2, 1)).reduceByKey((x, y) -> x + y);
            JavaPairRDD<String, Integer> rdd5 =
                    rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
            JavaPairRDD<String, Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));
            return rdd6;
        });
        data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3", "spark");

        ssc.start();
        ssc.awaitTermination();
    }

    // Lazily initialized SparkSession shared by all streaming batches.
    public static class JavaSparkSessionSingleton {
        private static transient SparkSession instance = null;
        public static SparkSession getInstance(SparkConf sparkConf) {
            if (instance == null) {
                instance = SparkSession.builder().config(sparkConf).getOrCreate();
            }
            return instance;
        }
    }

    // Bean class representing one click record (userId, advertId).
    public static class Advert implements Serializable {
        private String userId;
        private String advertId;
        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public String getAdvertId() { return advertId; }
        public void setAdvertId(String advertId) { this.advertId = advertId; }
    }
}

=============================================================

package thisterm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class filterblock {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: filterblock <hostname> <port>");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");
        JavaStreamingContext scc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        SQLContext sqlContext = new SQLContext(scc.sparkContext());

        // MySQL connection settings for the blacklist table.
        String url = "jdbc:mysql://localhost:3306/name";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "123456");
        connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

        JavaReceiverInputDStream<String> lines = scc.socketTextStream(args[0],
                Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(f -> Arrays.asList(f).iterator());

        // Count each "userid adname" record and keep the ones seen more than twice.
        JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(f -> new Tuple2<>(f, 1)).reduceByKey((x, y) -> x + y);
        JavaPairDStream<String, Integer> wordCountfiltter = wordCounts.filter(f -> f._2 > 2);   // e.g. ("3333 flume", 3)
        // Keep the user id (the first field) so the blacklist filter below can match on it.
        JavaDStream<String> wordC = wordCountfiltter.map(f -> f._1.split(" ")[0] + " " + f._2); // e.g. "3333 3"

        // Convert the offending records to Rows (bname, number) so they can be saved via JDBC.
        JavaDStream<Row> personsRDD = wordC.map(new Function<String, Row>() {
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(splited[0], Integer.valueOf(splited[1]));
            }
        });
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("bname", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("number", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Append the blacklist rows to MySQL.
        personsRDD.foreachRDD(f -> {
            Dataset<Row> personsDF = sqlContext.createDataFrame(f, structType);
            personsDF.write().mode("append").jdbc(url, "blockname1", connectionProperties);
        });

        // Read the blacklist back from MySQL into an in-memory list.
        List<String> listblock = new ArrayList<String>();
        personsRDD.foreachRDD(f -> {
            Dataset<Row> readfile = sqlContext.read().jdbc(url, "blockname1", connectionProperties);
            JavaRDD<Row> stu = scc.sparkContext().parallelize(readfile.collectAsList());
            JavaRDD<String> rdd1 = stu.map(f1 -> f1.toString().split(",")[0].substring(1));  // strip the leading "[" of Row.toString()
            rdd1.foreach(f2 -> System.err.println(f2));
            List<String> list = rdd1.distinct().collect();
            // 5. Collect the blacklist entries so the click stream can be filtered against them.
            listblock.addAll(list);
        });

        words.foreachRDD(f -> {
            JavaPairRDD<String, String> rdd =
                    f.mapToPair(s -> new Tuple2<String, String>(s.split(" ")[0], s.split(" ")[1]));
            // Drop clicks from blacklisted users.
            JavaPairRDD<String, String> rdd3 = rdd.filter(ff -> !(listblock.contains(ff._1)));
            // 6. Count clicks per advertisement in real time.  7. Keep the top 3 advertisements.
            JavaPairRDD<String, Integer> rdd4 =
                    rdd3.mapToPair(f3 -> new Tuple2<String, Integer>(f3._2, 1)).reduceByKey((x, y) -> x + y);
            JavaPairRDD<String, Integer> rdd5 =
                    rdd4.mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());
            JavaPairRDD<String, Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));
        });

        wordCountfiltter.print();
        scc.start();
        scc.awaitTermination();
    }
}
The code is as follows.
First class:
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class heimingdan {

    // Bean class representing one click record (userid, advertid).
    public static class Advert implements Serializable {
        private String userid;
        private String advertid;
        public String getUserid() { return userid; }
        public void setUserid(String userid) { this.userid = userid; }
        public String getAdvertid() { return advertid; }
        public void setAdvertid(String advertid) { this.advertid = advertid; }
    }

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
        // Receive the real-time click data as "userid,advertid" lines.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0],
                Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaPairDStream<String, String> data =
                lines.mapToPair(f -> new Tuple2<>(f.split(",")[0], f.split(",")[1]));

        data.foreachRDD(rdd -> {
            JavaRDD<Advert> adRDD = rdd.map(f -> {
                Advert ad = new Advert();
                ad.setUserid(f._1);
                ad.setAdvertid(f._2);
                return ad;
            });
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);
            words.createOrReplaceTempView("words");
            // Process the data and find users who clicked the same ad more than 2 times per day.
            Dataset<Row> result = spark.sql(
                "select userid from (select userid,advertid,count(*) from words "
                + "group by userid,advertid having count(*)>2) a");
            // Append the blacklist generated in this batch to the MySQL table hmd.
            result.write().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "hmd")
                  .option("user", "root")
                  .option("password", "123456")
                  .mode("append").save();
        });

        JavaPairDStream<String, Integer> data2 = data.transformToPair(rdd -> {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            // Read the blacklist back from MySQL in every batch.
            Dataset<Row> jdbcDF = spark.read().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "hmd")
                  .option("user", "root")
                  .option("password", "123456")
                  .load();
            JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
            JavaRDD<String> rdd1 = stu.map(f -> f.getString(0));   // userid column as a plain string
            List<String> list = rdd1.distinct().collect();
            // Filter out clicks from blacklisted users.
            JavaPairRDD<String, String> rdd3 = rdd.filter(f -> !(list.contains(f._1)));
            // Count clicks per advertisement in real time.
            JavaPairRDD<String, Integer> rdd4 =
                    rdd3.mapToPair(f -> new Tuple2<>(f._2, 1)).reduceByKey((x, y) -> x + y);
            // Keep the top 3 advertisements by click count.
            JavaPairRDD<String, Integer> rdd5 =
                    rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
            JavaPairRDD<String, Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));
            return rdd6;
        });
        // Write the top-3 result of each batch to a file and also print it.
        data2.dstream().repartition(1).saveAsTextFiles("/home/gyq/桌面/blacklist", "spark");
        data2.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
Second class:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}