Flink SQL 实战:HBase 的结合应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。

本文主要介绍 HBase 和 Flink SQL 的结合使用。HBase 作为 Google 发表 Big Table 论文的开源实现版本,是一种分布式列式存储的数据库,构建在 HDFS 之上的 NoSQL 数据库,非常适合大规模实时查询,因此 HBase 在实时计算领域使用非常广泛。可以实时写 HBase,也可以利用 buckload 一把把离线 Job 生成 HFile Load 到HBase 表中。而当下 Flink SQL 的火热程度不用多说,Flink SQL 也为 HBase 提供了 connector,因此 HBase 与 Flink SQL 的结合非常有必要实践实践。

当然,本文假设用户有一定的 HBase 知识基础,不会详细去介绍 HBase 的架构和原理,本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。因此,本文介绍的内容如下所示:

· HBase 环境准备
· 数据准备
· HBase 作为维度表进行 temporal table join的场景
· Flink SQL 做计算写 HBase 的场景
· 总结

一、HBase 环境准备

由于没有测试的 HBase 环境以及为了避免污染线上 Hbase 环境。因此,自己 build一个 Hbase docker image(大家可以 docker pull guxinglei/myhbase 拉到本地),是基于官方干净的 ubuntu imgae 之上安装了 Hbase 2.2.0 版本以及 JDK1.8 版本。

启动容器,暴露 Hbase web UI 端口以及内置 zk 端口,方便我们从 web 页面看信息以及创建 Flink Hbase table 需要 zk 的链接信息。

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

1.png

· 进入容器,启动 HBase 集群,以及启动 rest server,后续方便我们用 REST API 来读取 Flink SQL 写进 HBase 的数据。

# 启动hbase 集群bin/start-hbase.sh# 后台启动restServerbin/hbase-daemon.sh start rest -p 8000

2.png

二、数据准备

由于 HBase 环境是自己临时搞的单机服务,里面没有数据,需要往里面写点数据供后续示例用。在 Flink SQL 实战系列第二篇中介绍了如何注册 Flink Mysql table,我们可以将广告位表抽取到 HBase 表中,用来做维度表,进行 temporal table join。因此,我们需要在 HBase 中创建一张表,同时还需要创建 Flink HBase table, 这两张表通过 Flink SQL 的 HBase connector 关联起来。

· 在容器中启动 HBase shell,创建一张名为 dim_hbase 的 HBase 表,建表语句如下所示:

# 在hbase shell创建 hbase表
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase

3.png

· 在 Flink 中创建 Flink HBase table,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,
  cf ROW < adspace_name STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'localhost:2181'
);

· Flink MySQL table 和 Flink HBase table 已经创建好了,就可以写抽取数据到HBase 的 SQL job 了,SQL 语句以及 job 状态如下所示:

# 抽取Mysql数据到Hbase表中


insert into
  hbase_dim_table
select
CAST (ID as VARCHAR),
ROW(name)
from
  mysql_dim_table;

4.png

5.png

6.png

7.png

03 HBase 作为维表与 Kafka 做 temporal join 的场景

在 Flink SQL join 中,维度表的 join 一定绕不开的,比如订单金额 join 汇率表,点击流 join 广告位的明细表等等,使用场景非常广泛。那么作为分布式数据库的 HBase 比 MySQL 作为维度表用作维度表 join 更有优势。在 Flink SQL 实战系列第二篇中,我们注册了广告的点击流,将 Kafka topic 注册 Flink Kafka Table,同时也介绍了 temporal table join 在 Flink SQL 中的使用;那么本节中将会介绍 HBase 作为维度表来使用,上面小节中已经将数据抽取到 Hbase 中了,我们直接写 temporal table join 计算逻辑即可。

· 作为广告点击流的 Flink Kafa table 与 作为广告位的 Flink HBase table 通过广告位 Id 进行 temporal table join,输出广告位 ID 和广告位中文名字,SQL join 逻辑如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;

· temporal table join job 提交 Flink 集群上的状态以及 join 结果如下所示:

8.png

9.png

四、计算结果 sink 到 HBase 作为结果的场景

上面小节中,HBase 作为维度表用作 temporal table join 是非常常见的场景,实际上 HBase 作为存储计算结果也是非常常见的场景,毕竟 Hbase 作为分布式数据库,底层存储是拥有多副本机制的 HDFS,维护简单,扩容方便, 实时查询快,而且提供各种客户端方便下游使用存储在 HBase 中的数据。那么本小节就介绍 Flink SQL 将计算结果写到 HBase,并且通过 REST API 查询计算结果的场景。

· 进入容器中,在 HBase 中新建一张 HBase 表,一个 column family 就满足需求,建表语句如下所示:

# 注册hbase sink table
create 'dwa_hbase_click_report','cf'

10.png

· 建立好 HBase 表之后,我们需要在 Flink SQL 创建一张 Flink HBase table,这个时候我们需要明确 cf 这个 column famaly 下面 column 字段,在 Flink SQL实战第二篇中,已经注册好了作为点击流的 Flink Kafka table,因此本节中,将会计算点击流的 uv 和点击数,因此两个 column 分别为 uv 和 click_count,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW < uv BIGINT, click_count BIGINT >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dwa_hbase_click_report',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'hostname:2181'
);

11.png

· 前面点击流的 Flink Kafka table 和存储计算结果的 HBase table 和 Flink HBase table 已经准备了,我们将做一个1分钟的翻转窗口计算 uv 和点击数,并且将计算结果写到 HBase 中。对 HBase 了解的人应该知道,rowkey 的设计对 hbase regoin 的分布有着非常重要的影响,基于此我们的 rowkey 是使用 Flink SQL 内置的 reverse 函数进行广告位 Id 进行反转和窗口启始时间做 concat,因此,SQL 逻辑语句如下所示:

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) as rowkey, 
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
  adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;

12.png

· SQL job 提交之后的状态以及结果 check 如下所示:

13.png

14.png

上述 SQL job 已经成功的将结算结果写到 HBase 中了。对于线上的 HBase 服务来讲,很多同事不一定有 HBase 客户端的权限,从而也不能通过 HBase shell 读取数据;另外作为线上报表服务显然不可能通过 HBase shell 来通过查询数据。因此,在实时报表场景中,数据开发工程师将数据写入 HBase, 前端工程师通过 REST API 来读取数据。前面我们已经启动了 HBase rest server 进程,我们可以通 rest 服务提供读取 HBase 里面的数据。

· 我们先 get 一条刚刚写到 HBase 中的数据看看,如下所示:

15.png

· 下面我们开始通过 REST API 来查询 HBase 中的数据,第一步,执行如下语句拿到 scannerId;首先需要将要查询的 rowkey 进行 base64 编码才能使用,后面需要将结果进行 base64 解码

rowkey base64 编码前:0122612_1606295280000
base64 编码之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT \
         -H "Accept: text/xml" \
         -H "Content-Type: text/xml" \
         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"

16.png

· 第二步,执行如下语句根据上条语句返回的 scannerID 查询数据,可以看到返回的结果:

curl -vi -X GET \
         -H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

17.png

· 第三步,查询完毕之后,执行如下语句删除该scannerId:

curl -vi -X DELETE \
         -H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

18.png

五、总结

在本篇文章中,我们介绍了 HBase 和 Flink SQL 的结合使用比较广泛两种的场景:作为维度表用以及存储计算结果;同时使用 REST API 对 HBase 中的数据进行查询,对于查询用户来说,避免直接暴露 HBase 的 zk,同时将 rest server 和 HBase 集群解耦。

作者简介

余敖,360 数据开发高级工程师,目前专注于基于 Flink 的实时数仓建设与平台化工作。对 Flink、Kafka、Hive、Spark 等进行数据 ETL 和数仓开发有丰富的经验。

社区二维码.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
232 1
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
184 15
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
49 0
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
49 1
|
7月前
|
SQL 关系型数据库 MySQL
SQL基础开发与应用-课程及场景介绍
这是一门关于《SQL基础开发与应用》的课程介绍,主要针对数据库Clouder认证的第二阶段。课程以电商平台后端开发为背景,教授RDS for MySQL的SQL基础知识,包括存储过程、触发器和视图等高级特性,并指导学员使用Python进行数据库的增删改查操作。学习目标包括掌握SQL基础操作,了解RDS的高阶功能,并熟悉Python连接RDS进行数据处理。课程采用场景化教学,以跨境电商网站数据库搭建为例,帮助学员理解实际应用。
87 0
|
SQL
Sql开发与应用
1. create table a1 (id number(*,2));  // 如果整数部分长度不确定,可以用*号来代替number(*,2) ...
3521 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
128 13
|
5月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。

相关产品

  • 实时计算 Flink版