【Spark】(八)Spark SQL 应用解析1

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 【Spark】(八)Spark SQL 应用解析1

文章目录


一、Spark SQL的进化之路


二、认识Spark SQL

2.1 什么是Spark SQL?

2.2 Spark SQL的作用

2.3 运行原理

2.4 特点

2.5 Spark SQL数据抽象


三、Spark SQL API

3.1 SparkSession


3.2 DataSet ( Spark1. 6 + )

1、创建 DataSet

2、使用case Class 创建 DataSet

3、使用DataSet完成Demo


3.3 DataFrame

1、创建 DataFrame

2、使用DataFrame完成Demo


四、Spark SQL 操作Hive表

4.1 文件配置

4.1 操作 Hive 表


五、Spark SQL 连 MySQL

六、Spark SQL 内置函数

七、Spark SQL 自定义函数


一、Spark SQL的进化之路


1.0以前: Shark


1.1:x开始:Spark SQL(只是测试性的) SQL


1.3.x:Spark SQL(正式版本)+Dataframe


1.5.x:Spark SQL 钨丝计划

b

1.6.x:Spark SQL+DataFrame+DataSet(测试版本)


x:Spark SQL+DataFrame+DataSet(正式版本)

Spark SQL:还有其他的优化 StructuredStreaming(DataSet)


二、认识Spark SQL


2.1 什么是Spark SQL?


spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。


image.png


2.2 Spark SQL的作用


提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎


DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD


2.3 运行原理


将 Spark SQL 转化为 RDD, 然后提交到集群执行


2.4 特点


(1)容易整合


(2)统一的数据访问方式


(3)兼容 Hive


(4)标准的数据连接


2.5 Spark SQL数据抽象


RDD (Spark1.0) -> DataFrame (Spark1.3) -> DataSet ( Spark1. 6)


  • Spark SQL提供了DataFrame和DataSet的数据抽象。
  • DataFrame就是RDD + Schema,可以认为是一-张二维表格。 他的劣势是在编译器不进行表格中的字段的类型检查。在运行期进行检查。
  • DataSet是 Spark最新的数据抽象, Spark的发 展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。 DataSet包含了DataFr ame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
  • DataFrame = DataSet [Row]
  • DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。


image.png


三、Spark SQL API


3.1 SparkSession

  • SQLContext Spark SQL的编程入口
  • HiveContext的子集,包含更多的功能
  • SparkSession (Spark 2.x 推荐)
  • SparkSession:合并了SQLContext与HiveContext
  • 提供与Spark功能交互单一入口点,并允许使用DataFrame和DataSet API对Spark进行编程


SparkSession 是 Spark 2.0 引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。 在spark的早期版本中,SparkContext 是 spark 的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用 StreamingContext ;对于sql,使用 sqlContext ;对于Hive,使用 hiveContext 。但是随着 DataSet 和 DataFrame 的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。


SparkSession 实质上是 SQLContext 和 HiveContext 的组合(未来可能还会加上 StreamingContext),所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。


特点:


  • 为用户提供一个统一的切入点使用Spark 各项功能


  • 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序


  • 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互


  • 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中


3.2 DataSet ( Spark1. 6 + )


image.png


1、创建 DataSet

createDataset()的参数可以是:Seq、Array、RDD
scala> spark.createDataset(1 to 3).show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+
scala> spark.createDataset(List(("a",1))).show
+---+---+
| _1| _2|
+---+---+
|  a|  1|
+---+---+


2、使用case Class 创建 DataSet


Scala中在class 关键字前面加上case关键字,这个类就可以成为样例类

scala> case class Userinfos(userid:Int,username:String)
defined class Userinfos
scala> case class Scores(sid:Int,userid:Int,score:Int)
defined class Scores
scala> val users = Seq(Userinfos(1,"zhangsan"),Userinfos(2,"lisi"),Userinfos(3,"Wangwu")).toDS
users: org.apache.spark.sql.Dataset[Userinfos] = [userid: int, username: string]
scala> val scs = Seq(Scores(1,1,100),Scores(2,1,30),Scores(3,2,60),Scores(4,3,35)).toDS
scs: org.apache.spark.sql.Dataset[Scores] = [sid: int, userid: int ... 1 more field]
scala> users.join(scs,users("userid")===scs("userid")).show
+------+--------+---+------+-----+
|userid|username|sid|userid|score|
+------+--------+---+------+-----+
|     1|zhangsan|  2|     1|   30|
|     1|zhangsan|  1|     1|  100|
|     2|    lisi|  3|     2|   60|
|     3|  Wangwu|  4|     3|   35|
+------+--------+---+------+-----+


image.png




case class Customers(custid:String,lname:String,fname:String,cardno:String,addr:String,area:String,city:String,language:String,score:String)
case class Orders(orderid:String,orddate:String,custid:String,status:String)
case class OrderItem(oiid:String,ordid:String,proid:String,buynum:String,sp:String,cp:String)
case class Products(proid:String,protype:String,proname:String,price:String,photo:String)
// 效率高 方法一
val custs = sc.textFile("hdfs://192.168.56.137:9000/20200106/customers.csv").map(str=>{
var e = str.replaceAll("\"","").split(",")
Customers(e(0),e(1),e(2),e(3),e(4),e(5),e(6),e(7),e(8))
})
scala> custs.toDS.show(1)
+------+-------+---------+---------+---------+------------------+-----------+--------+-----+
|custid|  lname|    fname|   cardno|     addr|              area|       city|language|score|
+------+-------+---------+---------+---------+------------------+-----------+--------+-----+
|     1|Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|6303 Heather Plaza|Brownsville|      TX|78521|
+------+-------+---------+---------+---------+------------------+-----------+--------+-----+
// 方法二
val custs = sc.textFile("hdfs://192.168.56.137:9000/20200106/customers.csv").map(_.replaceAll("\"","").split(",")).map(x=>Customers(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8))).toDS.show(1)
val ords = sc.textFile("hdfs://192.168.56.137:9000/20200106/orders.csv").map(_.replaceAll("\"","").split(",")).map(x=>Orders(x(0),x(1),x(2),x(3))).toDS
val items = sc.textFile("hdfs://192.168.56.137:9000/20200106/order_items.csv").map(_.replaceAll("\"","").split(",")).map(x=>OrderItem(x(0),x(1),x(2),x(3),x(4),x(5))).toDS


3、使用DataSet完成Demo


  • 如上我们已经使用RDD装载业务数据
  • 定义样例类
  • 将RDD转换为DataSet
// 连接查询  谁的消费额最高 三表查询
val tabitem = items.groupBy("ordid").agg(sum($"cp").as("countPrice"))
val tabord = tabitem.join(ords,tabitem("ordid")===ords("orderid"))
tabord.show(2)
+-----+----------+-------+-------------------+------+---------------+           
|ordid|countPrice|orderid|            orddate|custid|         status|
+-----+----------+-------+-------------------+------+---------------+
|10096|    369.97|  10096|2013-09-25 00:00:00| 10503|PENDING_PAYMENT|
|10351|    299.98|  10351|2013-09-27 00:00:00|   723|        ON_HOLD|
+-----+----------+-------+-------------------+------+---------------+
tabord.where("ordid=1").show()
+-----+----------+-------------------+------+------+                            
|ordid|countPrice|            orddate|custid|status|
+-----+----------+-------------------+------+------+
|    1|    299.98|2013-07-25 00:00:00| 11599|CLOSED|
+-----+----------+-------------------+------+------+
tabord.orderBy(desc("countPrice")).show(3)
+-----+------------------+-------------------+------+--------+                  
|ordid|        countPrice|            orddate|custid|  status|
+-----+------------------+-------------------+------+--------+
|68703|3089.9500000000003|2013-08-16 00:00:00|  9515|COMPLETE|
|68724|           2609.93|2013-09-26 00:00:00|  1148|COMPLETE|
|68809|           2579.94|2014-03-12 00:00:00|  5946| ON_HOLD|
+-----+------------------+-------------------+------+--------+
tabord.orderBy(desc("countPrice")).limit(1).show
+-----+------------------+-------------------+------+--------+                  
|ordid|        countPrice|            orddate|custid|  status|
+-----+------------------+-------------------+------+--------+
|68703|3089.9500000000003|2013-08-16 00:00:00|  9515|COMPLETE|
+-----+------------------+-------------------+------+--------+                                          
val tabod=tabord.orderBy(desc("countPrice")).limit(1)
tabod.join(custs,tabod("custid")===custs("custid"))
.select(col("lname"),col("fname"),col("countPrice")).show
+--------+-----+------------------+                                             
|   lname|fname|        countPrice|
+--------+-----+------------------+
|Victoria|Smith|3089.9500000000003|
+--------+-----+------------------+
// 哪个产品销量最高 
val procounts =items.groupBy("proid")
.agg(sum($"buynum").as("countnum"))
.orderBy(desc("countnum")).limit(1)
procounts
.join(pros,procounts("proid")===pros("proid"))
.drop(pros("proid"))
.select(col("proid"),col("protype"),col("proname"),col("countnum")).show
+-----+-------+--------------------+--------+                                   
|proid|protype|             proname|countnum|
+-----+-------+--------------------+--------+
|  365|     17|Perfect Fitness P...| 73698.0|
+-----+-------+--------------------+--------+


3.3 DataFrame


image.png


在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。


image.png


1、创建 DataFrame

scala> val cus = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv")
// 使用printSchema方法输出DataFrame的Schema信息
scala> cus.printSchema
// 表头有名字 直接注册一个临时表
scala> cus.registerTempTable("users")
// sql()方法执行SQL查询操作
scala> spark.sql("select * from users").show(2)
scala> spark.sql("select event_id,user_id from users").show(2)


// 表头没名字
scala> val cs = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/customers.csv")
scala> cs.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
// 单个命名
scala> cs.withColumnRenamed("_c0","id")
// 多个命名
scala> val lookup = Map("_c0"->"id","_c1"->"lname")
scala> cs.select(cs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*)
scala> cs.select(cs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("abc")
scala> spark.sql("select id,lname from abc").show(2)
+---+-------+
| id|  lname|
+---+-------+
|  1|Richard|
|  2|   Mary|
+---+-------+


2、使用DataFrame完成Demo

val custs = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/customers.csv")
val ords = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/orders.csv")
val items = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/order_items.csv")
val pros = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/products.csv")
val lookup = Map("_c0"->"ordid","_c1"->"orddate","_c2"->"custid","_c3"->"status")
ords.select(ords.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("ordstab")
spark.sql("select dayofweek(orddate) as week,count(ordid) as countnum from ordstab group by week")
+----+--------+                                                                 
|week|countnum|
+----+--------+
|   1|    9735|
|   6|   10288|
|   3|    9964|
|   5|    9862|
|   4|    9758|
|   7|    9984|
|   2|    9292|
+----+--------+
val lookup = Map("_c0"->"custid","_c1"->"lname","_c2"->"fname","_c3"->"cardno","_c4"->"addr","_c5"->"area","_c6"->"city","_c7"->"language","_c8"->"score")
custs.select(custs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("custstab")
val lookup = Map("_c0"->"oiid","_c1"->"ordid","_c2"->"proid","_c3"->"buynum","_c4"->"sp","_c5"->"cp")
items.select(items.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("itemstab")
spark.sql("select c.city ,sum(t.sp) as total_purchasses from itemstab as t inner join ordstab as o on o.ordid=t.ordid inner join custstab as c on c.custid=o.custid group by c.city order by total_purchasses desc")


目录
相关文章
|
3天前
|
安全 网络协议 应用服务中间件
AJP Connector:深入解析及在Apache HTTP Server中的应用
【9月更文挑战第6天】在Java Web应用开发中,Tomcat作为广泛使用的Servlet容器,经常与Apache HTTP Server结合使用,以提供高效、稳定的Web服务。而AJP Connector(Apache JServ Protocol Connector)作为连接Tomcat和Apache HTTP Server的重要桥梁,扮演着至关重要的角色
22 2
|
1天前
|
PHP 开发者
PHP 7新特性深度解析与实践应用
【9月更文挑战第17天】本文将深入探讨PHP 7的新特性及其对开发者的实际影响,同时通过实例演示如何有效利用这些特性优化代码和提高性能。我们将从类型声明的增强开始,逐步深入到其他关键改进点,最后通过一个综合案例展示如何将这些新特性应用于日常开发中。
|
7天前
|
存储 负载均衡 Java
Jetty技术深度解析及其在Java中的实战应用
【9月更文挑战第3天】Jetty,作为一款开源的、轻量级、高性能的Java Web服务器和Servlet容器,自1995年问世以来,凭借其卓越的性能、灵活的配置和丰富的扩展功能,在Java Web应用开发中占据了举足轻重的地位。本文将详细介绍Jetty的背景、核心功能点以及在Java中的实战应用,帮助开发者更好地理解和利用Jetty构建高效、可靠的Web服务。
21 2
|
10天前
|
编译器 PHP 开发者
PHP 8新特性解析与应用实践
PHP 8作为PHP语言的最新版本,带来了许多令人兴奋的新特性和性能改进。本文将深入探讨PHP 8中的JIT编译器、联合类型、匹配表达式等关键更新,并通过实例演示如何在项目中有效利用这些新工具,帮助开发者提升代码质量和执行效率。
|
13天前
|
C# Android开发 开发者
Uno Platform 高级定制秘籍:深度解析与实践样式和模板应用,助你打造统一且高效的跨平台UI设计
【9月更文挑战第7天】Uno Platform 是一个强大的框架,支持使用 C# 和 XAML 创建跨平台 UI 应用,覆盖 Windows、iOS、Android、macOS 和 WebAssembly。本文介绍 Uno Platform 中样式和模板的应用,助力开发者提升界面一致性与开发效率。样式定义控件外观,如颜色和字体;模板则详细定制控件布局。通过 XAML 定义样式和模板,并可在资源字典中全局应用或嵌套扩展。合理利用样式和模板能简化代码、保持设计一致性和提高维护性,帮助开发者构建美观高效的跨平台应用。
26 1
|
5天前
|
监控 算法 数据可视化
深入解析Android应用开发中的高效内存管理策略在移动应用开发领域,Android平台因其开放性和灵活性备受开发者青睐。然而,随之而来的是内存管理的复杂性,这对开发者提出了更高的要求。高效的内存管理不仅能够提升应用的性能,还能有效避免因内存泄漏导致的应用崩溃。本文将探讨Android应用开发中的内存管理问题,并提供一系列实用的优化策略,帮助开发者打造更稳定、更高效的应用。
在Android开发中,内存管理是一个绕不开的话题。良好的内存管理机制不仅可以提高应用的运行效率,还能有效预防内存泄漏和过度消耗,从而延长电池寿命并提升用户体验。本文从Android内存管理的基本原理出发,详细讨论了几种常见的内存管理技巧,包括内存泄漏的检测与修复、内存分配与回收的优化方法,以及如何通过合理的编程习惯减少内存开销。通过对这些内容的阐述,旨在为Android开发者提供一套系统化的内存优化指南,助力开发出更加流畅稳定的应用。
17 0
|
5天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
20 0
|
19天前
|
图形学 iOS开发 Android开发
从Unity开发到移动平台制胜攻略:全面解析iOS与Android应用发布流程,助你轻松掌握跨平台发布技巧,打造爆款手游不是梦——性能优化、广告集成与内购设置全包含
【8月更文挑战第31天】本书详细介绍了如何在Unity中设置项目以适应移动设备,涵盖性能优化、集成广告及内购功能等关键步骤。通过具体示例和代码片段,指导读者完成iOS和Android应用的打包与发布,确保应用顺利上线并获得成功。无论是性能调整还是平台特定的操作,本书均提供了全面的解决方案。
79 0
|
19天前
|
定位技术
|
19天前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
41 0

推荐镜像

更多