使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
表格存储 Tablestore,50G 2个月
简介: 本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。 ## 架构介绍 表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。 表格存储支持多种数据库模型

本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。

架构介绍

表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。
表格存储支持多种数据库模型,可以广泛应用于时序数据、时空数据、消息数据、元数据以及大数据等核心数据场景。当海量的数据存储在表格存储中您希望把数据实时汇聚在 OSS 构建内部的大数据数据湖时,可以使用 Data Lake Formation 提供的托管的数据投递功能,把写入表格存储的数据实时 ETL 到 OSS 中,比以 Delta Lake 的格式存储在 OSS 之上。进入 Delta Lake 后的数据可以再做进一步的流计算或者批计算,大体架构如下图所示

1609138189979-643a9101-1813-44f1-8ab9-8a616c13e1a5.png

本文会重点介绍如何操作快速构建上述的架构图。这里我们假设你已经购买并使用 Tablestore 做为你的存储选型,如果还没有使用,可以参考这里。购买并开通表格存储服务,使用我们的 SDK,或者各类数据管道,导入工具例如 Datax,数据集成服务,Datahub,DTS,又或者计算引擎 Spark,Flink,把数据写入表格存储。下面我们重点介绍如何把这些数据进行 Delta Lake 的数据构建。

数据湖构建环境准备

1.登录数据湖构建
1609138965257-ec79421a-13d1-4247-b046-b8f29700eb89.png

2.创建新的入湖模板

选择实时 OTS,这里 DLF 会使用 Spark Streaming 的方式去订阅 Tablestore 中的数据。Tablestore 的 CDC 因为支持灵活的数据订阅能力,包括全量,全加增,以及增量三种模式,所以可以使用同一的流式入口。
3.png

4.png

3.注意如果是第一次使用,需要创建配置一下数据湖位置,即上图中的目标数据库,选择一个你的 OSS 路径。配置后点击下一步,如图中所示我们创建了一个 BaseAndStream 类型的通道,使得数据湖中可以包含表格存储的全量数据以及未来的新增实时数据。CU 可以根据你的数据写入量和大小进行动态配置。这里我们设置了10cu。

5.png

4.创建好入湖模板后,点击运行即可开始数据的实时入湖工作流。

6.png

5.点击运行,投递任务开始进行,这时候等一小段时间就可以在 OSS 中看到你的 delta 数据,分为 log 路径和data 路径,如下图所示:

7.png

数据文件使用的也是 parquet。这时候当数据实时汇聚在 delta 后,就可以开始我们的基于 deltalake 的数据分析处理了。

6.除了通过 dlf 的监控查看消费情况,还可以在 Tablestore 控制台查看数据投递进度:

8.png
    

数据湖数据分析

这一节来简单介绍下,数据通过 DLF 投递入湖后,如何进行数据的分析。这里我们使用 EMR Spark 来进行数据分析。使用了 DatalakeFormation 进行数据投递后,创建 emr 集群选择数据湖元数据,做为 emr 元数据。此时我们之前投递的 OSS 数据湖可以自动关联外表。
9.png
集群创建好后,我们登陆集群启动 Spark SQL,

spark-sql --master yarn --num-executors 4 --executor-memory 8g --executor-cores 4

执行showdatabse,dlf create oss 数据湖会被list出来:
20/12/29 11:26:29 INFO [main] SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1609144808351_0011
spark-sql> show databases;
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411579 with spark sql successfully.
20/12/29 11:26:51 INFO [main] CodeGenerator: Code generated in 168.938794 ms
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411922 with spark sql successfully.
default
dlftpch1
testlakeformation1
Time taken: 1.344 seconds, Fetched 3 row(s)
20/12/29 11:26:51 INFO [main] SparkSQLCLIDriver: Time taken: 1.344 seconds, Fetched 3 row(s)

进入dlf的数据库dlftpch1,执行show tables; deltalake的数据湖也会被自动关联外表,schema和我们的DLF schema一致。
spark-sql> show tables;
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498535 with spark sql successfully.
20/12/29 11:28:18 INFO [main] CodeGenerator: Code generated in 9.697932 ms
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498560 with spark sql successfully.
dlftpch1        tpchdata1       false
Time taken: 0.121 seconds, Fetched 1 row(s)
20/12/29 11:28:18 INFO [main] SparkSQLCLIDriver: Time taken: 0.121 seconds, Fetched 1 row(s)

然后我们就可以在这张deltalake表上进行adhoc的sql计算
spark-sql> select count(*) from tpchdata1;
20/12/29 11:29:11 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212551026 with spark sql successfully.
528026067
Time taken: 21.712 seconds, Fetched 1 row(s)
20/12/29 11:29:11 INFO [main] SparkSQLCLIDriver: Time taken: 21.712 seconds, Fetched 1 row(s)

我们可以查看下这张外表的create语句:
spark-sql> show create table tpchdata1;
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681298 with spark sql successfully.
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681306 with spark sql successfully.
CREATE TABLE `tpchdata1` (`l_orderkey` INT, `l_linenumber` INT, `l_comment` STRING, `l_commitdate` STRING, `l_discount` DOUBLE, `l_extendedprice` DOUBLE, `l_linestatus` STRING, `l_partkey` INT, `l_quantity` DOUBLE)
USING delta
OPTIONS (
  `serialization.format` '1',
  path 'oss://lakeformation1/DLF-tpchdata1'
)

总结

本文介绍了使用 DLF (Data Lake Formation)的实时数据湖构建能力,订阅 Tablestore 的全增数据,构建流批一体的数据湖存储格式 Delta Lake。并在实例中使用 EMR Spark 进行构建数据的交互分析。整套架构可以帮助你基于 Tablestore + OSS 两套 Serverless 存储,低成本的构建实时的数据读写和分析。对细节架构感兴趣的同学欢迎加群交流(群号:23307953)。

test

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
114 3
|
SQL NoSQL 数据可视化
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1756 0
玩转Tablestore:使用Grafana快速展示时序数据
|
5月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
7月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
75 1
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
885 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
341 15
物联网数据通过规则引擎流转到OTS|学习笔记
|
存储 负载均衡 开发者
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
表格存储数据多版本介绍| 学习笔记
|
存储 NoSQL 关系型数据库
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
425 0
基于TableStore的海量气象格点数据解决方案实战 王怀远
|
存储 SQL 运维
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
796 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践