大数据Spark对SogouQ日志分析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云解析 DNS,旗舰版 1个月
简介: 大数据Spark对SogouQ日志分析

1 业务需求

使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中

进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php



  • 1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页
    查询需求及用户点击情况的网页查询日志数据集合。
  • 2)、数据格式
  1. 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
  2. 用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览
    器输入的不同查询对应同一个用户ID
  • 3)、数据下载:分为三个数据集,大小不一样
  1. 迷你版(样例数据, 376KB): http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
  2. 精简版(1天数据,63MB): http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
  1. 完整版(1.9GB): http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip

针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析:

使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。

2 准备工作

在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。

2.1 HanLP 中文分词

使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一

系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。

官方网站:http://www.hanlp.com/,添加Maven依赖

<!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>

演示范例:HanLP 入门案例,基本使用

import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import scala.collection.JavaConverters._
/**
 * HanLP 入门案例,基本使用
 */
object HanLPTest {
  def main(args: Array[String]): Unit = {
    // 入门Demo
    val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频")
    println(terms)
    println(terms.asScala.map(_.word.trim))
    // 标准分词
    val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳")
    println(terms1)
    println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))
    val words: Array[String] =
      """00:00:00 2982199073774412 [360安全卫
士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"""
        .split("\\s+")
    words.foreach(println)
    println(words(2).replaceAll("\\[|\\]", ""))
  }
}

2.2 样例类 SogouRecord

将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:

/**
 * 用户搜索点击网页记录Record
 *
 * @param queryTime  访问时间,格式为:HH:mm:ss
 * @param userId     用户ID
 * @param queryWords 查询词
 * @param resultRank 该URL在返回结果中的排名
 * @param clickRank  用户点击的顺序号
 * @param clickUrl   用户点击的URL
 */
case class SogouRecord(
                        queryTime: String, //
                        userId: String, //
                        queryWords: String, //
                        resultRank: Int, //
                        clickRank: Int, //
                        clickUrl: String //
                      )

3 业务实现

先读取数据,封装到SougoRecord类中,再按照业务处理数据。

3.1 读取数据

构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。

// TODO: 1. 本地读取SogouQ用户查询日志数据
//val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample")
val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced")
//println(s"Count = ${rawLogsRDD.count()}")
// TODO: 2. 解析数据,封装到CaseClass样例类中
val recordsRDD: RDD[SogouRecord] = rawLogsRDD
  // 过滤不合法数据,如null,分割后长度不等于6
  .filter(log => null != log && log.trim.split("\\s+").length == 6)
  // 对每个分区中数据进行解析,封装到SogouRecord
  .mapPartitions { iter =>
    iter.map { log =>
      val arr: Array[String] = log.trim.split("\\s+")
      SogouRecord(
        arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), //
        arr(3).toInt, arr(4).toInt, arr(5) //
      )
    }
  }
println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")

3.2 搜索关键词统计

获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount

程序,具体代码如下:

// =================== 3.1 搜索关键词统计 ===================
// a. 获取搜索词,进行中文分词
val wordsRDD: RDD[String] = recordsRDD.mapPartitions { iter =>
  iter.flatMap { record =>
    // 使用HanLP中文分词库进行分词
    val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
    // 将Java中集合对转换为Scala中集合对象
    import scala.collection.JavaConverters._
    terms.asScala.map(term => term.word)
  }
}
//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")
// b. 统计搜索词出现次数,获取次数最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD
  .map(word => (word, 1)) // 每个单词出现一次
  .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
  .map(tuple => tuple.swap)
  .sortByKey(ascending = false) // 词频降序排序
  .take(10) // 获取前10个搜索词
top10SearchWords.foreach(println)

运行结果如下,仅仅显示搜索最多关键词,其中需要过滤谓词:

3.3 用户搜索点击统计

统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用

户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。

// =================== 3.2 用户搜索点击次数统计 ===================
/*
每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
  .map { record =>
    // 获取用户ID和搜索词
    val key = record.userId -> record.queryWords
    (key, 1)
  }
  // 按照用户ID和搜索词组合的Key分组聚合
  .reduceByKey((tmp, item) => tmp + item)
clickCountRDD
  .sortBy(tuple => tuple._2, ascending = false)
  .take(10).foreach(println)
println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

程序运行结果如下:

3.4 搜索时间段统计

按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察

用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:

// =================== 3.3 搜索时间段统计 ===================
/*
从搜索时间字段获取小时,统计个小时搜索次数
*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD
  // 提取小时
  .map { record =>
    // 03:12:50
    record.queryTime.substring(0, 2)
  }
  // 分组聚合
  .map(word => (word, 1)) // 每个单词出现一次
  .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
  .sortBy(tuple => tuple._2, ascending = false)
hourSearchRDD.foreach(println)

程序运行结果如下:

3.5 完整代码

业务实现完整代码SogouQueryAnalysis如下所示:、

import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。
 * 1. 搜索关键词统计,使用HanLP中文分词
 * 2. 用户搜索次数统计
 * 3. 搜索时间段统计
 * 数据格式:
 * 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
 * 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对
 * 应同一个用户ID
 */
object SogouQueryAnalysis {
  def main(args: Array[String]): Unit = {
    // 构建SparkContext上下文实例对象
    val sc: SparkContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      // b. 创建SparkContext, 有就获取,没有就创建,建议使用
      val context = SparkContext.getOrCreate(sparkConf)
      // c. 返回对象
      context
    }
    sc.setLogLevel("WARN")
    // TODO: 1. 本地读取SogouQ用户查询日志数据
    //val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample")
    val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced")
    //println(s"Count = ${rawLogsRDD.count()}")
    // TODO: 2. 解析数据,封装到CaseClass样例类中
    val recordsRDD: RDD[SogouRecord] = rawLogsRDD
      // 过滤不合法数据,如null,分割后长度不等于6
      .filter(log => null != log && log.trim.split("\\s+").length == 6)
      // 对每个分区中数据进行解析,封装到SogouRecord
      .mapPartitions { iter =>
        iter.map { log =>
          val arr: Array[String] = log.trim.split("\\s+")
          SogouRecord(
            arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), //
            arr(3).toInt, arr(4).toInt, arr(5) //
          )
        }
      }
    println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")
    // 数据使用多次,进行缓存操作,使用count触发
    recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()
    // TODO: 3. 依据需求统计分析
    /*
    1. 搜索关键词统计,使用HanLP中文分词
    2. 用户搜索次数统计
    3. 搜索时间段统计
    */
    // =================== 3.1 搜索关键词统计 ===================
    // a. 获取搜索词,进行中文分词
    val wordsRDD: RDD[String] = recordsRDD.mapPartitions { iter =>
      iter.flatMap { record =>
        // 使用HanLP中文分词库进行分词
        val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
        // 将Java中集合对转换为Scala中集合对象
        import scala.collection.JavaConverters._
        terms.asScala.map(term => term.word)
      }
    }
    //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")
    // b. 统计搜索词出现次数,获取次数最多Top10
    val top10SearchWords: Array[(Int, String)] = wordsRDD
      .map(word => (word, 1)) // 每个单词出现一次
      .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
      .map(tuple => tuple.swap)
      .sortByKey(ascending = false) // 词频降序排序
      .take(10) // 获取前10个搜索词
    top10SearchWords.foreach(println)
    // =================== 3.2 用户搜索点击次数统计 ===================
    /*
    每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
    先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
    */
    val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
      .map { record =>
        // 获取用户ID和搜索词
        val key = record.userId -> record.queryWords
        (key, 1)
      }
      // 按照用户ID和搜索词组合的Key分组聚合
      .reduceByKey((tmp, item) => tmp + item)
    clickCountRDD
      .sortBy(tuple => tuple._2, ascending = false)
      .take(10).foreach(println)
    println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
    println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
    println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")
    // =================== 3.3 搜索时间段统计 ===================
    /*
    从搜索时间字段获取小时,统计个小时搜索次数
    */
    val hourSearchRDD: RDD[(String, Int)] = recordsRDD
      // 提取小时
      .map { record =>
        // 03:12:50
        record.queryTime.substring(0, 2)
      }
      // 分组聚合
      .map(word => (word, 1)) // 每个单词出现一次
      .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
      .sortBy(tuple => tuple._2, ascending = false)
    hourSearchRDD.foreach(println)
    // 释放缓存数据
    recordsRDD.unpersist()
    // 应用结束,关闭资源
    sc.stop()
  }
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
108 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
61 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
96 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
70 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
51 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
103 0
|
24天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
196 7
|
24天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
39 2