SparkSQL 读写_JDBC_写入数据 | 学习笔记

简介: 快速学习 SparkSQL 读写_JDBC_写入数据

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段SparkSQL 读写_JDBC_写入数据】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12065


SparkSQL 读写_JDBC_写入数据

 

内容介绍:

一、步骤

二、实操

 

通过上节课的学习,我们配置好了相关的环境,就可以使用 SparkSQL 访问 MySQL 当中的数据了。

在实际操作中,应该先写数据再读数据,因为先把数据写到数据库里,再使用 SparkSQL 读取,这样读出的数据才会比较多。本节课就学习直接使用 SparkSQL 来去写入数据。

 

一、步骤

1、创建 SparkSession

无论是编写何种程序,第一步都应创建创建 SparkSession。

val spark=SparkSession

.builder()

.appName("hive example")

.master("local[6]")

.getOrCreate()

2、读取数据

要读取数据,首先要先有数据集,才能将其写到 MySQL 中,故要先去读该数据集 studenttab10k,因为创建表时也是使用这个 schema 来完成的,因此,此时需要去指定一个 schema。

val schema=StructType(

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

)

val studentDF=spark.read

.option("delimiter","\t")

.schema(schema)

.csv("dataset/studenttabl0k")

3、数据写入

studentDF.write.format("jdbc").mode(SaveMode.Overwrite)

.option("url","jdbc:mysql://node01:3306/spark test")

.option("dbtable","student")

.option("user","spark")

.option("password","Spark123!")

.save()

先指定 format 是 JDBC,指定写入模式,最后 save,这是一个标准的场景。

但是要解决数据应写在哪里、URL 是什么、表应该是怎样的等一系列问题,还需要通过一些参数配置信息,配置这些信息的参数在这里都有显示。

 

image.png

首先,可以通过 url 指定要连接的 JDBC 的 url;也通过 dbtable 可以指定要访问的表;在读取数据时,可以使用 fetchsize 来进行数据抓取大小设置,即一次最多抓取的数据的条数;在进行数据写入时,可以使用 batchsize 规定一次最多写的数据的条数;最后可以通过 isolation level 来指定事物隔离级别。

事物隔离级别不是经常设置的参数,因为数据库一般都有自己默认的,如果有进行修改的需求,也一般会改一些公共的,如改数据库或者默认的参数。

 

二、实操

进入到 idea 实现步骤。首先创建 Scala Class,并将其指定为 object,为其命名为 MySQLWrite。

package cn.itcast.spark.sql

//MySQL 的访问方式有两种,一种是使用本地运行的方式运行,另一种是提交到集群中运行。设置这样两种不同的访问方式的原因在于在本地运行和提交到集群中运行的部分内容存在差异。

//为了能体现这两种访问方式,在写入 MySQL 数据时,使用两种访问方式操作,即使用本地运行的访问方式,而读取数据时使用集群运行。

object MySQLWrite(

//直接定义一个 object:MySQLWrite

def main(args:Array[string]):unit = (

//创建 main 方法

//1、创建 SparkSession 对象

val spark = New Spark/SparkSession.builder

//可以使用 New Spark 或直接使用 SparkSession 的静态方法 builder 来直接获取 builder 对象

.master("local[6]")

//设置 builder 对象的 master 为 local[6]

.appName("mysql write")

//设置 appName 为 mysql write

.gettoCreate()

//通过 builder 对象创建 SparkSession 对象

//2、从文件中读取数据,创建 DateFrame

(1)拷贝文件

进入到/spark/Files/Dataset/,将 studenttab10k 文件拷贝到 idea 中,放在 Dataset 目录下。打开该文件,依旧没有相关的 schema 信息,因此,还需要手动指定 schema,并指定分隔符 delimiter 为 1 个 tab。

(2)读取

val schema = StructType(

//通过创建 StructType 来创建 schema,注意 StructType 在导入包时候应导入的是 sql.types 下的 StructType,而不是 ColumMetaData 下的 StructType

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

//在这个 play 方法中,应传入 List,在 List 中使用 StructField 指定每一列的信息,在打包 StructField 时要注意导入 spark 的包

)

val df = spark.read

.schema(schema)

//指定 schema 为 schema

.option("delimiter","\t")

//指定 option,指定读取文件的制表符为\t

.csv("dataset/studenttabl0k")

//指定路径

//3、处理数据,是一般的数据处理流程

val resultDF = df.where("age < 30")

//where 中不只可以使用表达式,也可使用 sql 的字串

//创建了变量 resultDF,获取到了 DateFrame

//4、落地数据,即将数据落地到 MySQL 中

//将 DateFrame 落地到数据库中,并指定相关信息

resultDF.write

.format("jdbc")

//指定 format 是 jdbc

.option("url"jdbc:mysql://node01:3306/spark02")

//指定 url 为 jdbc:mysql://node01:3306

//指定数据库名为 spark02

.option("dbtable","student")

//指定数据最终落地的表 dbtable 为 student

.option("user","spark03")

//指定用户名,即访问的方式为 spark

.option("password","Spark03!")

//指定密码 password 为“Spark03!”

.save()

//保存

5、运行纠

运行这段代码,运行结果显示有两处错误。

其一是 Table or view “student”already exist,即表 student 已经存在,则应指定写入模式是追加或是覆盖。因此,可以通过在 save(保存)之前指定写入方式直接覆盖掉原 student 表,即:

.mode(Savemode.OverWrite)

其二是 java.sql.SQL exception:No suitable driver,即没有 driver。现在要访问 sql,但编码没有给定 driver,因此要指定 driver。

driver 实际上是 Mysql 的 JDBC 的 driver。如果是本地运行,那么就要去修改 pom 文件。打开 pom 文件指定 dependency。

将 Maven 依赖的 dependency 拷贝到 pom 文件中的任意位置,且版本为 5.1.47:

mysql

mysql-connector-

java5.1.47

等待加载结束,如果加载时间较长,可以使用 Maven 进行 compare 或 package,使用 Maven 加载会更快一些,而单使用 idea 加载速度较慢。

再次运行程序,运行结束后,进入代码编辑进行相应的查看。

mysql -u root -p ;

输入密码之后,再输入代码进行查询:

select * from spark02.student limit100;

image.png

可以发现数据中人的年龄全小于 30 岁,说明数据处理没有任何问题。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
9月前
|
Java 数据库连接 数据库
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
|
9月前
|
Java 数据库连接 定位技术
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
本文以GIS表为例,介绍通过Java代码向数据库插入POINT类型地理数据的方法。首先创建包含ID和POS字段的GIS表,POS字段为ST_GEOMETRY类型。接着利用Java的PreparedStatement批量插入10条经纬度相同的POINT数据,最后查询结果显示成功插入10条记录,验证了操作的正确性。
181 19
|
9月前
|
Java 数据库连接 定位技术
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
|
10月前
|
Java 数据库连接 数据库
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**简介:** 客户在使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,遇到 YAS-00101 错误,提示无法分配内存。该问题影响所有版本的 YashanDB,导致业务中断。原因是用户缺少对 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to &lt;username&gt;;` 以恢复正常运行。
|
9月前
|
Java 数据库连接 定位技术
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
|
9月前
|
SQL Java 数据库连接
【YashanDB数据库】由于网络带宽不足导致的jdbc向yashandb插入数据慢
由于网络带宽不足导致的jdbc向yashandb插入数据慢
|
10月前
|
Java 数据库连接 数据库
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**问题简介:** 客户使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,出现 YAS-00101 错误,提示无法分配 0 字节内存。该问题影响所有 YashanDB 版本,导致业务中断。原因是数据库用户缺少 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to &lt;username&gt;;`。
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
203 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
|
SQL Java 关系型数据库
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
600 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
286 0