开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第三阶段):KuduSpark_CRUD】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/678/detail/11785
KuduSpark_CRUD
刚才简单讲了一些针对于 kudu 的一些表的 DDL 的操作,接下来介绍如何针对于 kudu 表当中的数据进行操作,即数据操作。
在数据操作当中也要讲三个知识点,其实就是三大个知识点。第一个,数据的增,第二个,表中数据的删,第三个,改。其实就是三个知识点,还有一种操作,如果没有则添加,如果有则改,其实就是一个增改。
一、增删改
四个 API 的调用方式,其实一个叫 insertRows,一个叫 updateRows,一个叫 upsertRows,一个叫 deleteRows。
接下来打开代码,在代码当中创建一个新的方法,这个方法名字叫做 crud,就是增删改查,
@Test
def crud(): Unit = {
// 1.创建 KuduContext 和 SparkSession
val spark = SparkSession.builder()
.master( master = "1ocal[6]")
.appName( name = "kudu")
.getorcreate()
Val KUDU_ MASTERS = "192.168.169.101:7051,192.168.169.102:7051,192.168.169.103:7051 ,"
val kuduContext = new KuduContext(KUDU_ MASTERS,spark.sparkContext)
//2.增
import spark.implicits._
val df = Seq(student("zhangsan",15,60.1),student("lisi",10,50.6)). toDF()
val TABLE NAME = "students"
kuduContext.insertRows(df, TABLE_ NAME)
//3.删
kuduContext.deleteRows(df, TABLE_ NAME)
// 4.增改
kuduContext.upsertRows(df, TABLE_ NAME)
//5.改
kuduContext.updateRows(df, TABLE_ NAME|)
}
}
case class Student(name: String, age: Int, gpa: Double)
第一步,还是要创建 kuducontext 和 sparksession,这一步直接拷过来,接下来看一看增,增加一条数据,然后第三步,看一看如何删,第四步,看一看如何增改,第五步,看一看如何改,这就是大致内容。
接下来先去增,增加一条数据,如果要去增加一条数据,这个时候就要使用 kuducontext 当中的一个 API 叫做 insert,但是这个 insert 有一点特殊的地方要注意,这个特殊的地方就是和以往 kudu 的 Java API 不一样,kudu 的context 因为是和 spark 的整合,所以它在进行 insertRows 或者在进行 updateRows、deleteRows 的时候,接收的全是 DataFrame。
在 insertRows 当中,接收了两个参数,第一个参数就是 dataframe,就是保存数据的一个 dataframe,然后是表名,叫做 TABLE_NAME,这个表名叫做 students,这是刚才已经创建过的表,把 TABLE_NAME 传进来,接下来创建 df,最简单的方式是引入 spark.implicits._,这个 API 虽然好久没有用,但是这个 implicits 是一个隐式转换,能够在比如 seq 或者 list 上使用 to df 这个方法,可以直接拿到一个 df,这个 df 可以由一个 Seq 创建,那么就可以完全创建出来几个对象,比如先去创建一个 student 的对象,case class student,第一项是 name,对应的是 string,第二项是 age,对应的是 int,第三项是 gpa,对应的是 double,这就是 student 这样的一个类的创建。
这个类创建好了以后,在 seq 当中接收 student,名字叫 zhangsan,年龄15岁,成绩60.1。第二项是一个 student,对应的是 lisi,年龄10岁然后,成绩50.6,这个时候创建出来 seq 以后就可以 toDF,去创建 dataframe,接下来可以把 dataframe 插入到 kudu 表当中,既然能插入到 kudu 表当中,那么肯定可以删除掉这张表,删除这些数据,那么可以直接 delete df,然后把 TABLE_NAME 也传进来。
除了 delete 以外,还可以判断一下,先增加了数据以后,就把它给删掉,删掉以后可以再去增加一个东西,但是增已经调用过一次,就调一下增改,如果它里面有数据就进行修改,如果没有数据,就进行增加,就是 upsert,其实就是 update 加上 insert,那么就是 df 把 TABLE_NAME 传进来,除了增改以外,还有一个操作叫改,改就是 update,df 把 TABLE_NAME 传进来,这就是整个的代码。
现在去运行这段代码,会发现没有任何问题,没有报任何错,说明这个内容是已经添加进去了。但是,这个地方报了一个小错误
意思是不能去写入一个行,然后 from dateframe to kudu,它所说的这个问题就是在 delete 的时候必须只有一个列,不能多有一个 age 列。在 delete 的时候,看一下这个代码,是把整个 dataframe 传进来了,但是 Delete 的时候它要一个 dataframe 当中只能给一个列,这一列必须是主键的一个列,比如在创建这张表的时候,提供了 keys 这样的属性,在删除的时候,dataframe 当中只能有 keys 列,不能包含其他的列,应该在 deleteRosews 对 df 做一个select,可以$去其中的某一列,name 是主键,这一步做完以后再去运行一下,还是会碰见另外一个错误,但是这两个错都很重要,大家一定会遇到。
第一个,就是在删除的时候,这个数据集当中只能有主键列。
第二个,报的是行还是不能写入 rows from dataframe to kudu,看一下简单的错误信息,Already present,就是在插入数据的时候,这个 key 已经存在了,现在插不进去。在讲 kudu 插入原理的时候,说过当插入一条数据的时候,kudu 先会去判断,先会去查找其他节点里面是否有这一条数据,如果有,它会抛一个错,就是 key already present。在 kudu 当中,是不允许一个 key 出现两次的,key 是唯一的。刚才运行了一次 insert 它里面已经有了,再运行一遍就不行了。可以先运行一下第一个程序,先把表删了,然后再去运行刚才这个程序,这样就没问题了。
这个时候,已经把表删了,重新创建了,再去运行一下 CRUD,看是否会报错。
这一步,一定会成功,整个结果已经正确了,测试已经完成了、通过了,这就是 kudu 对数据的 CRUD。




