Spark查询Hbase小案例

本文涉及的产品
RDS AI 助手,专业版
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 写作目的1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下3)好久没发博客了,水一篇

版本


Scala 2.11.1

Spark 2.11

HBase 2.0.5


代码


其中hbase-site.xml为hbase安装目录下/hbase/conf里的hbase-site.xml


1.png


pom依赖


 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>


查询


查全表


package com.bjfu.spark.demo.hbasedemo
import com.google.common.collect.Table.Cell
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * @author chaird
  * @create 2020-11-13 22:04
  */
object HBaseDemo {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    //创建Spark上下文对象
    val sc = new SparkContext(config)
    //创建HBase配置对象
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    //全表扫描
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      //第一种展示方式
      case (rowkey, result) => {
        val cells: Array[hbase.Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }
      第二种展示方式
      //      case (rowkey,result)=>{
      //        //rowKey
      //        val key: String = Bytes.toString(result.getRow)
      //        //列族,列,值
      //        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"),Bytes.toBytes("288")))
      //        println("rowKey:"+key+"  "+"value:"+value)
      //
      //      }
    }
    //释放资源
    sc.stop()
  }
}


根据rowKey查询


package com.bjfu.spark.demo.hbasedemo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import java.util
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
/**
  * @author chaird
  * @create 2020-11-14 20:39
  */
object HBaseConditionDemo {
  def main(args: Array[String]): Unit = {
    //创建Spark上下文对象
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    val sc = new SparkContext(config)
    //创建HBaseConf
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    //创建过滤器(主键)
    val scan = new Scan()
    val rowkeys = List("17-2020-41-12 17:09", "2-2020-37-12 17:09")
    val filters = new util.ArrayList[Filter]()
    for (cookieid <- rowkeys) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      case (rowkey, result) => {
        //rowKey
        val key: String = Bytes.toString(result.getRow)
        //列族,列,值
        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
        println("rowKey:" + key + "  " + "value:" + value)
      }
    }
    //释放资源
    sc.stop()
  }
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}


将结果集保存下来


 val list: List[String] = hbaseRDD.map(
      x => (Bytes.toString(x._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"))))).collect().toList
    list.foreach(println(_))
目录
相关文章
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
285 5
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
213 3
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
415 0
|
SQL 存储 分布式数据库
分布式存储数据恢复—hbase和hive数据库数据恢复案例
分布式存储数据恢复环境: 16台某品牌R730xd服务器节点,每台服务器节点上有数台虚拟机。 虚拟机上部署Hbase和Hive数据库。 分布式存储故障: 数据库底层文件被误删除,数据库不能使用。要求恢复hbase和hive数据库。
520 12
|
SQL 关系型数据库 MySQL
Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
【2月更文挑战第9天】Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
852 7
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
675 0
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
230 1
|
SQL 存储 分布式计算
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
266 0
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
285 0

热门文章

最新文章