HybridDB(基于Greenplum)经过长达四个月时间的公测,终于开始商业化的征程、为我们客户提供计算分析能力。
在这之前,我们团队做了许多技术、产品上的打磨,其中OSS的高效访问与处理是其中较为重要的一环。这个功能可以给用户在数据流转方面带来质的变化。
缘起
在传统的OLAP方案中,链路是比较长的,数据流转的代价较为高昂。而且往往常用的数据同步工具未必能够满足需求,复杂的分析在同步上会需要一些功能的定制。而且资源的不够弹性、管理上的诸多麻烦,也带来成本的上升。
那么在云环境中,这一情况可能发生变化,数据的同步、冷备或许会变得更加简单,在线和离线数据的界线或距离可能会变得不那么明显。
oss_ext 与 oss_fdw
HybridDB和PostgreSQL分别是通过oss_ext和oss_fdw来实现对OSS的读写操作。
oss_ext
HybridDB是在PostgreSQL的基础上修改,因此和PostgreSQL一样拥有良好、可靠、高效的扩展性。基于HybridDB插件机制,我们开发了oss\_ext这个插件,用于在PostgreSQL和HybridDB中无缝访问OSS。 HybridDB的外表功能与PG稍有不同,因此用法上也有细微差别,分为读写两种,详细请参考[高速 OSS 并行导入导出](https://help.aliyun.com/document_detail/35457.html?spm=5176.doc35459.6.553.tfadXu)。下面我们会以具体例子来介绍。
oss_fdw
PostgreSQL的Foreign Data Wrapper是一套很强大的机制,可以允许用户以SQL方式访问各种各样的数据源,OSS也不在话下。详细的用法参考[从 OSS 装载数据到 PostgreSQL](https://yq.aliyun.com/articles/59413?spm=5176.100240.searchblog.142.nyQfB6)
注:OSS(Object Storage Service)是阿里云对外提供的海量、安全和高可靠的云存储服务,容量和处理能力的弹性扩展,按实际容量付费,比较适合做数据的冷备。详细请参考对象存储OSS。
OSS --> HybridDB 并行加载
HybridDB是多节点的分布式数据库,在读写OSS的时候,每个节点都会建立一个到OSS的连接,并行读写。如图所示:
数据在导入后,会在每个节点计算HASH值后做分布,相比通过Master的写入,性能可以随节点数线性提升。
create extension oss_ext;
create READABLE external table singlepath (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com dir=loaddata/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
这里的dir参数会指定一个目录loaddata,当前对该表进行查询的时候,会将OSS中该目录下所有文本全部加载进来,但不包括次级目录。执行结果如下:
gpdb=> select * from singlepath;
id | name
----+---------
0 | zero
1 | first
2 | second
(3 rows)
现在在OSS中的目录结构是这样的,两个"output"中的内容是一样的:
loaddata/
-- secondary/
-- output
-- output
创建可以读取层级目录的外部表:
create READABLE external table hyrachi (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com prefix=loaddata/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
查询的结果如下:
gpdb=> select * from hyrachi;
id | name
----+---------
0 | zero
1 | first
2 | second
0 | zero
1 | first
2 | second
(6 rows)
关于数据导入,一些最佳实践的建议:
- 文件数最好是集群总CPU的整数倍,可以充分利用整个HybridDB集群的资源,最大化的并行导入
- 单个文件不宜过小也不宜过大。OSS上文件打开的时间消耗要大于普通的文件打开,文件过小会导致打开时间占比过高,也会导致文件数过多而影响加载;而过大则会导致单个文件的导入时间过久
- 目录(多个文件)的加载,不同的文件之间大小不宜差别过大(如一个几M、一个几G等),会引起木桶效应
- 可以通过管理文件和目录实现对数据的过滤、分割等一系列操作
PostgreSQl --> OSS
相比于HybridDB的oss_ext,PostgreSQL的oss_fdw要更强大一些,可以直接进行读写。
执行以下步骤创建相应外表:
create extension oss_fdw ;
CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS (host 'oss-cn-beijing.aliyuncs.com' , id 'xxxx', key 'xxxxx', bucket 'ext-buffer');
create FOREIGN TABLE singpath (id int, name text) SERVER ossserver OPTIONS (dir 'loaddata/', delimiter ',', format 'csv');
查询结果如下:
postgres=> select * from singpath;
id | name
----+---------
0 | zero
1 | first
2 | second
(3 rows)
从另外一张表中写入到OSS外表,从而写入到OSS:
postgres=> select * from oss_examp ;
id | name
----+-------
0 | zero
1 | first
(2 rows)
postgres=> insert into singpath select id, name from oss_examp where id = 0;
NOTICE: begin writiing data to oss directory loaddata/, with block size 32 MB and oss file size 1024 MB
INSERT 0 2
postgres=> select * from singpath ;
NOTICE: a total of 2 files will be loaded
id | name
----+---------
0 | zero
1 | first
2 | second
0 | zero
(5 rows)
利用对OSS外表的读写这一强大的功能,可以很方便地通过SQL实现对数据的清洗、过滤、导出等功能。上面的"insert into ... select ..." 语句中的select子句可以换成任意条件的查询,从而达到数据处理的目的。
PostgreSQL --> OSS --> HybridDB 数据同步
非常重要的一点是:PostgreSQL和HybridDB在接口上基本兼容,基本可以共用。
通过oss_fdw,PostgreSQL可以通过SQL实现很方便地将数据流到OSS中;利用oss_ext,HybridDB则可以通过SQL直接、高效地读取OSS,实现数据的流入。架构如下:
通过在OSS上合理控制目录结构并写入,可以实现PostgreSQL与HybridDB的无缝同步。这里从以下几个例子予以说明:
- 增量导入
- 数据的分柝与快速分析
- 不同PG实例中相同表的数据合并
增量导入
增量导入一直是被问的比较多的问题,这对于异构数据库一直是一个大的难题。利用oss_ext和oss_fdw则可以基本上通过SQL语句就可以完成需要的工作。
在PG中,建立外表,根据业务需要,定期不定期的根据一定条件将数据同步到OSS的特定目录。例如现在有一张售卖表,每天都有新的售卖记录,需要每天定时统计今天的业务量:
create TABLE sales(id int, name text, deal_day date);
这里是按天来存储数据,一张外表对应OSS中sales目录下的一个子目录,当需要导入新的一天的数据的时候,只需要执行下面两条SQL:
create FOREIGN TABLE sales_2016_12_13 (id int, name text, deal_day date) SERVER ossserver OPTIONS (dir 'sales/2016_12_13/', delimiter ',', format 'csv');
insert into sales_2016_12_13 select * from sales where deal_day == '2016-12-13';
然后,在GP端,可以提前建立好一张总表:
create TABLE sales(id int, name text, deal_day date) distributed by (id);
同样两条SQL,将数据加载进来:
create READABLE external table sales_2016_12_13 (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com dir=sales/2016-12-13/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
insert into sales select * from sales_2016_12_13;
也可以一次性完整的加载:
create READABLE external table sales_ext (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com prefix=sales/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
insert into sales select * from sales_ext;
注:请注意这里的location定义中的"dir" 换成了"prefix"。
将上面的逻辑整理成一个Python脚本就是:
#! /bin/env python
import time
import json
import psycopg2
import datetime
PREFIX = "sales"
def sync_day():
now_date = datetime.datetime.now()
today = "%s_%s_%s" % (now_date.year, now_date.month, now_date.day)
table_name = "%s_%s" % (PREFIX, today)
oss_dir = "%s/%s/" % (PREFIX, today)
print today
print table_name
print oss_dir
pgsql_conn_string = "host=apsaradbsamplepgsqlinstance.pg.rds.aliyuncs.com port=3569 dbname=postgres password= sample user=sample"
pgsql_create_table = "create FOREIGN TABLE %s (id int, name text, deal_day date) SERVER ossserver OPTIONS (dir '%s', delimiter ',', format 'csv');" % ( table_name, oss_dir)
pgsql_insert = "insert into %s select * from %s where deal_day = '%s';" % (table_name, PREFIX, today)
gpdb_conn_string = "host=apsaradbsamplegpdbinstance.gpdb.rds.aliyuncs.com port=3569 dbname=gpdb user=sample password=sample"
gpdb_create_table = "create READABLE external table %s(id int, name text, deal_day date) location('oss://oss-cn-beijing.aliyuncs.com dir=%s id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';" % (table_name, oss_dir)
gpdb_insert = "insert into %s select * from %s where deal_day = '%s';" % (PREFIX, table_name, today)
print pgsql_create_table
execute(pgsql_create_table, pgsql_conn_string)
print pgsql_insert
execute(pgsql_insert, pgsql_conn_string)
print gpdb_create_table
execute(gpdb_create_table, gpdb_conn_string)
print gpdb_insert
execute(gpdb_insert, gpdb_conn_string)
def execute(sql, conn_string):
conn = None
try:
conn = psycopg2.connect(conn_string)
conn.autocommit = True
cur = conn.cursor()
cur.execute(sql)
except Exception as e:
if conn:
try:
conn.close()
except:
pass
time.sleep(10)
print e
return None
def main():
sync_day()
if __name__ == "__main__":
main()
更完善一些的话,可以:
- 加上一些错误处理
- 加上对OSS的管理部分
数据的分柝与快速分析
这里的“数据的分柝”是指根据原始数据的某个字段或维度,将数据拆分成多份,分开存储与计算。相比要更简单一些,将上面的例子中稍做修改即可。比如,在PG中,将数据依据条件导出到不同的OSS目录:
create table sales(id int, name text, deal_day date, site_id integer);
create FOREIGN TABLE sales_site_0 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_0/', delimiter ',', format 'csv');
create FOREIGN TABLE sales_site_1 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_1/', delimiter ',', format 'csv');
create FOREIGN TABLE sales_site_2 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_2/', delimiter ',', format 'csv');
insert into sales_site_0 select * from sales where site_id = 0;
insert into sales_site_1 select * from sales where site_id = 1;
insert into sales_site_2 select * from sales where site_id = 2;
导入到OSS中的目录结构是:
sales
--> site_0
--> file1
--> file2
--> file3
--> site_1
--> file1
--> file2
--> site_2
--> file1
--> file2
--> file3
那么,可以将site_0、site_1、site_2分别导入不同的GP实例,或同一个GP实例的不同表中,以达到数据拆分的目的。
当数据量不是特别大的时候,也可以通过下面语句直接进行分析而不必将数据完全导入,以实现快速的检查性分析:
select * from sales_2016_12_13 where deal_day != '2016-12-13';
请注意:这种快速分析的方式,会先将所有数据加载到内存进行计算,不适合大于总内存数据量的场景。
不同PG实例中相同表的数据合并
在了解了“增量同步”的部分之后,数据的合并也是比较简单的。这里的“数据合并”是指:多个不同的PG实例中,具有相同的表结构的数据,最后需要汇总进行分析、计算,比如分库分表。
那么这个时候,只需将不同的PG实例中的数据写入到同一个OSS目录中、然后在GP中递归加载(prefix参数)上一层目录即可。这里需要注意的有以下几点:
- 不同的PG实例中的表结构需要相同,不然GP在加载的时候会报错(某些文件会报字段不存在、或字段多了)
- 写入同一个OSS目录中,可以继续分层级,但层级不宜过多
总结
从上面的例子中可以看出:HybridDB + OSS + PostgreSQL,可以有效覆盖OLTP和OLAP一体化的场景。当前在使用上,需要用户做一些自行的定义,同时也提供了足够的灵活度。而在后面,我们可能会进一步进行一体化打造,比如PG到GP的逻辑复制、MySQL到GP的逻辑复制等。让我们在使用体验上更加顺滑。
很荣幸的,见到了很多客户对我们的认可与信赖,甚至有一些客户不断催促我们尽快的商业化,因为他们的业务也要跟着正式上线。这是一份沉甸甸的信任,同时也让我们心存忐忑与敬畏。