spark_黑名单过滤题目:广告点击数据统计。

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: spark_黑名单过滤题目:广告点击数据统计。

题目:广告点击数据统计。


根据动态生成的黑名单进行过滤,实时统计广告点击流量前三。


背景:


在广告点击计费系统中,我们在线过滤掉黑名单的点击,进而保护广告商的利益,只进行有效的广告点击计费 。或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量。


实现目标:


1、实现实时动态黑名单机制:将每天对某个广告点击超过N次的用户拉黑。


2、基于黑名单的非法广告点击流量过滤机制。


3、每天各广告的点击流量实时统计。


4、统计每天Top3热门广告。


请完成Spark程序的编写,并完成实验报告上传。实验报告中应该包含实验步骤,代码,运行截图等。


31.1.gif

电子信息学院专业实验报告.doc


第一步就是要安装mysql到虚拟机里面,然后在mysql里面建表


数据如下:


3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import org.apache.spark.streaming.Durations;
public class finaltest {
  public static void main(String[] args) throws InterruptedException {
  // TODO Auto-generated method stub
  //1.获取实时数据
  SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(60));
  JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
  //2.处理数据,获得每天对某个广告点击超过N次的用户
  JavaPairDStream<String,String> data = lines.mapToPair(f -> new Tuple2<>(f.split(",")[0],f.split(",")[1]));
  data.foreachRDD(rdd -> {
    JavaRDD<Advert> adRDD = rdd.map(f -> {
    Advert ad = new Advert();
    ad.setUserId(f._1);
    ad.setAdvertId(f._2);
    return ad;
    });
  SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
  Dataset<Row> words = spark.createDataFrame(adRDD,Advert.class);
  words.createOrReplaceGlobalTempView("words");
  Dataset<Row> result = spark.sql("select userId from (select userId,advertId,count(*) from words group by userId,advertId having count(*) > 2 a");
  //3.将实时产生的黑名单存入MYSQL数据库 
  result.write().format("jdbc").option
    ("url","jdbc:mysql://localhost:3306/studentInfo").option
    ("driver","com.mysql.jdbc.Driver").option
    ("dbtable","lists").option
    ("user","debian-sys-maint").option
    ("password","6KCiLZuGt5t8EuZU").mode("append").save();
  });
  //4.实时从MYSQL中读取黑名单
  JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    Dataset<Row> jdbcDF = spark.read().format("jdbc").option
      ("url", "jdbc:mysql://localhost:3306/studentInfo").option
      ("driver","com.mysql.jdbc.Driver").option
      ("dbtable","lists").option
      ("user","debian-sys-maint").option
      ("password","6KCiLZuGt5t8EuZU").load();
      JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
      JavaRDD<String> rdd1 = stu.map(f -> f.toString());
      List<String> rdd2 = rdd1.distinct().collect();
      //5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户
      JavaPairRDD<String,String> rdd3 = rdd.filter(f -> !(rdd2.contains(f._1)));
      //6.实时统计广告点击数 7.输出前三点击量的广告到文件
      JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f -> new Tuple2<String,Integer>(f._2,1)).reduceByKey((x,y) -> x+y);
      JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
      JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));
      return rdd6;    
  });
  data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3","spark");
  ssc.start();
  ssc.awaitTermination();
  } 
  //2.处理数据,获得每天对某个广告点击超过N次的用户
    public static class JavaSparkSessionSingleton{
    private static transient SparkSession instance = null;
    public static SparkSession getInstance(SparkConf sparkConf) {
      if(instance == null) {
      instance = SparkSession.builder().config(sparkConf).getOrCreate();
      }
      return instance;
    }
    } 
    public static class Advert implements Serializable{
    private String userId;
    private String advertId;
    public String getUserId() {
      return userId;
    }
    public void setUserId(String userId) {
      this.userId = userId;
    }
    public String getAdvertId() {
      return advertId;
    }
    public void setAdvertId(String advertId) {
      this.advertId = advertId;
    }
    }
}
-=============================================================================================================
package thisterm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class filterblock {
  private static final Pattern SPACE = Pattern.compile(" ");
  public static void main(String[] args) throws IOException, InterruptedException {
  if (args.length < 2) {
    System.err.println("需要传入参数:主机名端口号");
    System.exit(1);
  }
  SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");
  JavaStreamingContext scc = new JavaStreamingContext(sparkConf,Durations.seconds(10));
//   JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
         SQLContext sqlContext = new SQLContext(scc.sparkContext());
  String url = "jdbc:mysql://localhost:3306/name";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.cj.jdbc.Driver");
  JavaReceiverInputDStream<String> lines = scc.socketTextStream(args[0],Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
  JavaDStream<String> words = lines.flatMap(f -> {
    return Arrays.asList(f).iterator();
  }); 
  JavaPairDStream<String,Integer> wordCounts = words.mapToPair(f -> new Tuple2<>(f,1)).reduceByKey((x,y) -> x + y);
  JavaPairDStream<String,Integer> wordCountfiltter=wordCounts.filter(f->f._2>2);//333 flume  1
  JavaDStream<String> wordC=wordCountfiltter.map(f->f._1.split(" ")[1]+" "+f._2);//flume  1
    JavaDStream<Row> personsRDD = wordC.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(splited[0],Integer.valueOf(splited[1]));
            }
        });
         List structFields = new ArrayList();
         structFields.add(DataTypes.createStructField("bname",DataTypes.StringType,true));
         structFields.add(DataTypes.createStructField("number",DataTypes.IntegerType,true));
         StructType structType = DataTypes.createStructType(structFields);
         personsRDD.foreachRDD(f->{
           Dataset personsDF = sqlContext.createDataFrame(f,structType);
           personsDF.write().mode("append").jdbc(url,"blockname1",connectionProperties);
         });
         List<String> listblock=new ArrayList<String>();
         personsRDD.foreachRDD(f->{
         Dataset<Row> personsDF = sqlContext.createDataFrame(f,structType);
         Dataset<Row> readfile= sqlContext.read().jdbc(url,"blockname1",connectionProperties);
         JavaRDD<Row> stu=scc.sparkContext().parallelize(readfile.collectAsList());
         JavaRDD<String> rdd1=stu.map(f1->f1.toString().split(",")[0].substring(1));
         rdd1.foreach(f2->System.err.println(f2));
         List<String> list = rdd1.distinct().collect();
    //5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户
         listblock.addAll(list);
        // System.out.println(stu.toString());
        // readfile.show();
         });
         words.foreachRDD(f->{
          JavaPairRDD<String,String> rdd=f.mapToPair(s->new Tuple2<String,String>(s.split(" ")[0],s.split(" ")[1]));
          JavaPairRDD<String,String> rdd3 = rdd.filter(ff -> !(listblock.contains(ff._1)));
    //6.实时统计广告点击数 7.输出前三点击量的广告到文件
    JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f3 -> new Tuple2<String,Integer>(f3._2,1)).reduceByKey((x,y) -> x+y);
    JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());
    JavaPairRDD<String,Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));
         });
  wordCountfiltter.print();
  scc.start();
  scc.awaitTermination();
  }
}

代码如下

第一个类

import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class heimingdan {
  public static class Advert implements Serializable{
  private String userid;
  public String getUserid() {
    return userid;
  }
  public void setUserid(String userid) {
    this.userid = userid;
  }
  public String getAdvertid() {
    return advertid;
  }
  public void setAdvertid(String advertid) {
    this.advertid = advertid;
  }
  private String advertid;
  }
  public static void main(String[] args) throws InterruptedException  {
  SparkConf sparkConf = new
  SparkConf().setAppName("Streaming").setMaster("local[2]");
  JavaStreamingContext ssc =new JavaStreamingContext(sparkConf,Durations.seconds(60));
  JavaReceiverInputDStream<String> lines = 
    ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
  //鑾峰彇瀹炴椂鏁版嵁
  JavaPairDStream<String,String> data=lines.mapToPair(f->new Tuple2<>(f.split(",")[0],f.split(",")[1]));
  data.foreachRDD(rdd->{
    JavaRDD<Advert> adRDD =rdd.map(f->{
    Advert ad = new Advert();
    ad.setUserid(f._1);
    ad.setAdvertid(f._2);
    return ad;
    });
    SparkSession spark=
    JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);
    words.createOrReplaceTempView("words");
    Dataset<Row> result =
    spark.sql("select userid from(select userid,advertid,count(*)from words group by userid,advertid having count(*)>2)a");
    //澶勭悊鏁版嵁锛岃幏寰楁瘡澶╁鏌愪釜骞垮憡鏁拌秴杩�2娆$殑鐢ㄦ埛
    result.write().format("jdbc").option("url","jdbc:mysql://localhost:3306/studentInfo").option("driver","com.mysql.jdbc.Driver")
    .option("dbtable","hmd").option("user","root").option("password","123456").mode("append").save();
  });
  //灏嗗疄鏃朵骇鐢熺殑榛戝悕鍗曞瓨鍏ySQL鏁版嵁搴�
  JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd->{
    SparkSession spark= JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/studentInfo")
      .option("driver","com.mysql.jdbc.Driver").option("dbtable","hmd").option("user","root").option("password","123456").load();
    JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
    JavaRDD<String> rdd1 = stu.map(f->f.toString());
    List<String> list = rdd1.distinct().collect();
    //瀹炴椂浠嶮ySQL涓鍙栭粦鍚嶅崟
    JavaPairRDD<String,String> rdd3 =rdd.filter(f->!(list.contains(f)));//杩囨护鎺夐粦鍚嶅崟
    JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f->new Tuple2<>(f._2,1)).reduceByKey((x,y)->x+y);
    //瀹炴椂缁熻骞垮憡鐐瑰嚮鏁�
    JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f->f.swap()).sortByKey(false).mapToPair(f->f.swap());
    JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));//杈撳嚭鍓嶄笁鐐瑰嚮閲忕殑骞垮憡鍒版枃浠�
    return rdd6;
  });
  data2.dstream().repartition(1).saveAsTextFiles("/home/gyq/妗岄潰/blacklist","spark");
  data2.print();
  ssc.start();
  ssc.awaitTermination();
  }
}


第二个类

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class JavaSparkSessionSingleton {
  private static transient SparkSession instance = null;
  public static SparkSession getInstance(SparkConf sparkConf) {
  if(instance == null) {
    instance = SparkSession.builder().config(sparkConf).getOrCreate();
  }
  return instance;
  }
}


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
73 0
|
30天前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
30 0
|
30天前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
27 0
|
存储 分布式计算 Java
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
140 0
|
分布式计算 Java Spark
求TOP3广告点击次数,java spark rdd pairrdd 键值对转化
求TOP3广告点击次数,java spark rdd pairrdd 键值对转化
|
分布式计算 Spark
教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
|
分布式计算 流计算 Spark
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
|
分布式计算 算法 调度
spark2.2以后版本任务调度将增加黑名单机制
spark2.2以后版本任务调度将增加黑名单机制
319 0
|
SQL JSON 分布式计算
【Spark】(task2)PySpark数据统计和分组聚合
1.2 保存读取的信息 步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。
676 0
【Spark】(task2)PySpark数据统计和分组聚合
使用Spark Streaming SQL基于时间窗口进行数据统计
使用Spark Streaming SQL可以很方便的对事件数据中的时间字段进行处理,同时Spark Streaming SQL提供的时间窗口函数可以将事件时间按照一定的时间区间对数据进行统计操作。 本文通过讲解一个统计用户在过去5秒钟内点击网页次数的案例,介绍如何使用Spark Streaming SQL对事件时间进行操作。