大数据技术之HBase2

简介: 大数据技术之HBase2

3 HBase API

3.1、环境准备

新建项目后在pom.xml 中添加依赖

注意:会报错javax.el 包不存在,是一个测试用的依赖,不影响使用

<dependencies>
    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.4.11</version>
        <exclusions> 
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>javax.el</artifactId>
        <version>3.0.1-b06</version>
    </dependency>
</dependencies>

3.2、创建连接

根据官方API 介绍,HBase 的客户端连接由ConnectionFactory 类来创建,用户使用完成之后需要手动关闭连接。同时连接是一个重量级的,推荐一个进程使用一个连接,对HBase 的命令通过连接中的两个属性Admin 和Table 来实现


3.2.1、单线程创建连接

package com.atguigu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection; 
import  org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnect {
 public static void main(String[] args) throws IOException {
     // 1. 创建配置对象
     Configuration conf = new Configuration();
     // 2. 添加配置参数
    conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
     // 3. 创建 hbase 的连接
     // 默认使用同步连接
     Connection connection = ConnectionFactory.createConnection(conf);
     // 可以使用异步连接
     // 主要影响后续的 DML 操作
     CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(conf);
     // 4. 使用连接
     System.out.println(connection);
     // 5. 关闭连接
     connection.close();
     }
}


3.2.2、多线程创建连接

使用类单例模式,确保使用一个连接,可以同时用于多个线程。

 package com.atguigu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnect {
     // 设置静态属性 hbase 连接
     public static Connection connection = null;
     static {
         // 创建 hbase 的连接
         try {
             // 使用配置文件的方法
             connection = ConnectionFactory.createConnection();
         } catch (IOException e) {
             System.out.println("连接获取失败");
             e.printStackTrace();
         }
     }
     /**
     * 连接关闭方法,用于进程关闭时调用
     * @throws IOException
     */
     public static void closeConnection() throws IOException {
         if (connection != null) {
             connection.close();
         }
     }
}

在 resources 文件夹中创建配置文件 hbase-site.xml,添加以下内容

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
     <property>
         <name>hbase.zookeeper.quorum</name>
         <value>hadoop102,hadoop103,hadoop104</value>
     </property>
</configuration>


3.3 、DDL

创建 HBaseDDL 类,添加静态方法即可作为工具类

public class HBaseDDL {
     // 添加静态属性 connection 指向单例连接
     public static Connection connection = HBaseConnect.connection;
}

3.3.1 、创建命名空间

/**
 * 创建命名空间
 * @param namespace 命名空间名称
 */
 public static void createNamespace(String namespace) throws IOException {
     // 1. 获取 admin
     // 此处的异常先不要抛出 等待方法写完 再统一进行处理
     // admin 的连接是轻量级的 不是线程安全的 不推荐池化或者缓存这个连接
     Admin admin = connection.getAdmin();
     // 2. 调用方法创建命名空间
     // 代码相对 shell 更加底层 所以 shell 能够实现的功能 代码一定能实现
     // 所以需要填写完整的命名空间描述
     // 2.1 创建命令空间描述建造者 => 设计师
     NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
     // 2.2 给命令空间添加需求
     builder.addConfiguration("user","atguigu");
     // 2.3 使用 builder 构造出对应的添加完参数的对象 完成创建
     // 创建命名空间出现的问题 都属于本方法自身的问题 不应该抛出
     try {
         admin.createNamespace(builder.build());
     } catch (IOException e) {
         System.out.println("命令空间已经存在");
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }


骚戴理解: admin.createNamespace(builder.build());的异常是同try-catch处理的,为什么不直接抛出去呢?这是因为直接抛出去的话,假如出现了异常,那么异常代码后面的代码就都不会执行了,如果是try-catch捕获处理了异常,那么异常代码后的代码都会继续正常的执行


3.3.2 、判断表格是否存在

/**
 * 判断表格是否存在
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return ture 表示存在
 */
 public static boolean isTableExists(String namespace,String tableName) throws IOException {
     // 1. 获取 admin
     Admin admin = connection.getAdmin();
     // 2. 使用方法判断表格是否存在
     boolean b = false;
     try {
         b = admin.tableExists(TableName.valueOf(namespace, tableName));
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
     // 3. 返回结果
     return b;
     // 后面的代码不能生效
 }

骚戴理解:这里的admin.close();要写在 return b;的前面,因为return表示方法结束了,就直接退出方法了,那写在后面的话是不会执行的!!获取TableName对象的时候不能直接通过new来获取,因为这类的构造方法是私有的,所以通过静态方法TableName.valueOf(namespace, tableName)来获取TableName对象

3.3.3 、创建表

/**
 * 创建表格
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param columnFamilies 列族名称 可以有多个
 */
 public static void createTable(String namespace , String tableName , String...columnFamilies) throws IOException {
     // 判断是否有至少一个列族
     if (columnFamilies.length == 0){
         System.out.println("创建表格至少有一个列族");
         return;
     }
     // 判断表格是否存在
     if (isTableExists(namespace,tableName)){
         System.out.println("表格已经存在");
         return;
     }
     // 1.获取 admin
     Admin admin = connection.getAdmin();
     // 2. 调用方法创建表格
     // 2.1 创建表格描述的建造者
     TableDescriptorBuilder tableDescriptorBuilder = 
     TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
     // 2.2 添加参数
     for (String columnFamily : columnFamilies) {
     // 2.3 创建列族描述的建造者
         ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
         // 2.4 对应当前的列族添加参数
         // 添加版本参数
         columnFamilyDescriptorBuilder.setMaxVersions(5);
         // 2.5 创建添加完参数的列族描述
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
     }
     // 2.6 创建对应的表格描述
     try {
         admin.createTable(tableDescriptorBuilder.build());
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }

骚戴理解: String...columnFamilies是可变参数,可以理解为其实就是一个数组,可以传入多个值

3.3.4、 修改表

/**
 * 修改表格中一个列族的版本
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param columnFamily 列族名称
 * @param version 版本
 */
 public static void modifyTable(String namespace ,String tableName,String columnFamily,int version) throws IOException {
     // 判断表格是否存在
     if (!isTableExists(namespace,tableName)){
         System.out.println("表格不存在无法修改");
         return;
     }
     // 1. 获取 admin
     Admin admin = connection.getAdmin();
     try {
         // 2. 调用方法修改表格
         // 2.0 获取之前的表格描述
         TableDescriptor descriptor = 
        admin.getDescriptor(TableName.valueOf(namespace, tableName));
         // 2.1 创建一个表格描述建造者
         // 如果使用填写 tableName 的方法 相当于创建了一个新的表格描述建造
        者 没有之前的信息
         // 如果想要修改之前的信息 必须调用方法填写一个旧的表格描述
         TableDescriptorBuilder tableDescriptorBuilder = 
        TableDescriptorBuilder.newBuilder(descriptor);
         // 2.2 对应建造者进行表格数据的修改
         ColumnFamilyDescriptor columnFamily1 = 
        descriptor.getColumnFamily(Bytes.toBytes(columnFamily));
         // 创建列族描述建造者
         // 需要填写旧的列族描述
         ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 
        ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);
         // 修改对应的版本
         columnFamilyDescriptorBuilder.setMaxVersions(version);
         // 此处修改的时候 如果填写的新创建 那么别的参数会初始化
        tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorB
        uilder.build());
        admin.modifyTable(tableDescriptorBuilder.build());
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }

骚戴理解:如果想要修改之前的信息 必须调用方法填写一个旧的表格描述,也就是不能像上面创建表描述一样直接新建一个TableDescriptorBuilder表格描述对象,而是通过下面的语句来获取旧的表描述

  TableDescriptor descriptor =admin.getDescriptor(TableName.valueOf(namespace,tableName));

同样,修改列值也要用原来旧的表描述的列族,通过下面的语句来获取旧的列族描述

 ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));

3.3.5 、删除表

/**
 * 删除表格
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return true 表示删除成功
 */
 public static boolean deleteTable(String namespace ,String tableName) throws IOException {
     // 1. 判断表格是否存在
     if (!isTableExists(namespace,tableName)){
         System.out.println("表格不存在 无法删除");
         return false;
     }
     // 2. 获取 admin
     Admin admin = connection.getAdmin();
     // 3. 调用相关的方法删除表格
     try {
         // HBase 删除表格之前 一定要先标记表格为不可以
         TableName tableName1 = TableName.valueOf(namespace, tableName);
         admin.disableTable(tableName1);
         admin.deleteTable(tableName1);
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 4. 关闭 admin
     admin.close();
     return true;
 }


骚戴理解: HBase 删除表格之前 一定要先标记表格为不可以,通过admin.disableTable(tableName1);语句来设置表格为不可用

3.4 、DML

创建类 HBaseDML

public class HBaseDML {
 // 添加静态属性 connection 指向单例连接
 public static Connection connection = HBaseConnect.connection;
}


3.4.1、 插入数据

/**
 * 插入数据
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param rowKey 主键
 * @param columnFamily 列族名称
 * @param columnName 列名
 * @param value 值
 */
 public static void putCell(String namespace,String tableName,String rowKey, String columnFamily,String columnName,String value) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 调用相关方法插入数据
     // 2.1 创建 put 对象
     Put put = new Put(Bytes.toBytes(rowKey));
     // 2.2. 给 put 对象添加数据
    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));
     // 2.3 将对象写入对应的方法
     try {
         table.put(put);
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 table
     table.close();
 }

3.4.2 、读取数据(读取对应的一行中的某一列)

/**
 * 读取数据 读取对应的一行中的某一列
 *
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param rowKey 主键
 * @param columnFamily 列族名称
 * @param columnName 列名
 */
 public static void getCells(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 创建 get 对象
     Get get = new Get(Bytes.toBytes(rowKey));
     // 如果直接调用 get 方法读取数据 此时读一整行数据
     // 如果想读取某一列的数据 需要添加对应的参数
     get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
     // 设置读取数据的版本
     get.readAllVersions();
     try {
     // 读取数据 得到 result 对象
     Result result = table.get(get);
     // 处理数据
     Cell[] cells = result.rawCells();
     // 测试方法: 直接把读取的数据打印到控制台
     // 如果是实际开发 需要再额外写方法 对应处理数据
     for (Cell cell : cells) {
         // cell 存储数据比较底层
         String value = new String(CellUtil.cloneValue(cell));
         System.out.println(value);
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 关闭 table
     table.close();
 }


骚戴理解:String value = new String(CellUtil.cloneValue(cell));这句是为了防止乱码,通过这句代码把底层的cell转成字符串来方便打印出来

3.4.3 、扫描数据

/**
 * 扫描数据
 *
 * @param namespace 命名空间
 * @param tableName 表格名称
 * @param startRow 开始的 row 包含的
 * @param stopRow 结束的 row 不包含
 */
 public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 创建 scan 对象
     Scan scan = new Scan();
     // 如果此时直接调用 会直接扫描整张表
     // 添加参数 来控制扫描的数据
     // 默认包含
     scan.withStartRow(Bytes.toBytes(startRow));
     // 默认不包含
     scan.withStopRow(Bytes.toBytes(stopRow));
     try {
     // 读取多行数据 获得 scanner
     ResultScanner scanner = table.getScanner(scan);
     // result 来记录一行数据 cell 数组
     // ResultScanner 来记录多行数据 result 的数组
     for (Result result : scanner) {
        Cell[] cells = result.rawCells();
     for (Cell cell : cells) {
         System.out.print (new 
            String(CellUtil.cloneRow(cell)) + "-" + new 
            String(CellUtil.cloneFamily(cell)) + "-" + new 
            String(CellUtil.cloneQualifier(cell)) + "-" + new 
            String(CellUtil.cloneValue(cell)) + "\t");
             }
         System.out.println();
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 table
     table.close();
 }

骚戴理解:result 来记录一行数据 cell 数组, ResultScanner 来记录多行数据 result 的数组这两句话说的非常精准,这里要理解什么是cell才能理解那个双重for循环


Cell由{rowkey, column Family:column Qualifier, timestamp} 唯一确定的单元。cell 中的数据全部是字节码形式存贮。下面就是一个cell,对照下面的图片和代码比较一下

   // result 来记录一行数据 cell 数组
     // ResultScanner 来记录多行数据 result 的数组
     for (Result result : scanner) {
        Cell[] cells = result.rawCells();
     for (Cell cell : cells) {
         System.out.print (new 
            String(CellUtil.cloneRow(cell)) + "-" + new 
            String(CellUtil.cloneFamily(cell)) + "-" + new 
            String(CellUtil.cloneQualifier(cell)) + "-" + new 
            String(CellUtil.cloneValue(cell)) + "\t");
             }
         System.out.println();
         }

对比就可以知道这个Cell[]数组其实里面就是每一行的每一个列值,而不是所有的行!

看下面的打印结果就能知道了

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
7月前
|
存储 人工智能 大数据
云栖2025|阿里云开源大数据发布新一代“湖流一体”数智平台及全栈技术升级
阿里云在云栖大会发布“湖流一体”数智平台,推出DLF-3.0全模态湖仓、实时计算Flink版升级及EMR系列新品,融合实时化、多模态、智能化技术,打造AI时代高效开放的数据底座,赋能企业数字化转型。
1440 0
|
9月前
|
数据采集 人工智能 分布式计算
ODPS在AI时代的发展战略与技术演进分析报告
ODPS(现MaxCompute)历经十五年发展,从分布式计算平台演进为AI时代的数据基础设施,以超大规模处理、多模态融合与Data+AI协同为核心竞争力,支撑大模型训练与实时分析等前沿场景,助力企业实现数据驱动与智能化转型。
623 4
|
10月前
|
存储 分布式计算 Hadoop
Hadoop框架解析:大数据处理的核心技术
组件是对数据和方法的封装,从用户角度看是实现特定功能的独立黑盒子,能够有效完成任务。组件,也常被称作封装体,是对数据和方法的简洁封装形式。从用户的角度来看,它就像是一个实现了特定功能的黑盒子,具备输入和输出接口,能够独立完成某些任务。
|
7月前
|
数据可视化 大数据 关系型数据库
基于python大数据技术的医疗数据分析与研究
在数字化时代,医疗数据呈爆炸式增长,涵盖患者信息、检查指标、生活方式等。大数据技术助力疾病预测、资源优化与智慧医疗发展,结合Python、MySQL与B/S架构,推动医疗系统高效实现。
|
9月前
|
SQL 分布式计算 大数据
我与ODPS的十年技术共生之路
ODPS十年相伴,从初识的分布式计算到共生进化,突破架构边界,推动数据价值深挖。其湖仓一体、隐私计算与Serverless能力,助力企业降本增效,赋能政务与商业场景,成为数字化转型的“数字神经系统”。
|
9月前
|
存储 人工智能 算法
Java 大视界 -- Java 大数据在智能医疗影像数据压缩与传输优化中的技术应用(227)
本文探讨 Java 大数据在智能医疗影像压缩与传输中的关键技术应用,分析其如何解决医疗影像数据存储、传输与压缩三大难题,并结合实际案例展示技术落地效果。
|
9月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据在智能物流运输车辆智能调度与路径优化中的技术实现(218)
本文深入探讨了Java大数据技术在智能物流运输中车辆调度与路径优化的应用。通过遗传算法实现车辆资源的智能调度,结合实时路况数据和强化学习算法进行动态路径优化,有效提升了物流效率与客户满意度。以京东物流和顺丰速运的实际案例为支撑,展示了Java大数据在解决行业痛点问题中的强大能力,为物流行业的智能化转型提供了切实可行的技术方案。
|
10月前
|
数据采集 自然语言处理 分布式计算
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
|
10月前
|
存储 分布式计算 算法
Java 大视界 -- Java 大数据在智能教育在线考试监考与作弊检测中的技术创新(193)
本文探讨了Java大数据技术在智能教育在线考试监考与作弊检测中的创新应用。随着在线考试的普及,作弊问题日益突出,传统监考方式难以应对。通过Java大数据技术,可实现考生行为分析、图像识别等多维度监控,提升作弊检测的准确性与效率。结合Hadoop与Spark等技术,系统能实时处理海量数据,构建智能监考体系,保障考试公平性,推动教育评价体系的数字化转型。
|
10月前
|
SQL 缓存 监控
大数据之路:阿里巴巴大数据实践——实时技术与数据服务
实时技术通过流式架构实现数据的实时采集、处理与存储,支持高并发、低延迟的数据服务。架构涵盖数据分层、多流关联,结合Flink、Kafka等技术实现高效流计算。数据服务提供统一接口,支持SQL查询、数据推送与定时任务,保障数据实时性与可靠性。
1370 0

热门文章

最新文章