Spark与HBase的整合

简介: 之前因为仅仅是把HBase当成一个可横向扩展并且具有持久化能力的KV数据库,所以只用在了指标存储上,参看很早之前的一篇文章基于HBase做Storm 实时计算指标存储。这次将HBase用在了用户行为存储上,因为Rowkey的过滤功能也很不错,可以很方便的把按人或者内容的维度过滤出所有的行为。
前言
之前因为仅仅是把HBase当成一个可横向扩展并且具有持久化能力的KV数据库,所以只用在了指标存储上,参看很早之前的一篇文章 基于HBase做Storm 实时计算指标存储。这次将HBase用在了用户行为存储上,因为Rowkey的过滤功能也很不错,可以很方便的把按人或者内容的维度过滤出所有的行为。从某种意义上,HBase的是一个有且仅有一个多字段复合索引的存储引擎。

虽然我比较推崇实时计算,不过补数据或者计算历史数据啥的,批处理还是少不了的。对于历史数据的计算,其实我是有两个选择的,一个是基于HBase的已经存储好的行为数据进行计算,或者基于Hive的原始数据进行计算,最终选择了前者,这就涉及到Spark(StreamingPro) 对HBase的批处理操作了。
整合过程

和Spark 整合,意味着最好能有Schema(Mapping),因为Dataframe 以及SQL API 都要求你有Schema。 遗憾的是HBase 有没有Schema取决于使用者和场景。通常SparkOnHBase的库都要求你定义一个Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定义一个如下的配置:

{
"rowkey":"key",
"table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"f","col":"col1", "type":"string"}
}
}

看上面的定义已经还是很容易看出来的。对HBase的一个列族和列取一个名字,这样就可以在Spark的DataSource API使用了,关于如何开发Spark DataSource API可以参考我的这篇文章利用 Spark DataSource API 实现Rest数据源中使用,SHC大体实现的就是这个API。现在你可以这么用了:

 val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
    val cc = sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
不过当你有成千上万个列,那么这个就无解了,你不大可能一一定义,而且很多时候使用者也不知道会有哪些列,列名甚至可能是一个时间戳。我们现在好几种情况都遇到了,所以都需要解决:
  1. 自动获取HBase里所有的列形成Schema,这样就不需要用户配置了。
  2. 规定HBase只有两个列,一个rowkey,一个 content,content 是一个map,包含所有以列族+列名为key,对应内容为value。

先说说第二种方案(因为其实第一种方案也要依赖于第二种方案):

{
        "name": "batch.sources",
        "params": [
          {
            "inputTableName": "log1",
            "format": "org.apache.spark.sql.execution.datasources.hbase.raw",
            "path": "-",
            "outputTable": "log1"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select rowkey,json_value_collect(content) as actionList from log1",
            "outputTableName":"finalTable"
          }
        ]
      },
首先我们配置了一个HBase的表,叫log1,当然,这里是因为程序通过hbase-site.xml获得HBase的链接,所以配置上你看不到HBase相关的信息。接着呢,在SQL 里你就可以对content 做处理了。我这里是把content 转化成了JSON格式字符串。再之后你就可以自己写一个UDF函数之类的做处理了,从而实现你复杂的业务逻辑。我们其实每个字段里存储的都是JSON,所以我其实不关心列名,只要让我拿到所有的列就好。而上面的例子正好能够满足我这个需求了。

而且实现这个HBase DataSource 也很简单,核心逻辑大体如下:
case class HBaseRelation(
                          parameters: Map[String, String],
                          userSpecifiedschema: Option[StructType]
                        )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Logging {

  val hbaseConf = HBaseConfiguration.create()


  def buildScan(): RDD[Row] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
    val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { line =>
        val rowKey = Bytes.toString(line._2.getRow)

        import net.liftweb.{json => SJSon}
        implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)

        val content = line._2.getMap.navigableKeySet().flatMap { f =>
          line._2.getFamilyMap(f).map { c =>
            (Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
          }
        }.toMap

        val contentStr = SJSon.Serialization.write(content)

        Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
      }
    hBaseRDD
  }
}
那么我们回过头来,如何让Spark自动发现Schema呢?大体你还是需要过滤所有数据得到列的合集,然后形成Schema的,成本开销很大。我们也可以先将我们的数据转化为JSON格式,然后就可以利用Spark已经支持的JSON格式来自动推倒Schema的能力了。

总体而言,其实并不太鼓励大家使用Spark 对HBase进行批处理,因为这很容易让HBase过载,比如内存溢出导致RegionServer 挂掉,最遗憾的地方是一旦RegionServer 挂掉了,会有一段时间读写不可用,而HBase 又很容易作为实时在线程序的存储,所以影响很大。
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
171 0
|
8月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
216 0
Spark查询Hbase小案例
|
分布式计算 数据处理 分布式数据库
《基于HBase和Spark构建企业级数据处理平台》电子版地址
基于HBase和Spark构建企业级数据处理平台
116 0
《基于HBase和Spark构建企业级数据处理平台》电子版地址
|
分布式计算 Hadoop Linux
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
为了能够更好的查看所更新的文章,讲该博文设为索引 小约定 为了解决在编辑文件等操作的过程中的权限问题,博主一律默认采用root账户登录 对于初次安装的用户可以采用如下命令行:
152 0
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
|
SQL 分布式计算 关系型数据库
|
分布式计算 安全 Shell
Maxcompute Spark 访问 阿里云 Hbase
引子 本来这个东西是没啥好写的,但是在帮客户解决问题的时候,发现链路太长,不能怪客户弄不出来,记录一下 需求列表 MaxCompute Spark包 (写文章时刻为版本 0.32.1, 请自行更新,本文不是文档) Spark 配置 spark.
Maxcompute Spark 访问 阿里云 Hbase
|
分布式计算 Spark
spark访问hbase
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark.rdd.NewHadoopRDD val conf = HBaseConfigurat
1690 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
203 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
85 0