Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

RDD 转换得到 DataFrame

Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame:

  1. 利用反射机制推断 RDD 模式
  2. 使用编程方式定义 RDD 模式

下面使用到的数据 people.txt :

Tom, 21
Mike, 25
Andy, 18

1、利用反射机制推断 RDD 模式

       在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。

object Tese{
    // 反射机制推断必须使用 case 类,case class 必须放到main方法之外
    case class Person(name: String,age: Long)  //定义一个case类
def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 1")
      .getOrCreate()
    import spark.implicits._ //这里的spark不是org.apache.spark这个包 而是我们创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象
    val rdd: RDD[Person] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(line => line.split(","))
      .map(t => Person(t(0), t(1).trim.toInt))
    // 将RDD对象转为DataFrame对象
    val df: DataFrame = rdd.toDF()
    df.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people WHERE age > 20").show()
    spark.stop()
  }
}

注意事项1:

case 类必须放到伴生对象下,main方法之外,因为在隐式转换的时候它会自动通过 伴生对象名.case类名 来调用case类,如果放到main下面就找不到了。

注意事项2:

import spark.implicits._

这里的spark不是org.apache.spark这个包 而是我们上面创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象

image.png

2、使用编程方式定义 RDD 模式

       反射机制推断时需要定义 case class,但当无法定义 case 类时,就需要采用编程式来定义 RDD 模式了。这种方法看起来比较繁琐,但是很好用,不容易报错。

      我们现在同样加载 people.txt 中的数据,生成 RDD 对象,再把RDD对象转为DataFrame对象,进行SparkSQL 查询。主要包括三个步骤:

  1. 制作表头 schema: StructType
  2. 制作表中记录 rowRDD: RDD[Row]
  3. 合并表头和记录 df:DataFramw
def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 2")
      .getOrCreate()
    //1.制作表头-也就是定义表的模式
    val schema: StructType = StructType(Array(StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    //2.加载表中的记录-也就是读取文件生成RDD
    val rowRdd: RDD[Row] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Row(attr(0), attr(1).trim.toInt))
    //3.把表头和记录拼接在一起
    val peopleDF: DataFrame = spark.createDataFrame(rowRdd, schema)
    peopleDF.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people WHERE age > 20").show()
    spark.stop()
  }

运行结果:

+----+---+
|name|age|
+----+---+
| Tom| 21|
|Mike| 25|
+----+---+

Spark SQL读取数据库

导入依赖

根据自己本地的MySQL版本导入对应的驱动。

注意:mysql8.0版本在JDBC中的url是:" com.mysql.cj.jdbc.Driver "

<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>

读取 MySQL 中的数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()
    val mysql: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "Yan1029.")
      .load()
    mysql.show()
    spark.stop()
  }

运行结果:

默认显示整张表

+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1| Tom| 21| 男|
|  2|Andy| 20| 女|
+---+----+---+---+

向 MySQL 写入数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()
    //导入两条student信息
    val rdd: RDD[Array[String]] = spark.sparkContext
      .parallelize(Array("3 Mike 22 男", "4 Cindy 23 女"))
      .map(_.split(" "))
    //设置模式信息-创建表头
    val schema: StructType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true)))
    //创建Row对象 每个 Row对象都是表中的一行-创建记录
    val rowRDD = rdd.map(stu => Row(stu(0).toInt, stu(1), stu(2).toInt, stu(3)))
    //创建DataFrame对象 拼接表头和记录
    val df = spark.createDataFrame(rowRDD, schema)
    //创建一个 prop 变量 用来保存 JDBC 连接参数
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","Yan1029.")
    prop.put("driver","com.mysql.cj.jdbc.Driver")
    //写入数据 采用 append 模式追加
    df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
    spark.stop()
  }

运行结果:


总结

       今天上午就学到这里,本想着今天专门看看StructType、StructField和Row这三个类的,没想到就在这节课。这一篇主要学了RDD对象向DataFrame对象的转换以及Spark SQL如何读取数据库、写入数据库。


       下午学完这一章最后的DataSet。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:详细步骤与最佳实践指南ali01n.xinmi1009fan.com
随着Web开发技术的不断进步,ASP.NET已成为一种非常流行的Web应用程序开发框架。在ASP.NET项目中,我们经常需要与数据库进行交互,特别是SQL数据库。本文将详细介绍如何在ASP.NET项目中连接SQL数据库,并提供最佳实践指南以确保开发过程的稳定性和效率。一、准备工作在开始之前,请确保您
217 3
|
1月前
|
SQL 数据采集 监控
局域网监控电脑屏幕软件:PL/SQL 实现的数据库关联监控
在当今网络环境中,基于PL/SQL的局域网监控系统对于企业和机构的信息安全至关重要。该系统包括屏幕数据采集、数据处理与分析、数据库关联与存储三个核心模块,能够提供全面而准确的监控信息,帮助管理者有效监督局域网内的电脑使用情况。
19 2
|
2月前
|
SQL 关系型数据库 MySQL
Go语言项目高效对接SQL数据库:实践技巧与方法
在Go语言项目中,与SQL数据库进行对接是一项基础且重要的任务
83 11
|
27天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 监控 数据库
慢SQL对数据库写入性能的影响及优化技巧
在数据库管理系统中,慢SQL(即执行缓慢的SQL语句)不仅会影响查询性能,还可能对数据库的写入性能产生显著的不利影响
|
2月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
2月前
|
SQL 数据库
SQL数据库基础语法入门
[link](http://www.vvo.net.cn/post/082935.html)
|
2月前
|
SQL 存储 监控
串口调试助手连接SQL数据库的技巧与方法
串口调试助手是电子工程师和软件开发人员常用的工具,它能够帮助用户进行串口通信的调试和数据分析
|
2月前
|
SQL 存储 数据采集
如何把问卷录入SQL数据库
将问卷数据录入SQL数据库是一个涉及数据收集、处理和存储的过程
|
2月前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:实现过程与关键细节解析an3.021-6232.com
随着互联网技术的快速发展,ASP.NET作为一种广泛使用的服务器端开发技术,其与数据库的交互操作成为了应用开发中的重要环节。本文将详细介绍在ASP.NET中如何连接SQL数据库,包括连接的基本概念、实现步骤、关键代码示例以及常见问题的解决方案。由于篇幅限制,本文不能保证达到完整的2000字,但会确保