Spark(九) -- SparkSQL API编程

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991 本文测试的Spark版本是1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991

本文测试的Spark版本是1.3.1

Text文本文件测试

一个简单的person.txt文件内容为:

JChubby,13
Looky,14
LL,15

分别是Name和Age

在Idea中新建Object,原始代码如下:

object  TextFile{
    def main(args:Array[String]){

    }
}

SparkSQL编程模型:

第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext

第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型

第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程

第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame

第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等

2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作

3.将数据注册成表,通过SQL语句操作

object  TextFile{
    def main(args:Array[String]){
        //第一步
        //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了
        val sc = new SparkContext()
        //构建SQLContext对象
        val sqlContext = new SQLContext(sc)

        //第二步
        import sqlContext.implicits._
        //第三步
        case Person(name:String,age:Int)

        //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame
        val people = sc.textFile("文件路径").map(_.split(",")).map{case (name,age) => Person(name,age.toInt)}.toDF()
        //第五步
        //DataFrame方法
        println("------------------------DataFrame------------------------------------")
        //赛选出age>10的记录,然后只选择name属性,show方法将其输出
        people.where(people("age") > 10).select(people("name")).show()

        //DSL
         println("---------------------------DSL---------------------------------")
         people.where('age > 10).select('name).show()

        //SQL
        println("-----------------------------SQL-------------------------------")
        //将people注册成people表
        people.registerTempTable("people")
        //使用sqlContext的sql方法来写SQL语句
        //查询返回的是RDD,所以对其进行collect操作,之后循环打印
        sqlContext.sql("select name from people where age > 10").collect.foreach(println)

        //保存为parquet文件,之后的parquet演示会用到
        people.saveAsParquet("保存的路径")
    }
}

parquet格式文件测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val parquet = sql.parquetFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(parquet.where(parquet("age") > 10).select(parquet("name")).show())

    println("---------------------------DSL---------------------------------")
    println(parquet.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    parquet.registerTempTable("parquet")
    sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

Json格式测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val json = sql.jsonFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(json.where(json("age") > 10).select(json("name")).show())

    println("---------------------------DSL---------------------------------")
    println(json.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    json.registerTempTable("json")
    sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作

以上为SparkSQL API的简单使用

相关文章
|
27天前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
37 1
|
27天前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
33 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
27天前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
34 0
|
27天前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
66 0
|
27天前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
27 0
|
27天前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
36 0
|
27天前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
39 0
|
27天前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
35 0
|
27天前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
29 0
|
27天前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
38 0