【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

7) 新建scala类,界面如下:

此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Long)
object sparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //创建Spark运行环境
    val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
    val sc = spark.sparkContext;
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //显示DataFrame的schema信息
    personDF.printSchema()
    //显示DataFrame记录数
    println(personDF.count())
    //显示DataFrame的所有字段
    personDF.columns.foreach(println)
    //取出DataFrame的第一行记录
    println(personDF.head())
    //显示DataFrame中name字段的所有值
    personDF.select("name").show()
    //过滤出DataFrame中年龄大于20的记录
    personDF.filter($"age" > 20).show()
    //统计DataFrame中年龄大于20的人数
    println(personDF.filter($"age" > 20).count())
    //统计DataFrame中按照年龄进行分组,求每个组的人数
    personDF.groupBy("age").count().show()
    //将DataFrame注册成临时表
    personDF.createOrReplaceTempView("t_person")
    //传入sql语句,进行操作
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='王五'").show()
    spark.sql("select * from t_person order by age desc").show()
    //DataFrame转换成Dataset
    var ds=personDF.as[Person]
    ds.show()
    //关闭操作
    sc.stop()
    spark.stop()
  }
}

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
object sparkSqlMysql {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSqlMysql")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    //读取数据
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
    //RDD关联Person
    val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
    //导入隐式转换
    import spark.implicits._
    //将RDD转换成DataFrame
    val personDF: DataFrame = personRdd.toDF()
    personDF.show()
    //创建Properties对象,配置连接mysql的用户名和密码
    val prop =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    //将personDF写入MySQL
    personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
    //从数据库里读取数据
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person",                   prop)
    mysqlDF.show()
    spark.stop()
  }
}

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
object  sparksqlToHIVE {
  def main(args: Array[String]): Unit = {
    //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建sparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksqlToHIVE")
      .config("executor-cores",1)
      .master("local")
      .enableHiveSupport() //开启支持Hive
      .getOrCreate()
    val sc = spark.sparkContext
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //将DataFrame注册成临时表t_person
    personDF.createOrReplaceTempView("t_person")
    //显示临时表t_person的数据
    spark.sql("select * from t_person").show()
    //使用Hive中bigdata的数据库
    spark.sql("use bigdata")
    //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
    spark.sql("insert into person select * from t_person")
    //显示用Hive中bigdata数据库下的person表数据
    spark.sql("select * from person").show()
    spark.stop()
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
26天前
|
SQL 存储 关系型数据库
【MySQL基础篇】全面学习总结SQL语法、DataGrip安装教程
本文详细介绍了MySQL中的SQL语法,包括数据定义(DDL)、数据操作(DML)、数据查询(DQL)和数据控制(DCL)四个主要部分。内容涵盖了创建、修改和删除数据库、表以及表字段的操作,以及通过图形化工具DataGrip进行数据库管理和查询。此外,还讲解了数据的增、删、改、查操作,以及查询语句的条件、聚合函数、分组、排序和分页等知识点。
【MySQL基础篇】全面学习总结SQL语法、DataGrip安装教程
|
3天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
1月前
|
SQL 存储 缓存
MySQL进阶突击系列(02)一条更新SQL执行过程 | 讲透undoLog、redoLog、binLog日志三宝
本文详细介绍了MySQL中update SQL执行过程涉及的undoLog、redoLog和binLog三种日志的作用及其工作原理,包括它们如何确保数据的一致性和完整性,以及在事务提交过程中各自的角色。同时,文章还探讨了这些日志在故障恢复中的重要性,强调了合理配置相关参数对于提高系统稳定性的必要性。
|
1月前
|
SQL 关系型数据库 MySQL
MySQL 高级(进阶) SQL 语句
MySQL 提供了丰富的高级 SQL 语句功能,能够处理复杂的数据查询和管理需求。通过掌握窗口函数、子查询、联合查询、复杂连接操作和事务处理等高级技术,能够大幅提升数据库操作的效率和灵活性。在实际应用中,合理使用这些高级功能,可以更高效地管理和查询数据,满足多样化的业务需求。
187 3
|
1月前
|
SQL 关系型数据库 MySQL
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。
|
1月前
|
SQL 存储 关系型数据库
MySQL进阶突击系列(01)一条简单SQL搞懂MySQL架构原理 | 含实用命令参数集
本文从MySQL的架构原理出发,详细介绍其SQL查询的全过程,涵盖客户端发起SQL查询、服务端SQL接口、解析器、优化器、存储引擎及日志数据等内容。同时提供了MySQL常用的管理命令参数集,帮助读者深入了解MySQL的技术细节和优化方法。
|
1月前
|
SQL Oracle 关系型数据库
SQL(MySQL)
SQL语言是指结构化查询语言,是一门ANSI的标准计算机语言,用来访问和操作数据库。 数据库包括SQL server,MySQL和Oracle。(语法大致相同) 创建数据库指令:CRATE DATABASE websecurity; 查看数据库:show datebase; 切换数据库:USE websecurity; 删除数据库:DROP DATABASE websecurity;
|
2月前
|
PHP 数据库 数据安全/隐私保护
布谷直播源码部署服务器关于数据库配置的详细说明
布谷直播系统源码搭建部署时数据库配置明细!
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
27天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
55 3