Spark与HBase的集成

简介: 笔记

版本说明:


hbase版本:hbase-1.3.1

spark版本:spark-2.4.7-bin-hadoop2.7


一、Spark与HBase的集成


背景:

Spark支持多种数据源,但是Spark对HBase的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的数据源API自己实现了一套比较方便操作HBase的API。


数据模型:

row,addres,age,username
001,guangzhou,20,alex
002,shenzhen,34,jack
003,beijing,23,lili

需求分析:

通过spark读取hbase中的数据或者将数据写入到hbase中。

添加配置:

在pom.xml文件中添加如下配置:

<!-- hbase依赖包 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

代码实现:

package com.kfk.spark.sql
import com.kfk.spark.common.CommSparkSessionScala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/10
 * @time : 8:53 下午
 */
object HBaseSpark {
    def main(args: Array[String]): Unit = {
        // configuration
        val spark = CommSparkSessionScala.getSparkSession()
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
        hbaseConf.set("hbase.zookeeper.quorum","bigdata-pro-m04")
        hbaseConf.set("hbase.master","bigdata-pro-m04:60010")
        // get
        getHBase(hbaseConf,spark)
        // write
        writeHBase(hbaseConf,spark)
    }
    /**
     * 读取hbase中的数据
     * @param hbaseConf
     * @param spark
     */
    def getHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
        // 获取表名
        hbaseConf.set(TableInputFormat.INPUT_TABLE,"stu")
        // 将hbase中的数据转换成rdd
        val hbaseRDD =  spark.sparkContext.newAPIHadoopRDD(hbaseConf,
            classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])
        // 打印数据
        hbaseRDD.foreach(result => {
            val key = Bytes.toString(result._2.getRow)
            val addres = Bytes.toString(result._2.getValue("info".getBytes(),"addres".getBytes()))
            val age = Bytes.toString(result._2.getValue("info".getBytes(),"age".getBytes()))
            val username = Bytes.toString(result._2.getValue("info".getBytes(),"username".getBytes()))
            println("row key:" + key + " addres=" + addres + " age=" + age + " username=" + username)hashCode()
            /**
             * row key:001 addres=guangzhou age=20 username=alex
             * row key:002 addres=shenzhen age=34 username=jack
             * row key:003 addres=beijing age=23 username=lili
             */
        })
    }
    /**
     * 将数据写入到hbase
     * @param hbaseConf
     * @param spark
     */
    def writeHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
        // 初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
        val jobConf = new JobConf(hbaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        // 获取表名
        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"stu")
        // 准备数据
        val array = Array("004,shanghai,25,jone",
                          "005,nanjing,31,cherry",
                          "006,wuhan,18,pony")
        val rdd = spark.sparkContext.makeRDD(array)
        // 将写入到hbase的数据转换成rdd
        val saveRDD = rdd.map(line => line.split(",")).map(x => {
            /**
             * 一个Put对象就是一行记录,在构造方法中指定主键
             * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
             * Put.addColumn 方法接收三个参数:列族,列名,数据
             */
            val put = new Put(Bytes.toBytes(x(0)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("addres"),Bytes.toBytes(x(1)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(x(2)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("username"),Bytes.toBytes(x(3)))
            (new ImmutableBytesWritable,put)
        })
        // 写入到hbase中
        saveRDD.saveAsHadoopDataset(jobConf)
    }
}

运行结果:

row key:001 addres=guangzhou age=20 username=alex
row key:002 addres=shenzhen age=34 username=jack
row key:003 addres=beijing age=23 username=lili
row key:004 addres=shanghai age=25 username=jone
row key:005 addres=nanjing age=31 username=cherry
row key:006 addres=wuhan age=18 username=pony

本来源数据只有前三行,通过写入后三行,再打印出结果。


二、Spark SQL与HBase的集成


Spark SQL与HBase集成,其核心就是Spark Sql通过hive外部表来获取HBase的表数据。


将hbase、hive、mysql相关jar包拷贝到spark的jars目录下

hbase:
hbase-client-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-server-1.3.1.jar
metrics-core-2.2.0.jar
hive:
hive-hbase-handler-2.3.3.jar
htrace-core-3.1.0-incubating.jar
mysql:
mysql-connector-java-5.1.48-bin.jar

创建与HBase集成的Hive的外部表:

CREATE EXTERNAL TABLE stu(
id string,
addres string,
age string,
username string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = 
":key,info:addres,info:age,info:username") 
TBLPROPERTIES ("hbase.table.name" = "stu");


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
105 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
7月前
|
分布式计算 API Apache
Spark与Elasticsearch的集成与全文搜索
Spark与Elasticsearch的集成与全文搜索
|
7月前
|
SQL 分布式计算 大数据
Paimon 与 Spark 的集成(二):查询优化
通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平,本文对其中的关键优化点进行了详细介绍。
118314 30
|
7月前
|
SQL 分布式计算 大数据
Spark 的集成
Spark 的集成
76 2
|
7月前
|
存储 缓存 分布式计算
Spark与云存储的集成:S3、Azure Blob Storage
Spark与云存储的集成:S3、Azure Blob Storage
|
7月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
3月前
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
111 4
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
76 4
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
34 3