DataFrame 介绍_创建_toDF | 学习笔记

简介: 快速学习 DataFrame 介绍_创建_toDF

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:课时名称】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12046


DataFrame 介绍_创建_toDF

 

内容介绍:

一、ETL 的三大步骤

二、Spark 代码编写的套路

三、DataFrame 如何创建

 

上章介绍了 DataFrame 是什么,本章深入介绍 DataFrame 首先要知道在一般情况下处理数据是否有纲领性的指引。

 

一、ETL 的三大步骤

一般处理数据都差不多是ETL这个步骤,此处的ETL不是狭义的ETL,大致模拟为先读再装载,ETL步骤中E代表抽取,T表示Transformation处理,转换。L即代表装载,落地,以上就代表ETL的三大步骤。Spark是怎么去处理数据的呢?代码如何编写的呢?

 

二、Spark代码编写的套路

第一步首先要将数据读出来,创建出来。之后就是通过 API 来进行数据处理,最后是进行数据落地,同时三个步骤不仅可以用于 DataFrame 也可以用于 DataSet 和 RDD。

1. 创造 DataFrame  DataSet RDD 制造或读取数据

2. 通过 DataFrame  DataSet RDD 的 API 来进行数据处理

3. 通过 DataFrame  DataSet RDD 来进行数据落地

以上就是整个的处理步骤。

 

三、DataFrame如何创建

了解 DataFrame 怎么去处理数据之后第一步首先要了解数据读取的过程 DataFrame如何创建、DataFrame 支持什么操作和 DataFrame 如何落地,三大核心操作,其中 DataFrame 如何落地在下章节有详细介绍不再赘述。

DataFrame 如何创建有三种模式:第一种 toDF,第二种 createDataFrame,第三种还有新增的读写框架 DataFrameReader,就是通过读取外部处理集来创建 DataFrame。

1. 通过隐式转换创建 DataFrame

这种方式本质上是使用 SparkSession 中的隐式转换来进行的

val spark: SparkSession = new sql. SparkSession .Builder ()

.appName ( "hello")

.master( "local[6]")

.getOrCreate()

//必须要导入隐式转换

//注意: spark 在此处不是包,而是 SparkSession 对象

import spark.implicits._

val peopleDF: DataFrame = Seq(People("zhangsan",15),People("lisi",15)) .toDF()

1669111367648.jpg

根据源码可以知道,toDF 方法可以在 RDD 和 seq 中使用

通过集合创建 DataFrame 的时候,集合中不仅可以包含样例类,也可以只有普通数据类型,后通过指定列名来创建

val spark: SparkSession = new sql.SparkSession.Builder ().appName( "hello")

.master( "local[6]").getOrCreate()

import spark.implicits._

val df1: DataFrame = Seqr"nihao" , "hello" ).toDF( "text" )

/*

+-------+

| text l

+--------+

| nihao|

| hello |

+--------+

*/

df1.show()

val df2:DataFrame = Seq (("a”,1)).tODF ( "word", "count")

/*

+- - - -+- - - - - +

|word | count|

+- - - -+- - - - - +

|   a |   1|

|   b |   1|

+- - - -+- - - - -+

*/

df2.show( )

2. IDAE上的示例

(1)打开 IDAE 根据以下写好的方法进行

@Test

def dataframe2(): unit = {

//第一步:创建 SparkSession

val spark = Sparksession.builder()

.appName( name ="dataframe1")

.master( master = "loca1[6]")

.getorcreate()

//第二步:导入隐式转换

import spark.implicits._

//第三步:创建一个 list,而 list 当中放的是 person

val personList = seq(Person(" zhangsan",15),Person("lisi",20))

}

}

case class Person( name: string, age: Int)

(2)之后在其基础上介绍三种创建模式:①toDF ②createDataFrame ③DataFrameReader 将其简称为 read

@Test

def dataframe2(): unit = {

//第一步:创建 SparkSession

val spark = Sparksession.builder()

.appName( name ="dataframe1")

.master( master = "loca1[6]")

.getorcreate()

//第二步:导入隐式转换

import spark.implicits._

//第三步:创建一个 list,而 list当中放的是 person

val personList = seq(Person(" zhangsan",15),Person("lisi",20))

//1.toDF ,可以作用于 PersonList,因而可以直接使用 seq 来调用 toDF

val df1 = personList.toDF()

//同时也可以将其作为一个转换作用于RDD上,spark.sparkcontext.parallelize(personList)创建一个 RDD

val df2 = spark.sparkcontext.parallelize(personList).toDF()

//2.createDataFrame

//3.DataFrameReader将其简称为 read

}

}

case class Person( name: string, age: Int)

(3)toDF 在何处定义

如果将以上代码的val df1 = personList.toDF()和val df2 = spark.sparkcontext.parallelize(personList).toDF()两行注释之后,import spark.implicits._在 IDAE 中会变为灰色,代表它没有在任何地方使用,而取消注释又会变为使用状态说明 toDF 是来自于这个导入的 spark.implicits._。

因为说 toDF 是一个映式转换,首先就需要知道 toDF 将 personlist 和 RDD 都转为了什么类型,按住 ctrl 点击 toDF 就可以进行查看,点入后可以发现 toDF(),toDF()都是在 DatasetHolder 进行定义的,所以只需要找到 DatasetHolder 是在何处进行隐式转换的即可。

Ctrl+C 复制 DatasetHolder 这个类名,因为隐式转换是在 implicits._处导入的,所以 ctrl 点入会发现如下代码:

@Experimental

@Interfacestability.Evolving

object implicits extends SQLImplicits with serializable {

protected override def _SQLcontext: sQLContext = SparkSession.this.sqlcontext

}

得知它给的是一个 SQLcontext,再点入 SQLcontext 大致浏览后如果并未找到隐式转换相关,就可以在 IDAE 中直接搜索 implicits 得到如下代码,可以发现其中有一个 implicits 的 object 刚好继承了 SQLImplicits。

@Experimental

@Interfacestability.Evolving

object implicits extends SQLImplicits with serializable {

protected override def _sqlcontext:sQLcontext = self

}

再次点入 SQLImplicits,因为 toDF 是在 DatasetHolder 中定义的,所以在其中直接搜索 DatasetHolder,发现是在以下代码中进行了隐式转换,当在代码导入 import spark.implicits._后,使用对应类型时,它会传入到隐式转换的方法中来,去对比是否是该类型,是这个类型则会直接创建一个 DatasetHolder。

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]):DatasetHolder[T] ={

DatasetHolder(_sqlcontext.createDataset(rdd))

}

/**

*creates a [[Dataset]] from a local seq.

*since 1.6.0

*/

implicit def localseqToDatasetHolder[T : Encoder](s: seq[T])DatasetHolder[T] = {

DatasetHolder(_sqlcontext.createDataset(s))

}

以上就是整个 toDF 的流程,本章介绍了 toDF 在何处定义,同时查看了源码层面,同时源码无需完全掌握,知晓思路即可,掌握下回遇到同类情况,可以去看懂源码思路即可。

同时 Spark 的代码写的较好,在网上以及博客里搜集的资料大多不同,就可以采用看源码的形式。

相关文章
|
IDE 编译器 Linux
linux 编译 c或cpp 文件为动态库 so 文件(最简单直观的模板)
linux 编译 c或cpp 文件为动态库 so 文件(最简单直观的模板)
|
小程序 前端开发 PHP
PHP实现生成小程序二维码带参数进入指定页面、小程序URL scheme实现携带数据跳转小程序
PHP实现生成小程序二维码带参数进入指定页面、小程序URL scheme实现携带数据跳转小程序
432 0
|
6月前
|
监控 Android开发
【autojs版】哈罗抢单脚本,顺风车抢单辅助,全自动插件
这是一款基于Android无障碍服务开发的脚本工具,无需ROOT即可实现界面元素监控与事件模拟,适用于学习和参考。核心功能包括:通过图像识别检测订单气泡、控件监听逻辑、悬浮窗配置、动态列表渲染及状态提示UI。示例代码展示了如何使用无障碍服务监控订单列表,并通过悬浮窗进行参数配置与状态显示。仅供技术交流,请勿用于违规场景。
|
人工智能 云栖大会
AI Infra 核心技术专题 | 2024 云栖大会预热
AI Infra 核心技术专题 | 2024 云栖大会
|
存储 SQL 关系型数据库
(六)MySQL索引原理篇:深入数据库底层揭开索引机制的神秘面纱!
《索引原理篇》它现在终于来了!但对于索引原理及底层实现,相信大家多多少少都有了解过,毕竟这也是面试过程中出现次数较为频繁的一个技术点。在本文中就来一窥`MySQL`索引底层的神秘面纱!
918 7
|
数据处理 Apache 流计算
实时计算引擎 Flink:从入门到深入理解
本篇详细介绍了Apache Flink实时计算引擎的基本概念和核心功能。从入门到深入,逐步介绍了Flink的数据源与接收、数据转换与计算、窗口操作以及状态管理等方面的内容,并附带代码示例进行实际操作演示。通过阅读本文,读者可以建立起对Flink实时计算引擎的全面理解,为实际项目中的实时数据处理提供了有力的指导和实践基础。
5374 2
|
SQL 分布式计算 Java
Spark常见错误剖析与应对策略
Spark常见错误剖析与应对策略
1016 1
|
安全 算法 网络安全
什么是 SSL 加密?
【8月更文挑战第31天】
2229 0