Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))

02-[了解]-内容提纲

主要2个方面内容:DataFrame是什么和数据分析(案例讲解)

1、DataFrame是什么
  SparkSQL模块前世今生、官方定义和特性
  DataFrame是什么
    DataFrame = RDD[Row] + Schema,Row表示每行数据,抽象的,并不知道每行Row数据有多少列,弱类型
  案例演示,spark-shell命令行
  Row 表示每行数据,如何获取各个列的值
  RDD如何转换为DataFrame
    - 反射推断
    - 自定义Schema
  调用toDF函数,创建DataFrame
2、数据分析(案例讲解)
  编写DSL,调用DataFrame API(类似RDD中函数,比如flatMap和类似SQL中关键词函数,比如select)
  编写SQL语句
    注册DataFrame为临时视图
    编写SQL语句,类似Hive中SQL语句
  使用函数:
    org.apache.spark.sql.functions._
  电影评分数据分析
    分别使用DSL和SQL

03-[了解]-SparkSQL 概述之前世今生

SparkSQL模块一直到Spark 2.0版本才算真正稳定,发挥其巨大功能,发展经历如下几个阶段。

1、Spark 1.0之前
  Shark = Hive + Spark
  将Hive框架源码,修改其中转换SQL为MapReduce,变为转换RDD操作,称为Shark
  问题:
    维护成本太高,没有更多精力在于框架性能提升
2、Spark 1.0开始提出SparkSQL模块
  重新编写引擎Catalyst,将SQL解析为优化逻辑计划Logical Plan
  此时数据结构:SchemaRDD
  测试开发版本,不能用于生产环境
3、Spark 1.3版本,SparkSQL成为Release版本
  数据结构DataFrame,借鉴与Python和R中dataframe
  提供外部数据源接口
    方便可以从任意外部数据源加载load和保存save数据
4、Spark 1.6版本,SparkSQL数据结构Dataset
  坊间流传,参考Flink中DataSet数据结构而来
  Dataset = RDD + schema
5、Spark 2.0版本,DataFrame和Dataset何为一体
  Dataset = RDD + schema
  DataFrame = Dataset[Row]

Spark 2.x发布时,将Dataset和DataFrame统一为一套API,以Dataset数据结构为主(Dataset= RDD + Schema),其中DataFrame = Dataset[Row]

04-[了解]-SparkSQL 概述之官方定义及特性

SparkSQL模块官方定义:针对结构化数据处理Spark Module模块

主要包含三层含义:

  • 第一、针对结构化数据处理,属于Spark框架一个部分

  • 第二、抽象数据结构:DataFrame
DataFrame = RDD[Row] + Schema信息;
  • 第三、分布式SQL引擎,类似Hive框架

从Hive框架继承而来,Hive中提供bin/hive交互式SQL命令行及HiveServer2服务,SparkSQL都可以;

Spark SQL模块架构示意图如下:

Spark SQL是Spark用来处理结构化数据的一个模块,主要四个特性:

官方文档:http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html

05-[掌握]-DataFrame是什么及案例演示

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率

DataFrame有如下特性:

范例演示加载json格式数据

[root@node1 spark]# bin/spark-shell --master local[2]
21/04/26 09:26:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node1.itcast.cn:4040
Spark context available as 'sc' (master = local[2], app id = local-1619400386041).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val empDF = spark.read.json("/datas/resources/employees.json")
empDF: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> empDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
scala> 
scala> empDF.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+
scala> 
scala> empDF.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:26

所以,可以看出:DataFrame = RDD[Row] + Schema信息

06-[掌握]-DataFrame中Schema和Row

查看DataFrame中Schema是什么,执行如下命令:

scala> empDF.schema

可以发现Schema封装类:StructType,结构化类型,里面存储的每个字段封装的类型:StructField,结构化字段。

  • 其一、StructType 定义,是一个样例类,属性为StructField的数组

  • 其二、StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填

自定义Schema结构,官方提供实例代码:

DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。

如何获取Row中每个字段的值呢????

  • 方式一:下标获取,从0开始,类似数组下标获取

  • 方式二:指定下标,知道类型

  • 方式三:通过As转换类型, 此种方式开发中使用最多

如何创建Row对象呢???要么是传递value,要么传递Seq

07-[掌握]-RDD转换DataFrame之反射类型推断

实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema

文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

范例演示说明:使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。

字段信息:user id 、 item id、 rating 、 timestamp。

当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

package cn.itcast.spark.convert
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 采用反射的方式将RDD转换为DataFrame和Dataset
 */
object _01SparkRDDInferring {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,设置应用名称和master
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[3]")
        .getOrCreate()
    import spark.implicits._
    // 1. 加载电影评分数据,封装数据结构RDD
    val rawRatingRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data")
    // 2. 将RDD数据类型转化为 MovieRating
    /*
      将原始RDD中每行数据(电影评分数据)封装到CaseClass样例类中
     */
    val ratingRDD: RDD[MovieRating] = rawRatingRDD.mapPartitions { iter =>
      iter.map { line =>
        // 按照制表符分割
        val arr: Array[String] = line.trim.split("\\t")
        // 封装样例对象
        MovieRating(
          arr(0), arr(1), arr(2).toDouble, arr(3).toLong
        )
      }
    }
    // 3. 通过隐式转换,直接将CaseClass类型RDD转换为DataFrame
    val ratingDF: DataFrame = ratingRDD.toDF()
    //ratingDF.printSchema()
    //ratingDF.show(10, truncate = false)
    /*
      Dataset 从Spark1.6提出
        Dataset = RDD + Schema
        DataFrame = RDD[Row] + Schema
        Dataset[Row] = DataFrame
     */
    // 将DataFrame转换为Dataset,只需要加上CaseClass强类型即可
    val ratingDS: Dataset[MovieRating] = ratingDF.as[MovieRating]
    ratingDS.printSchema()
    ratingDS.show(10, truncate = false)
    // TODO: 将RDD转换为Dataset,可以通过隐式转, 要求RDD数据类型必须是CaseClass
    val dataset: Dataset[MovieRating] = ratingRDD.toDS()
    dataset.printSchema()
    dataset.show(10, truncate = false)
    // 将Dataset直接转换为DataFrame
    val dataframe = dataset.toDF()
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

08-[掌握]-RDD转换DataFrame之自定义Schema

依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:

package cn.itcast.spark.convert
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
 * 自定义Schema方式转换RDD为DataFrame
 */
object _02SparkRDDSchema {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,设置应用名称和master
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // 1. 加载电影评分数据,封装数据结构RDD
    val rawRatingsRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
    // 2. TODO: step1. RDD中数据类型为Row:RDD[Row]
    val rowRDD: RDD[Row] = rawRatingsRDD.mapPartitions { iter =>
      iter.map {
        line =>
          // 按照制表符分割
          val arr: Array[String] = line.trim.split("\\t")
          // 封装样例对象
          Row(arr(0), arr(1), arr(2).toDouble, arr(3).toLong)
      }
    }
    // 3. TODO:step2. 针对Row中数据定义Schema:StructType
    val schema: StructType = StructType(
      Array(
        StructField("user_id", StringType, nullable = true),
        StructField("movie_id", StringType, nullable = true),
        StructField("rating", DoubleType, nullable = true),
        StructField("timestamp", LongType, nullable = true)
      )
    )
    // 4. TODO:step3. 使用SparkSession中方法将定义的Schema应用到RDD[Row]上
    val ratingDF: DataFrame = spark.createDataFrame(rowRDD, schema)
    ratingDF.printSchema()
    ratingDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

09-[掌握]-toDF函数指定列名称转换为DataFrame

SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。

package cn.itcast.spark.todf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
 */
object _03SparkSQLToDF {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,设置应用名称和master
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[3]")
        .getOrCreate()
    import spark.implicits._
    // 1. 将数据类型为元组的RDD,转换为DataFrame
    val rdd: RDD[(Int, String, String)] = spark.sparkContext.parallelize(
      List((1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female"))
    )
    // 调用toDF方法,指定列名称,将RDD转换为DataFrame
    val dataframe: DataFrame = rdd.toDF("id", "name", "gender")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    println("==========================================================")
    // 定义一个Seq序列,其中数据类型为元组
    val seq: Seq[(Int, String, String)] = Seq(
      (1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female")
    )
    // 将数据类型为元组Seq序列转换为DataFrame
    val df: DataFrame = seq.toDF("id", "name", "gender")
    df.printSchema()
    df.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

10-[了解]-SparkSQL中数据处理方式

在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:

  • 第一种:DSL(domain-specific language)编程

数据分析人员,尤其使用Python数据分析人员

  • 第二种:SQL 编程

尤其DBA和数据仓库分析人员擅长编写SQL语句,采用SQL编程

11-[掌握]-基于DSL分析(函数说明)和SQL分析

  • 基于DSL分析
  • 调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL
    语句函数,部分截图如下:

  • 基于SQL分析
  • 将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

其中SQL语句类似Hive中SQL语句,查看Hive官方文档,SQL查询分析语句语法,官方文档文档:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

12-[掌握]-电影评分数据分析之需求说明和加载数据

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6ypUaVpL-1627176341890)(/img/image-20210426105132291.png)]

数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:

数据处理分析步骤如下:

将分析结果,分别保存到MySQL数据库表中及CSV文本文件中。

首先加载电影评分数据,封装到RDD中

// 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // TODO: step1. 读取电影评分数据,从本地文件系统读取,封装数据至RDD中
    val ratingRDD: RDD[String] = spark.read.textFile("datas/ml-1m/ratings.dat").rdd
    println(s"Count = ${ratingRDD.count()}")
    println(s"first:\n\t${ratingRDD.first()}")

13-[掌握]-电影评分数据分析之数据ETL

读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,采用toDF函数,代码下:

val ratingDF: DataFrame = ratingRDD
      .filter(line => null != line && line.trim.split("::").length == 4)
      .mapPartitions{iter =>
        iter.map{line =>
          // a. 解析每条数据
          val arr = line.trim.split("::")
          // b. 构建元组对象
          (arr(0).toInt, arr(1).toInt, arr(2).toDouble, arr(3).toLong)
        }
      }
      // c. 调用toDF函数,指定列名称
        .toDF("user_id", "item_id", "rating", "timestamp")
    ratingDF.printSchema()
    ratingDF.show(10, truncate = false)
    println(s"count = ${ratingDF.count()}")

将RDD转换为DataFrame数据集,方便采用DSL或SQL分析数据。

14-[掌握]-电影评分数据分析之SQL分析

首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下;

// TODO: step3. 基于SQL方式分析
    /*
      a. 注册为临时视图
      b. 编写SQL,执行分析
     */
    // a. 将DataFrame注册为临时视图
    ratingDF.createOrReplaceTempView("view_temp_ratings")
    // b. 编写SQL,执行分析
    val top10MovieDF: DataFrame = spark.sql(
      """
        |SELECT
        |  item_id, ROUND(AVG(rating), 2) AS avg_rating, COUNT(1) AS cnt_rating
        |FROM
        |  view_temp_ratings
        |GROUP BY
        |  item_id
        |HAVING
        |  cnt_rating >= 2000
        |ORDER BY
        |  avg_rating DESC
        |LIMIT
        |  10
        |""".stripMargin)
    /*
      root
       |-- item_id: integer (nullable = false)
       |-- avg_rating: double (nullable = true)
       |-- count_rating: long (nullable = false)
     */
    top10MovieDF.printSchema()
    /*
      +--------+----------+------------+
      |movie_id|avg_rating|count_rating|
      +--------+----------+------------+
      |318     |4.55      |2227        |
      |858     |4.52      |2223        |
      |527     |4.51      |2304        |
      |1198    |4.48      |2514        |
      |260     |4.45      |2991        |
      |2762    |4.41      |2459        |
      |593     |4.35      |2578        |
      |2028    |4.34      |2653        |
      |2858    |4.32      |3428        |
      |2571    |4.32      |2590        |
      +--------+----------+------------+
     */
    top10MovieDF.show(10, truncate = false)

15-[掌握]-电影评分数据分析之Shuffle分区数

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值

好消息:在Spark3.0开始,不用关心参数值,程序自动依据Shuffle时数据量,合理设置分区数目

16-[掌握]-电影评分数据分析之DSL分析

调用Dataset中函数,采用链式编程分析数据,核心代码如下:

val resultDF: DataFrame = ratingDF
      // a. 按照电影ID分组
      .groupBy($"item_id")
      // b. 组合数据进行聚合,评分平均值和评分的次数
      .agg(
        round(avg($"rating"), 2).as("avg_rating"), //
        count($"user_id").as("cnt_rating")
      )
      // c. 过滤评分次数大于2000
      .filter($"cnt_rating" > 2000)
      // d. 按照评分平均值降序排序
      .orderBy($"avg_rating".desc)
      // e. 获取前10条数据
      .limit(10)
    resultDF.printSchema()
    resultDF.show(10, truncate = false)

使用需要导入函数库:import org.apache.spark.sql.functions._

使用DSL编程分析和SQL编程分析,哪一种方式性能更好呢?实际开发中如何选择呢???

无论是DSL编程还是SQL编程,性能一模一样,底层转换为RDD操作时,都是一样的:Catalyst

17-[掌握]-电影评分数据分析之保存结果至MySQL

将分析数据保持到MySQL表中,直接调用Dataframe中writer方法,写入数据到MYSQL表中

// TODO: step 4. 将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 保存结果数据至MySQL表中
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    resultDF
      .coalesce(1) // 对结果数据考虑降低分区数
      .write
        .mode(SaveMode.Overwrite)
      .jdbc(
        "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
        "db_test.tb_top10_movies", //
        props
      )
    /*
    mysql> select * from tb_top10_movies ;
    +---------+------------+------------+
    | item_id | avg_rating | cnt_rating |
    +---------+------------+------------+
    |     318 |       4.55 |       2227 |
    |     858 |       4.52 |       2223 |
    |     527 |       4.51 |       2304 |
    |    1198 |       4.48 |       2514 |
    |     260 |       4.45 |       2991 |
    |    2762 |       4.41 |       2459 |
    |     593 |       4.35 |       2578 |
    |    2028 |       4.34 |       2653 |
    |    2858 |       4.32 |       3428 |
    |    2571 |       4.32 |       2590 |
    +---------+------------+------------+
     */
    // 保存结果数据至CSv文件中
    // 数据不在使用时,释放资源
    resultDF.unpersist()

18-[掌握]-电影评分数据分析之保存结果至CSV文件

将结果DataFrame保存值CSV文件中,文件首行为列名称,核心代码如下:

// 保存结果数据至CSv文件中
resultDF
   .coalesce(1)
   .write
   .mode(SaveMode.Overwrite)
      .option("header", "true")
   .csv("datas/top10-movies")

截图如下所示:

发现,SparkSQL加载数据源数据和保存结果数据,操作非常方便,原因在于:SparkSQL提供强大功能【外部数据源接口】,使得操作数据方便简洁。

附录一、创建Maven模块

1)、Maven 工程结构

2)、POM 文件内容

Maven 工程POM文件中内容(依赖包):

<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    <dependencies>
        <!-- 依赖Scala语言 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL 与 Hive 集成 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase Client 依赖 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- MySQL Client 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    </dependencies>
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven 编译的插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

aven 编译的插件 -->



org.apache.maven.plugins

maven-compiler-plugin

3.0


1.8

1.8

UTF-8




net.alchim31.maven

scala-maven-plugin

3.2.0




compile

testCompile







         


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
13天前
|
机器学习/深度学习 数据采集 DataWorks
数据分析经典案例重现:使用DataWorks Notebook 实现Kaggle竞赛之房价预测,成为数据分析大神!
Python是目前当之无愧的数据分析第一语言,大量的数据科学家使用Python来完成各种各样的数据科学任务。本文以Kaggle竞赛中的房价预测为例,结合DataWorks Notebook,完成数据加载、数据探索、数据可视化、数据清洗、特征分析、特征处理、机器学习、回归预测等步骤,主要Python工具是Pandas和SKLearn。本文中仅仅使用了线性回归这一最基本的机器学习模型,读者可以自行尝试其他更加复杂模型,比如随机森林、支持向量机、XGBoost等。
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
83 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
105 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
76 0
|
SQL 消息中间件 分布式计算
通过Spark SQL实时归档SLS数据
我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。
2573 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
142 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。