Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
全局流量管理 GTM,标准版 1个月
简介: Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)

Spark Day08:Spark SQL

01-[了解]-昨日课程内容回顾

上次课程主要讲解3个方面内容:SparkSQL模块概述、DataFrame数据集及综合案例分析。

1、SparkSQL 模块概述
  - 发展史【前世今生】
    Shark -> SparkSQL(1.0) -> DataFrame(1.3) -> Dataset(1.6) -> Dataset/DataFrame(2.0)
    Spark2.0中SparkSQL模块
      不仅可以处理离线数据(批处理),还可以处理流式数据(流计算)
        spark.read 批处理
        spark.readStream 流计算
      将SparkSQL可以处理流式数据功能,单独提出来,称为:StructuredStreaming结构化流
    Spark2.2 版本
      StructuredStreaming 发布Release版本
  - 官方定义:
    Spark框架模块,针对结构化数据处理模块
    - Module,Structure结构化数据
    - DataFrame,数据结构,底层还是RDD,加上Schema约束
    - SQL 分析引擎,可以类似Hive框架,解析SQL,转换为RDD操作
  - 4个特性
    易用性、多数据源、JDBC/ODBC方式、与Hive集成
2、DataFrame 是什么
  - 基于RDD之上分布式数据集,并且Schema信息,Schema就是数据内部结果,包含字段名称和字段类型
    RDD[Person] 与 DataFrame比较
    DataFrame知道数据内部结构,在计算数据之前,可以有针对性进行优化,提升性能
  - DataFrame = RDD[Row] + Schema + 优化
    来源Python中Pandas数据结构或R语言数据类型
  - RDD 转换DataFrame方式
    第一种:RDD[CaseClass]直接转换DataFrame
    第二种:RDD[Row] + Schema
    toDF函数,指定列名称,前提条件:RDD中数据类型为元组类型,或者Seq序列中数据类型为元组
3、电影评分统计分析【使用DataFrame封装】
  - SparkSQL中数据分析2种方式:
    方式一:SQL编程
      类似Hive中SQL语句
    方式二:DSL编程
      调用DataFrame中函数,包含类似RDD转换函数和类似SQL关键词函数
  - 案例分析
    - step1、加载文本数据为RDD
    - step2、通过toDF函数转换为DataFrame
    - step3、编写SQL分析
      先注册DataFrame为临时视图、再编写SQL执行
    - step4、编写DSL分析
      groupBy、agg、filter、sortBy、limit
      导入函数库:import org.apache.spark.sql.functions._
    - step5、保存结果数据
      先保存到MySQL表中
      再保存到CSV文件
    无论是编写DSL还是SQL,性能都是一样的,注意调整参数:Shuffle是分区数目
      spark.sql.shuffle.partitions=200
      Spark 3.0无需调整

02-[了解]-今日课程内容提纲

主要讲解4个方面内容:Dataset是什么、外部数据源、UDF定义和分布式SQL引擎

1、Dataset 数据结构
  Dataset = RDD[T] + Schema,可以外部数据类型、也可以知道内部数据结构
  以特殊编码存储数据,比RDD数据结构存储更加节省空间
  RDD、DataFrame和Dataset区别与联系
2、外部数据源
  如何加载和保存数据,编程模块
  保存数据时,保存模式
  内部支持外部数据源
  自定义外部数据源,实现HBase,直接使用,简易版本
  集成Hive,从Hive表读取数据分析,也可以将数据保存到Hive表,企业中使用最多
    使用Hive框架进行数据管理,使用SparkSQL分析处理数据
3、自定义UDF函数
  2种方式,分别在SQL中使用和在DSL中使用
4、分布式SQL引擎
  此部分内容,与Hive框架功能一直
  spark-sql 命令行,专门提供编写SQL语句
    类似Hive框架种hive
  SparkSQL ThriftServer当做一个服务运行,使用JDBC/ODBC方式连接,发送SQL语句执行
    类似HiveServer2服务
    - jdbc 代码
    - beeline命令行,编写SQL

03-[掌握]-Dataset 是什么

Dataset是在Spark1.6中添加的新的接口,是DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame的优点

Dataset = RDD + Schema

Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。

从Spark 2.0开始,DataFrame与Dataset合并,每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。

针对Dataset数据结构来说,可以简单的从如下四个要点记忆与理解:

Spark 框架从最初的数据结构RDD、到SparkSQL中针对结构化数据封装的数据结构DataFrame,

最终使用Dataset数据集进行封装,发展流程如下。

所以在实际项目中建议使用Dataset进行数据封装,数据分析性能和数据存储更加好。

针对RDD、DataFrame与Dataset三者编程比较来说,Dataset API无论语法错误和分析错误在编译时都能发现,然而RDD和DataFrame有的需要在运行时才能发现。

此外RDD与Dataset相比较而言,由于Dataset数据使用特殊编码,所以在存储数据时更加节省内存。

由于Dataset数据结构,是一个强类型分布式集合,并且采用特殊方式对数据进行编码,所以与DataFrame相比,编译时发现语法错误和分析错误,以及缓存数据时比RDD更加节省空间。

package cn.itcast.spark.ds
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 采用反射的方式将RDD转换为Dataset
 */
object _01SparkDatasetTest {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,设置应用名称和master
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[3]")
        .getOrCreate()
    import spark.implicits._
    // 1. 加载电影评分数据,封装数据结构RDD
    val rawRatingRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data")
    // 2. 将RDD数据类型转化为 MovieRating
    /*
      将原始RDD中每行数据(电影评分数据)封装到CaseClass样例类中
     */
    val ratingRDD: RDD[MovieRating] = rawRatingRDD.mapPartitions { iter =>
      iter.map { line =>
        // 按照制表符分割
        val arr: Array[String] = line.trim.split("\\t")
        // 封装样例对象
        MovieRating(
          arr(0), arr(1), arr(2).toDouble, arr(3).toLong
        )
      }
    }
    // TODO: 3. 将RDD转换为Dataset,可以通过隐式转, 要求RDD数据类型必须是CaseClass
    val ratingDS: Dataset[MovieRating] = ratingRDD.toDS()
    ratingDS.printSchema()
    ratingDS.show(10, truncate = false)
    /*
      Dataset 从Spark1.6提出
        Dataset = RDD + Schema
        DataFrame = RDD[Row] + Schema
        Dataset[Row] = DataFrame
     */
    // 从Dataset中获取RDD
    val rdd: RDD[MovieRating] = ratingDS.rdd
    val schema: StructType = ratingDS.schema
    // 从Dataset中获取DataFrame
    val ratingDF: DataFrame = ratingDS.toDF()
    // 给DataFrame加上强类型(CaseClass)就是Dataset
    /*
      DataFrame中字段名称与CaseClass中字段名称一致
     */
    val dataset: Dataset[MovieRating] = ratingDF.as[MovieRating]
    // 应用结束,关闭资源
    spark.stop()
  }
}

04-[掌握]-RDD、DS和DF之间转换

实际项目开发,常常需要对RDD、DataFrame及Dataset之间相互转换,其中要点就是Schema约束结构信息。

范例演示:分别读取people.txt文件数据封装到RDD、DataFrame及Dataset,查看区别及相互转换。

[root@node1 ~]# /export/server/spark/bin/spark-shell --master local[2]
21/04/27 09:12:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node1.itcast.cn:4040
Spark context available as 'sc' (master = local[2], app id = local-1619485981944).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val rdd = sc.textFile("/datas/resources/people.txt")
rdd: org.apache.spark.rdd.RDD[String] = /datas/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> 
scala> val dataframe = spark.read.text("/datas/resources/people.txt")
dataframe: org.apache.spark.sql.DataFrame = [value: string]
scala> 
scala> val dataset = spark.read.textFile("/datas/resources/people.txt")
dataset: org.apache.spark.sql.Dataset[String] = [value: string]
scala> 
scala> dataframe.rdd
res0: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:26
scala> dataset.rdd
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at rdd at <console>:26
scala> 
scala> dataset.toDF()
res2: org.apache.spark.sql.DataFrame = [value: string]
scala> dataframe.as[String]
res3: org.apache.spark.sql.Dataset[String] = [value: string]

读取Json数据,封装到DataFrame中,指定CaseClass,转换为Dataset

scala> val empDF = spark.read.json("/datas/resources/employees.json")
empDF: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]          
scala> 
scala> empDF.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+
scala> 
scala> case class Emp(name: String, salary: Long)
defined class Emp
scala> 
scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [name: string, salary: bigint]
scala> empDS.printSchema()
root
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
scala> empDS.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

05-[掌握]-外部数据源之加载load和保存save数据

在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:

SparkSQL提供一套通用外部数据源接口,方便用户从数据源加载和保存数据,例如从MySQL表中既可以加载读取数据:load/read,又可以保存写入数据:save/write

  • Load 加载数据

在SparkSQL中读取数据使用SparkSession读取,并且封装到数据结构Dataset/DataFrame中。

DataFrameReader专门用于加载load读取外部数据源的数据,基本格式如下:

SparkSQL模块本身自带支持读取外部数据源的数据:

  • Save 保存数据

SparkSQL模块中可以从某个外部数据源读取数据,就能向某个外部数据源保存数据,提供相应接口,通过DataFrameWrite类将数据进行保存

与DataFrameReader类似,提供一套规则,将数据Dataset保存,基本格式如下:

SparkSQL模块内部支持保存数据源如下:

当将结果数据DataFrame/Dataset保存至Hive表中时,可以设置分区partition和分桶bucket,形式如下:

可以发现,SparkSQL模块中内置数据源中,并且对HBase表数据读取和写入支持,但是可以自己实现外部数据源接口,方便读写数据。

06-[了解]-外部数据源之案例演示及应用场景

scala> val peopleDF = spark.read.json("/datas/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.show()
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> 
scala> val resultDF = peopleDF.select("name", "age")
resultDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> 
scala> resultDF.show()
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
scala> 
scala> resultDF.write.parquet("/datas/people-parquet")
scala> spark.read.parquet("/datas/people-parquet/part-00000-a967d124-52d8-4ffe-91c6-59aebfed22b0-c000.snappy.parquet")
res11: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> res11.show(
     | )
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

查看HDFS文件系统目录,数据已保存值parquet文件,并且使用snappy压缩。

07-[掌握]-外部数据源之保存模式SaveMode

当将DataFrame或Dataset数据保存时,默认情况下,如果存在,会抛出异常。

DataFrameWriter中有一个mode方法指定模式:

通过源码发现SaveMode时枚举类,使用Java语言编写,如下四种保存模式:

⚫ 第一种:Append 追加模式,当数据存在时,继续追加;
⚫ 第二种:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
⚫ 第三种:ErrorIfExists 存在及报错;
⚫ 第四种:Ignore 忽略,数据存在时不做任何操作;

由于保存DataFrame时,需要合理设置保存模式,使得将数据保存数据库时,存在一定问题的。

  • Append追加模式:
  • 数据重复,最明显错误就是:主键已经存在
  • Overwrite覆盖模式:
  • 将原来的数据删除,对于实际项目来说,以前分析结果也是需要的,不允许删除

08-[掌握]-外部数据源之案例演示(parquet、text和json)

SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。

范例演示代码:直接load加载parquet数据和指定parquet格式加载数据。

// 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    import spark.implicits._
    // TODO 1. parquet列式存储数据
    // format方式加载
    //val df1 = spark.read.format("parquet").load("datas/resources/users.parquet")
    val df1: DataFrame = spark.read
      .format("parquet")
      .option("path", "datas/resources/users.parquet")
      .load()
    df1.printSchema()
    df1.show(10, truncate = false)
    // parquet方式加载
    val df2: DataFrame = spark.read.parquet("datas/resources/users.parquet")
    df2.show(10, truncate = false)
    // load方式加载,在SparkSQL中,当加载读取文件数据时,如果不指定格式,默认是parquet格式数据
    val df3: DataFrame = spark.read.load("datas/resources/users.parquet")
    df3.show(10, truncate = false)

SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset

无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。

// TODO: 2. 文本数据加载,text -> DataFrame   textFile -> Dataset
    // 无论是 text 还是 textFile 加载文本数据时,字段名称:value, 类型String
    val peopleDF: DataFrame = spark.read.text("datas/resources/people.txt")
    peopleDF.show(10, truncate = false)
    val peopleDS: Dataset[String] = spark.read.textFile("datas/resources/people.txt")
    peopleDS.show(10, truncate = false)

读取JSON格式文本数据,往往有2种方式:

  • 方式一:直接指定数据源为json,加载数据,自动生成Schema信息
spark.read.json("")
  • 方式二:以文本文件方式加载,然后使用函数(get_json_object)提取JSON中字段值
val dataset = spark.read.textFile("")
dataset.select(
  get_json_object($"value", "$.name")
)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z6rA4Zfq-1627175964710)(/img/image-20210427101740141.png)]

/* =========================================================================== */
    // TODO: 3. 读取JSON格式数据,自动解析,生成Schema信息
    val empDF: DataFrame = spark.read.json("datas/resources/employees.json")
    empDF.printSchema()
    empDF.show(10, truncate = false)
    /* =========================================================================== */
    // TODO: 实际开发中,针对JSON格式文本数据,直接使用text/textFile读取,然后解析提取其中字段信息
    /*
      {"name":"Andy", "salary":30}   - value: String
            | 解析JSON格式,提取字段
        name: String, -> Andy
        salary : Int, -> 30
     */
    val dataframe: Dataset[String] = spark.read.textFile("datas/resources/employees.json")
    // 对JSON格式字符串,SparkSQL提供函数:get_json_object, def get_json_object(e: Column, path: String): Column
    import org.apache.spark.sql.functions.get_json_object
    val df = dataframe
      .select(
        get_json_object($"value", "$.name").as("name"),
        get_json_object($"value", "$.salary").cast(IntegerType).as("salary")
      )
    df.printSchema()
    df.show(10, truncate = false)

09-[掌握]-外部数据源之案例演示(csv和jdbc)

关于CSV/TSV格式数据说明:

SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:

// TODO: 1. CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的
    /*
      CSV 格式数据:
        每行数据各个字段使用逗号隔开
        也可以指的是,每行数据各个字段使用 单一 分割符 隔开数据
     */
    // 方式一:首行是列名称,数据文件u.dat
    val dataframe: DataFrame = spark.read
      .format("csv")
      .option("sep", "\\t")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("datas/ml-100k/u.dat")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    // 方式二:首行不是列名,需要自定义Schema信息,数据文件u.data
    // 自定义schema信息
    val schema: StructType = new StructType()
      .add("user_id", IntegerType, nullable = true)
      .add("iter_id", IntegerType, nullable = true)
      .add("rating", DoubleType, nullable = true)
      .add("timestamp", LongType, nullable = true)
    val df: DataFrame = spark.read
      .format("csv")
      .schema(schema)
      .option("sep", "\\t")
      .load("datas/ml-100k/u.data")
    df.printSchema()
    df.show(10, truncate = false)

在SparkSQL模块中提供对应接口,提供三种方式读取数据:

// TODO: 2. 读取MySQL表中数据
    // 第一、简洁版格式
    /*
      def jdbc(url: String, table: String, properties: Properties): DataFrame
     */
    val props =  new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    val empDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
      "db_test.emp", //
      props //
    )
    println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
    empDF.printSchema()
    empDF.show(10, truncate = false)
    // 第二、标准格式写
    /*
      WITH tmp AS (
        select * from emp e join dept d on e.deptno = d.deptno
      )
     */
    val table: String = "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
    val joinDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", table)
      .load()
    joinDF.printSchema()
    joinDF.show(10, truncate = false)

从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:

10-[掌握]-外部数据源之集成Hive(spark-shell)

Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark(Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),所以SparkSQL天然无缝集成Hive,可以加载Hive表数据进行分析。

  • 第一步、当编译Spark源码时,需要指定集成Hive,命令如下

  • 第二步、SparkSQL集成Hive本质就是:读取Hive框架元数据MetaStore,此处启动Hive MetaStore
    服务即可。
# 直接运行如下命令,启动HiveMetaStore服务
[root@node1 ~]# hive-daemon.sh metastore
  • 第三步、连接HiveMetaStore服务配置文件hive-site.xml,放于【$SPARK_HOME/conf】目录
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node1.itcast.cn:9083</value>
    </property>
</configuration>
  • 第四步、案例演示,读取Hive中db_hive.emp表数据,分析数据
[root@node1 spark]# bin/spark-shell --master local[2]
21/04/27 10:55:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node1.itcast.cn:4040
Spark context available as 'sc' (master = local[2], app id = local-1619492151923).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> 
scala> val empDF = spark.read.table("db_hive.emp")
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
scala> empDF.show()
+-----+------+---------+----+----------+------+------+------+                   
|empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|
+-----+------+---------+----+----------+------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|  null|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|  null|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|  null|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|  null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|  null|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|  null|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|  null|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|  null|    10|
+-----+------+---------+----+----------+------+------+------+
scala> empDF.printSchema()
root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: double (nullable = true)
 |-- comm: double (nullable = true)
 |-- deptno: integer (nullable = true)
scala> 
scala> spark.sql("select * from db_hive.emp").show()
+-----+------+---------+----+----------+------+------+------+
|empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|
+-----+------+---------+----+----------+------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|  null|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|  null|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|  null|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|  null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|  null|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|  null|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|  null|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|  null|    10|
+-----+------+---------+----+----------+------+------+------+
scala> spark.sql("select e.ename, e.sal, d.dname from db_hive.emp e join db_hive.dept d on e.deptno = d.deptno").show()
+------+------+----------+
| ename|   sal|     dname|
+------+------+----------+
| SMITH| 800.0|  RESEARCH|
| ALLEN|1600.0|     SALES|
|  WARD|1250.0|     SALES|
| JONES|2975.0|  RESEARCH|
|MARTIN|1250.0|     SALES|
| BLAKE|2850.0|     SALES|
| CLARK|2450.0|ACCOUNTING|
| SCOTT|3000.0|  RESEARCH|
|  KING|5000.0|ACCOUNTING|
|TURNER|1500.0|     SALES|
| ADAMS|1100.0|  RESEARCH|
| JAMES| 950.0|     SALES|
|  FORD|3000.0|  RESEARCH|
|MILLER|1300.0|ACCOUNTING|
+------+------+----------+

11-[掌握]-外部数据源之集成Hive(IDEA开发)

在IDEA中开发应用,集成Hive,读取表的数据进行分析,构建SparkSession时需要设置HiveMetaStore服务器地址集成Hive选项,首先添加MAVEN依赖包:

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

范例演示代码如下:

package cn.itcast.spark.hive
import org.apache.spark.sql.SparkSession
/**
 * SparkSQL集成Hive,读取Hive表的数据进行分析
 */
object _04SparkSQLHiveTest {
  def main(args: Array[String]): Unit = {
    // TODO: 集成Hive,创建SparkSession实例对象时,进行设置HiveMetaStore服务地址
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
      // 显示指定集成Hive
        .enableHiveSupport()
      // 设置Hive MetaStore服务地址
        .config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")
      .getOrCreate()
    import spark.implicits._
    // 方式一、DSL 分析数据
    val empDF = spark.read
      .table("db_hive.emp")
    empDF.printSchema()
    empDF.show(10, truncate = false)
    println("==================================================")
    // 方式二、编写SQL方式
    spark.sql("select * from db_hive.emp").show()
    // 应用结束,关闭资源
    spark.stop()
  }
}

12-[了解]-外部数据源之自定义实现接口数据源HBase

SparkSQL内部并没有实现从HBase读取数据接口,可以自己实现外部数据源接口,此处提供给大家。

需要注册实现数据源

测试实现外部数据源,从HBase表读取数据:

package cn.itcast.spark.hbase
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 自定义外部数据源HBase,实现数据读写功能
 */
object _05SparkHBaseTest {
  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例对象时
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    import spark.implicits._
    // 从HBase表中加载数据
    val hbaseDF: DataFrame = spark.read
      .format("hbase")
        .option("zkHosts", "node1.itcast.cn")
        .option("zkPort", "2181")
        .option("hbaseTable", "stus")
        .option("family", "info")
        .option("selectFields", "name,age")
      .load()
    // 自己实现数据源,从Hbase表中读取数据的所有数据类型都是String类型
    hbaseDF.printSchema()
    hbaseDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

启动HBase数据库相关服务

[root@node1 ~]# zookeeper-daemon.sh start 
JMX enabled by default
Using config: /export/server/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node1 ~]# 
[root@node1 ~]# hbase-daemon.sh start master 
starting master, logging to /export/server/hbase/logs/hbase-root-master-node1.itcast.cn.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
[root@node1 ~]# 
[root@node1 ~]# hbase-daemon.sh start regionserver
starting regionserver, logging to /export/server/hbase/logs/hbase-root-regionserver-node1.itcast.cn.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
[root@node1 ~]# hbase shell
2021-04-27 11:21:05,566 INFO  [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/server/hbase-1.2.0-cdh5.16.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/server/hadoop-2.6.0-cdh5.16.2/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.0-cdh5.16.2, rUnknown, Mon Jun  3 03:50:06 PDT 2019
hbase(main):001:0> 
hbase(main):001:0> list
TABLE                                                                                                                                
stus                                                                                                                                 
1 row(s) in 0.2420 seconds
=> ["stus"]
hbase(main):002:0> 
hbase(main):003:0* scan "stus"
ROW                                COLUMN+CELL                                                                                       
 10001                             column=info:age, timestamp=1585823829856, value=24                                                
 10001                             column=info:name, timestamp=1585823791372, value=zhangsan                                         
 10002                             column=info:age, timestamp=1585823838969, value=26                                                
 10002                             column=info:name, timestamp=1585823807947, value=lisi                                             
 10003                             column=info:age, timestamp=1585823845516, value=28                                                
 10003                             column=info:name, timestamp=1585823819460, value=wangwu                                           
3 row(s) in 0.1450 seconds

13-[掌握]-自定义UDF函数在SQL和DSL中使用

SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

目前来说Spark 框架各个版本及各种语言对自定义函数的支持:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DApgGzLd-1627175964714)(/img/image-20210427112425417.png)]

由于SparkSQL数据分析有两种方式:DSL编程和SQL编程,所以定义UDF函数也有两种方式,不同方式可以在不同分析中使用。

  • 方式一:SQL中使用
使用SparkSession中udf方法定义和注册函数,在SQL中使用,使用如下方式定义:

  • 方式二:DSL中使用
使用org.apache.sql.functions.udf函数定义和注册函数,在DSL中使用,如下方式

案例演示如下所示:

package cn.itcast.spark.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * SparkSQL中UDF函数定义与使用:分别在SQL和DSL中
 */
object _06SparkUdfTest {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,设置应用名称和master
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    val empDF: DataFrame = spark.read.json("datas/resources/employees.json")
    /*
      root
       |-- name: string (nullable = true)
       |-- salary: long (nullable = true)
     */
    //empDF.printSchema()
    /*
      +-------+------+
      |name   |salary|
      +-------+------+
      |Michael|3000  |
      |Andy   |4500  |
      |Justin |3500  |
      |Berta  |4000  |
      +-------+------+
     */
    //empDF.show(10, truncate = false)
    /*
      自定义UDF函数功能:将某个列数据,转换为大写
     */
    // TODO: 在SQL中使用
    spark.udf.register(
      "to_upper_udf", // 函数名
      (name: String) => {
        name.trim.toUpperCase
      }
    )
    // 注册DataFrame为临时视图
    empDF.createOrReplaceTempView("view_temp_emp")
    // 编写SQL并执行
    spark.sql("select name, to_upper_udf(name) AS new_name from view_temp_emp").show()
    println("=====================================================")
    // TODO: 在DSL中使用
    import org.apache.spark.sql.functions.udf
    val udf_to_upper: UserDefinedFunction = udf(
      (name: String) => {
        name.trim.toUpperCase
      }
    )
    empDF
      .select(
        $"name",
        udf_to_upper($"name").as("new_name")
      )
      .show()
    // 应用结束,关闭资源
    spark.stop()
  }
}

14-[了解]-分布式SQL引擎之spark-sql交互式命令行

回顾一下,如何使用Hive进行数据分析的,提供哪些方式交互分析???

SparkSQL模块从Hive框架衍生发展而来,所以Hive提供的所有功能(数据分析交互式方式)都支持,文档:http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html

SparkSQL提供spark-sql命令,类似Hive中bin/hive命令,专门编写SQL分析,启动命令如下:
[root@node1 ~]# SPARK_HOME=/export/server/spark
[root@node1 ~]# ${SPARK_HOME}/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=4

此种方式,目前企业使用较少,主要使用下面所述ThriftServer服务,通过Beeline连接执行SQL。

15-[掌握]-分布式SQL引擎之ThriftServer服务和beeline

Spark Thrift Server将Spark Applicaiton当做一个服务运行,提供Beeline客户端和JDBC方式访问,与Hive中HiveServer2服务一样的。

Spark Thrift JDBC/ODBC Server 依赖于HiveServer2服务(依赖JAR包),所有要想使用此功能,在编译Spark源码时,支持Hive Thrift。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jE9BfbCw-1627175964716)(/img/image-20210427113944882.png)]

在$SPARK_HOME目录下的sbin目录,有相关的服务启动命令:

SPARK_HOME=/export/server/spark
$SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1.itcast.cn \
--master local[2] \
--conf spark.sql.shuffle.partitions=2

监控WEB UI界面:

/export/server/spark/bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://node1.itcast.cn:10000
Connecting to jdbc:hive2://node1.itcast.cn:10000
Enter username for jdbc:hive2://node1.itcast.cn:10000: root
Enter password for jdbc:hive2://node1.itcast.cn:10000: ****

SparkSQL中提供类似JDBC/ODBC方式,连接Spark ThriftServer服务,执行SQL语句,首先添加Maven依赖库:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

范例演示:采用JDBC方式读取Hive中db_hive.emp表的数据。

package cn.itcast.spark.thrift
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
/**
  * SparkSQL 启动ThriftServer服务,通过JDBC方式访问数据分析查询
  * i). 通过Java JDBC的方式,来访问Thrift JDBC/ODBC server,调用Spark SQL,并直接查询Hive中的数据
  * ii). 通过Java JDBC的方式,必须通过HTTP传输协议发送thrift RPC消息,Thrift JDBC/ODBC server必须通过上面命令启动HTTP模式
  */
object _07SparkThriftJDBCTest {
  def main(args: Array[String]): Unit = {
    // 定义相关实例对象,未进行初始化
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    var rs: ResultSet = null
    try {
      // TODO: a. 加载驱动类
      Class.forName("org.apache.hive.jdbc.HiveDriver")
      // TODO: b. 获取连接Connection
      conn = DriverManager.getConnection(
        "jdbc:hive2://node1.itcast.cn:10000/db_hive",
        "root",
        "123456"
      )
      // TODO: c. 构建查询语句
      val sqlStr: String =
        """
          |select * from user
        """.stripMargin
      pstmt = conn.prepareStatement(sqlStr)
      // TODO: d. 执行查询,获取结果
      rs = pstmt.executeQuery()
      // 打印查询结果
      while (rs.next()) {
        println(s"empno = ${rs.getInt(1)}, ename = ${rs.getString(2)}, sal = ${rs.getDouble(3)}, dname = ${rs.getString(4)}")
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != rs) rs.close()
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }
}

16-[了解]-分布式SQL引擎之Catalyst 优化器

在第四章【案例:电影评分数据分析】中,运行应用程序代码,通过WEB UI界面监控可以看出,无论使用DSL还是SQL,构建Job的DAG图一样的,性能是一样的,原因在于SparkSQL中引擎:

Catalyst:将SQL和DSL转换为相同逻辑计划

Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言功能(例如Scala的模式匹配和quasiquotes)来构建可扩展的查询优化器。

上图中可以看到3点:
1、Frontend:前段
  编写SQL和DSL语句地方
2、Catalyst:优化器
  将SQL和DSL转换为逻辑计划LogicalPlan
  由三个部分组成
    Unresolved Logical Plan 未解析逻辑计划
        |
      Logical Plan   逻辑计划
        |
      Optimized Logical Plan 优化逻辑计划
3、Backend:后端
  将逻辑计划转换为物理计划,就是RDD转换操作

附录一、创建Maven模块

1)、Maven 工程结构

2)、POM 文件内容

Maven 工程POM文件中内容(依赖包):

<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    <dependencies>
        <!-- 依赖Scala语言 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL 与 Hive 集成 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase Client 依赖 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- MySQL Client 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    </dependencies>
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven 编译的插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
<version>${mysql.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

         


目录
相关文章
|
23天前
|
机器学习/深度学习 分布式计算 数据挖掘
MaxFrame 性能评测:阿里云MaxCompute上的分布式Pandas引擎
MaxFrame是一款兼容Pandas API的分布式数据分析工具,基于MaxCompute平台,极大提升了大规模数据处理效率。其核心优势在于结合了Pandas的易用性和MaxCompute的分布式计算能力,无需学习新编程模型即可处理海量数据。性能测试显示,在涉及`groupby`和`merge`等复杂操作时,MaxFrame相比本地Pandas有显著性能提升,最高可达9倍。适用于大规模数据分析、数据清洗、预处理及机器学习特征工程等场景。尽管存在网络延迟和资源消耗等问题,MaxFrame仍是处理TB级甚至PB级数据的理想选择。
49 4
|
3天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
187 0
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
106 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
117 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
88 0
|
SQL 分布式计算 Java
【译】深入分析Spark UDF的性能
这篇博客会阐述一份关于Apache Spark的在Scala UDF、 PySpark UDF 和PySpark Pandas UDF之间的性能评测报告。
3351 0
【译】深入分析Spark UDF的性能
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
202 2
ClickHouse与大数据生态集成:Spark & Flink 实战