KuduSpark_DDL | 学习笔记

简介: 快速学习 KuduSpark_DDL

开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第三阶段) KuduSpark_DDL学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/678/detail/11784


KuduSpark_DDL


内容介绍:

一、准备工作

二、DDL

 

刚才介绍了如何使用 Java API 来操作 kudu,接下来介绍如何使用 spark 来操作 kuduKudu 是介于 OLTP OLAP 之间的一个系统,它拥有这两个数据库系统的优势,能够快速的插入,也能够进行大规模的数据分析,但是 kudu 也是一个大数据系统,它能够存储大量的数据,甚至是以 TB 来去计算的数据,所以和 kudu 进行配合,一般情况下可能会使用 spark 这样的操作,也有一部分是使用 Java API,因为有可能需要在 web 程序里面向 kudu 中去插数据,但是更多见的还是使用 spark 来操作 kudu。这样,在 spark 操作这一部分就要简单说几个知识点,第一个知识点,如果想使用 spark kudu 来进行整合,需要做一些前期工作。接下来介绍如何使用 spark 操作 kudu 的表去创建表、去删除表、去判断表是否存在,这个操作就称之为 DDL

DDL 是对于模式的定义,在 DDL 当中要讲三个小知识点,第一个是创建表,第二个是删除表,第三个是判断表是否存在。

 

一、准备工作

首先,准备工作上来看,如果想使用 spark kudu 进行整合,要导入一些 maven 的依赖。需要什么样的 maven 依赖呢?第一个,需要使用到 kudu-client,因为无论怎么去操作 kudu,这个 SDK 是不会变的,使用的客户端是不会变的,不过有可能 kudu-client 在其之上做了一层封装,提供了 spark API。这个上层封装叫 kudu-sparkkudu-spark 这样的一个maven 库,主要目的是让 kudu spark 进行整合,把 kudu 的操作能够融入到 spark 当中,这就是 kudu-spark 这个 API 的作用。然后要去使用到 spark 全家桶,既要用到  spark-core,也要用到  spark-sql,也要用到 spark-hive,如果要使用 spark,还需要去导入 Scala 的基础库,这就是 maven 里面可能需要的一些内容。

 

二、DDL

DDL 操作,按顺序大概有如下几个步骤。创建一个 kudu 表,但是一般情况下,如果要在 hive 当中去判断表、去创建表的时候,会写create table if not exist,会先去判断这个表是否存在。第一个操作就是通过 tableExists 判断表是否存在,假如这个表,在大数据系统当中和以往的关系型数据库是有区别的,大数据系统的数据库表一般是可以回溯的,比如在大数据系统当中创建了用于分析的一个表,这个表可能来源前面的一些其他的表,比如有可能是用户表,以及销售表,然后这两个表集合起来,变成了一个分析表,这张分析表是可以删除的,如果把这个分析表删除了,那么它里面数据有一些问题,或者这张表本身有一些问题,删除掉它以后,还是依然可以通过用户表和销售表重新创建一个分析表,所以在大数据系统当中,通过 delete table 删除某一张表是可以的。

接下来可以来做第三步操作,第三步操作就是使用 createTable 创建表,但是创建表的时候需要用到很多东西,其中有几个东西是没有办法忽略的,主要就是这三项,第一项,一定要提供创建表的 schema 信息,第二点,一定要提供创建表的主键,用哪个键来做分区,第三列,必须要给定访问表的一些参数,比如如何分区、一些附加信息,比如 factor 就是复制因子是多少,一个表要复制多少份一个 tablet 复制多少份。在做所有操作之前,需要先去配置 kudu spark,使用 kudu 配合 spark 进行操作,有一个 API 是必须要创建的,叫做 KuduContext,接下来,就这些内容来写一些代码,测试并运行,看一看怎么操作。

第一步,拷贝这个 maven 库,

<!--Spark -->

<dependency>

<groupId>org. scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>2.11.8</version>

</ dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core 2.11</artifactId>

<version>2.2.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sqL_ 2.11</artifactId>

<version>2.2.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_ 2.11</artifactId>

<version>2.2.0</version>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.6.0</version>

</dependency>

<!-- Kudu Spark -->

<dependency>

<groupId>org.apache.kudu</groupId>

<artifactId>kudu-spark2 2.11</artifactId>

<version>1.7.0-cdh5.16.1</version>

</dependency>

之后放到工程当中,打开 pom 文件,放在 dependency 节点下,其实就是一些 spark 的依赖,然后再加上一个 kudu spark 这样的一个包,这样 kudu spark 的整合就已经做完了。

接下来创建新的 scala class,命名为 KuduSparkAPI

image.png

然后等待这个类的创建,创建好了以后,现在要做一个方法。

@Test

def ddl(): Unit = {

// 1.创建 KuduContext SparkSession

val spark = SparkSession.builder()

.master( master = "1oca1[6]")

.appName( name = "kudu")

.getorCreate( )

val KUDU_ MASTERS = 192.168.169.101:7051,192.168.169. 102:7051,192.168.169.103:7051,

val kuduContext = new KuduContext (KUDU_ MASTERS, spark. sparkContext)

// 2.判断表是否存在,如果存在则删除表

val TABLE_ NAME = "students"

if (kuduContext.tableExists(TABLE_ NAME)) {

kuduContext.deleteTable(TABLE_ NAME)

}

//3.创建一张 Kudu

val schema = StructType(

structField("name", stringType, nullable = false)::

StructField("age", IntegerType, nullable = false)::

StructField("gpa", DoubleType, nullable = false)::Nil

)

val keys = Seq("name" )

import scala.collection. JavaConverters._

val options = new CreateTableOptions()

.setRangePartitionColumns(List("name").asJava)

.setNumReplicas(1)

kuduContext.createTable(tableName = TABLE_ NAME ,

schema = schema,

keys = keys,

options = options)

}

}

要去做一些和表相关的 DDL 操作,直接把方法名命名为 DDL,第一步,在做所有操作之前要先创建 KuduContext SparkSession,因为操作 kudu spark 的整合就必须要用到 KuduContext,操作 spark 就必须要用到 SparkSession,这两个是必须要去创建的。第二步,要去判断表是否存在,如果存在则删除表,第三步,创建一张 kudu 表,这就是所做的三大步操作。有一点需要声明,KuduContext 在这一小节一定是用到的,SparkSession 并不会直接用到,但是还是要给它创建出来,因为它是基础。

接下来先去做第一个,创建 SparkSession,直接 SparkSession,然后 builderbuilder 以后可以设置 master,设置 master 里面传入 local6,然后设置 appName,这个 appName 叫做 kudugetOrCreate,这个时候这行代码就写完了,整理一下格式,一定要注意自己的格式,因为如果把这个代码写好了,其实不是浪费时间,这个程序你的代码大部分时间是给人看的,偶尔放在机器里面运行,所以要善待自己,因为看这份代码的人最多的还是自己。

要创建 KuduContext,拿到 KuduContext new 一个 KuduContext,之后需要在里面传入两个参数,第一个,一定要传入 KUDU_MASTERS,说明是有多个 master,第二个,要去传入 spark sparkcontext,这是两个必须要传的参数。接下来就创建出来 KUDU_MASTERS,它就是一个字符串,这个字符串叫做 cdh01,但是还是写192.168.169.101:7051 这样的 IP 地址,然后复制三个这样的字符串,分别把第二项改为102,第三项改为103KUDU_MASTERS 创建好了,KuduContext 也创建好了。

If 一下判断表是否存在,判断 kuduContext,然后判断 tableExists,需要指定一个表名,叫做 TABLE_NAME,暂且命名为 students,一个学生表,这个时候把 TABLE_NAME 传进来,这时就判断了表名是否存在,如果存在,tableExist 会返回 true,会进入到里面的这一部分,如果这个表存在,要去删除这张表,使用 kuduContext 可以deleteTable,然后 deleteTable 里面也要传入表名叫做 TABLE_NAME,这时就做完了第二步操作,判断表是否存在,如果存在则删除表。

接下来要去创建一张新的表,如果创建一张新的表 API kuduContext.createTable 注意判断表是否存在就是 tableExists,删除表就是 deleteTable,创建一张表就是 createTablecreateTable 当中接收三个参数,第一个参数就是 tableName,就是 TABLE_NAME,接下来逗号,然后另起一行写第二个参数,第二个参数叫做 schema,这个 schema 现在暂时给不了,然后再另起一行写第三个参数,第三个参数就是这个主键列,叫做 key,主建列暂时先放在这,然后第四个参数是 table options,现在把它补全,所以把这给一个空 null,然后这个主键列应该叫做 prime key,第四个参数是一个 create table options,这个 options 也先放在这,现在需要创建的参数有三个,第一个是 schema,第二个是 Key,第三个是 options,但是对于 key,它是多个 key,有可能 kudu 当中会有多个主键列,所以把这指定为 keys,这就是内容,就分别创建出来没有的这三个东西,第一个东西称之为 val schema,第二个是需要一个 keys,第三个是需要一个 options,分别就要把这三个内容创建出来,如果是这样的话,这个 schema 非常简单就可以创建了,现在是和 spark 进行交互,所以这个 schema 还是要给 spark 的这个 schema,给出一个 StructType,这个 StructType 就是 spark当中的 schema 的定义,StructType 当中可以给出 StructField,这个 StructField 当中可以去接收一些首先列名,给出一个 name 列。

然后,是 string 类型的,就给出 StringType,然后,第三个它不可以为空,nullable = false,然后可以去拼接一个 StructField,后面跟上第二列是 age 列,age 列有了列名以后它是 IntegerType 的类型,接下来它不可以为空,然后再去拼接下一个 StructField,然后其中可以给出第三列的列名叫做 gpa,原来那个数据集其中有一项是平均成绩,就是一个 double 类型的 doubleType,肯定还是不可以为空,然后就可以再去拼接一个 Nil,在这个 StructType 内部就拼接出来了一个 list,这就是创建方式。Keys 比较简单,要指定出来有几列的数据作为主键,在这个里面可以只给一列,把 name 当做主键,然后 options 当中需要去指定几个参数,第一个参数,new 出来一个 createTableOptions,接下来设置 set

可以使用 RangePartitionColumns 这样的方式来进行分区,可以指定按照哪一列来进行分区,它要求接收的是一个 list,但是这个 list scala 当中的 list 不是同一个 list,所以,要把 scala 当中的 list 转为 java 当中的 list,这一步要去导入一个隐式转换,引入scala 中的 collection ,然后引入collection 当中的 JavaConverters._,在这个 list 上可以调用 as.java 来去转为 java list,再接下来,就可以设置 NumReplicas,就是这个复制因子设置为1,因为现在根本没有3 tablet server,接下来 schema 传进来,然后下一项 keys 也传进来,再下一项,options 也传进来,这个时候 kudu 的表就创建出来了。

接下来就一起去运行一下这段代码

image.png

运行完毕以后,没有发现任何问题,也没有任何报错信息,说明这个表已经创建出来了。

image.png

可以打开浏览器,在浏览器地址栏里面输入101,端口是8051,打开 master 这样一个界面,选择 Tables,就能发现students 这张表已经创建出来了。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
4天前
|
数据采集 人工智能 安全
|
13天前
|
云安全 监控 安全
|
5天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
1092 152
|
18天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1768 9
|
10天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
700 152
|
12天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
663 13
|
7天前
|
SQL 自然语言处理 调度
Agent Skills 的一次工程实践
**本文采用 Agent Skills 实现整体智能体**,开发框架采用 AgentScope,模型使用 **qwen3-max**。Agent Skills 是 Anthropic 新推出的一种有别于mcp server的一种开发方式,用于为 AI **引入可共享的专业技能**。经验封装到**可发现、可复用的能力单元**中,每个技能以文件夹形式存在,包含特定任务的指导性说明(SKILL.md 文件)、脚本代码和资源等 。大模型可以根据需要动态加载这些技能,从而扩展自身的功能。目前不少国内外的一些框架也开始支持此种的开发方式,详细介绍如下。
455 5