Version notes:
HBase version: hbase-1.3.1
Spark version: spark-2.4.7-bin-hadoop2.7
1. Spark Integration with HBase
Background:
Spark supports many data sources, but it offers no particularly elegant API for reading from or writing to HBase, even though Spark and HBase are frequently used together. This article therefore builds a more convenient set of HBase read/write operations on top of Spark's data source APIs.
Data model:
row,addres,age,username
001,guangzhou,20,alex
002,shenzhen,34,jack
003,beijing,23,lili
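The examples below assume the stu table with a single info column family already exists in HBase. If it does not, one way to create it is through the HBase 1.3 Admin API; the following is only a reference sketch that reuses the ZooKeeper settings from the Spark job further down (adjust them to your cluster):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateStuTable {
    def main(args: Array[String]): Unit = {
        // point the client at the same ZooKeeper quorum used by the Spark job below
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "bigdata-pro-m04")
        conf.set("hbase.zookeeper.property.clientPort", "2181")

        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        try {
            val tableName = TableName.valueOf("stu")
            if (!admin.tableExists(tableName)) {
                // a single column family "info" holds the addres, age and username columns
                val desc = new HTableDescriptor(tableName)
                desc.addFamily(new HColumnDescriptor("info"))
                admin.createTable(desc)
            }
        } finally {
            admin.close()
            connection.close()
        }
    }
}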
Requirements:
Read data from HBase with Spark, and write data back into HBase.
Add dependencies:
Add the following to the pom.xml file:
<!-- HBase dependencies -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
Code:
package com.kfk.spark.sql

import com.kfk.spark.common.CommSparkSessionScala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/10
 * @time : 8:53 PM
 */
object HBaseSpark {

    def main(args: Array[String]): Unit = {

        // configuration
        val spark = CommSparkSessionScala.getSparkSession()

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
        hbaseConf.set("hbase.zookeeper.quorum","bigdata-pro-m04")
        hbaseConf.set("hbase.master","bigdata-pro-m04:60010")

        // get
        getHBase(hbaseConf,spark)

        // write
        writeHBase(hbaseConf,spark)
    }

    /**
     * Read data from HBase
     * @param hbaseConf
     * @param spark
     */
    def getHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={

        // set the table to read
        hbaseConf.set(TableInputFormat.INPUT_TABLE,"stu")

        // load the HBase table as an RDD of (row key, Result) pairs
        val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConf,
            classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])

        // print the data
        hbaseRDD.foreach(result => {
            val key = Bytes.toString(result._2.getRow)
            val addres = Bytes.toString(result._2.getValue("info".getBytes(),"addres".getBytes()))
            val age = Bytes.toString(result._2.getValue("info".getBytes(),"age".getBytes()))
            val username = Bytes.toString(result._2.getValue("info".getBytes(),"username".getBytes()))
            println("row key:" + key + " addres=" + addres + " age=" + age + " username=" + username)

            /**
             * row key:001 addres=guangzhou age=20 username=alex
             * row key:002 addres=shenzhen age=34 username=jack
             * row key:003 addres=beijing age=23 username=lili
             */
        })
    }

    /**
     * Write data to HBase
     * @param hbaseConf
     * @param spark
     */
    def writeHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={

        // initialize the job and set the output format; TableOutputFormat comes from the org.apache.hadoop.hbase.mapred package
        val jobConf = new JobConf(hbaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])

        // set the target table
        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"stu")

        // prepare the data
        val array = Array("004,shanghai,25,jone",
                          "005,nanjing,31,cherry",
                          "006,wuhan,18,pony")
        val rdd = spark.sparkContext.makeRDD(array)

        // convert the data to be written into an RDD of (ImmutableBytesWritable, Put) pairs
        val saveRDD = rdd.map(line => line.split(",")).map(x => {

            /**
             * One Put object is one row; the row key is passed to the constructor.
             * All values must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
             * Put.addColumn takes three arguments: column family, column qualifier, value.
             */
            val put = new Put(Bytes.toBytes(x(0)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("addres"),Bytes.toBytes(x(1)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(x(2)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("username"),Bytes.toBytes(x(3)))
            (new ImmutableBytesWritable,put)
        })

        // write to HBase
        saveRDD.saveAsHadoopDataset(jobConf)
    }
}
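CommSparkSessionScala.getSparkSession() is a project-local helper that is not shown in this article, so its exact contents are an assumption. A minimal version could look like the sketch below (the app name and master are placeholders; on a real cluster the master would normally be supplied by spark-submit):

package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

// Hypothetical helper: the real CommSparkSessionScala is not shown in the article,
// so this is only a sketch of what it might contain.
object CommSparkSessionScala {
    def getSparkSession(): SparkSession = {
        SparkSession.builder()
            .appName("HBaseSpark")   // placeholder application name
            .master("local[*]")      // placeholder master, normally set via spark-submit
            .getOrCreate()
    }
}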
Output:
row key:001 addres=guangzhou age=20 username=alex
row key:002 addres=shenzhen age=34 username=jack
row key:003 addres=beijing age=23 username=lili
row key:004 addres=shanghai age=25 username=jone
row key:005 addres=nanjing age=31 username=cherry
row key:006 addres=wuhan age=18 username=pony
The source table originally held only the first three rows; after the write step adds the last three rows, the combined result above is printed.
2. Spark SQL Integration with HBase
The core of the Spark SQL and HBase integration is that Spark SQL reads HBase table data through a Hive external table.
Copy the HBase, Hive, and MySQL jars into Spark's jars directory:
hbase:
hbase-client-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-server-1.3.1.jar
metrics-core-2.2.0.jar

hive:
hive-hbase-handler-2.3.3.jar
htrace-core-3.1.0-incubating.jar

mysql:
mysql-connector-java-5.1.48-bin.jar
Create the Hive external table that maps to the HBase table:
CREATE EXTERNAL TABLE stu(
  id string,
  addres string,
  age string,
  username string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  "hbase.columns.mapping" = ":key,info:addres,info:age,info:username")
TBLPROPERTIES ("hbase.table.name" = "stu");
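Once the jars are in place and the external table exists, the HBase data can be queried through Spark SQL. The following is a minimal sketch, assuming Spark can reach the Hive metastore (for example via a hive-site.xml in Spark's conf directory); the application name is a placeholder:

import org.apache.spark.sql.SparkSession

object SparkSQLOnHBase {
    def main(args: Array[String]): Unit = {
        // enableHiveSupport lets Spark SQL see the Hive metastore where the stu external table is registered
        val spark = SparkSession.builder()
            .appName("SparkSQLOnHBase")   // placeholder application name
            .enableHiveSupport()
            .getOrCreate()

        // the query is served from the underlying HBase stu table via the HBaseStorageHandler
        spark.sql("SELECT id, addres, age, username FROM stu").show()

        spark.stop()
    }
}

The same query should also work interactively from the spark-sql shell after the jars listed above have been copied.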