通过开源Flink读写云原生数据仓库AnalyticDB PostgreSQL

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 本文介绍如何通过开源Flink版实时读写云原生数据仓库AnalyticDB PostgreSQL(以下简称ADB PG版,原分析型数据库PostgreSQL版)数据,包括版本限制、网络要求、操作步骤、类型映射和参数支持等。

ADB PG版基于Flink 自定义conenctor支持读取(维表)和写入(结果表)。通过Flink SQL即可实现对ADB PG版的访问。

前提条件

版本要求

Flink 1.11及以上版本

ADBPG 6.0版本;

网络要求

ADBPG实例与Flink实例在同一VPC下;

ADBPG设置白名单,开放对Flink实例的网络访问。


操作步骤

设置ADBPG实例

1、购买6.0版本ADBPG实例,创建账号,并设置白名单:

2、连接数据库,创建待写入目标表、和待查询源数据表:

create table test6(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean);

insert into test6 values(0 ,'b0', 'c0', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(0 ,'b0', 'c0', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(1 ,'b1', 'c1', 41,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(1 ,'b1', 'c1', 41,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(2 ,'b2', 'c2', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(2 ,'b2', 'c2', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(3 ,'b3', 'c3', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(3 ,'b3', 'c3', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(4 ,'b4', 'c4', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(4 ,'b4', 'c4', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(5 ,'b5', 'c5', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(5 ,'b5', 'c5', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(6 ,'b6', 'c6', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(6 ,'b6', 'c6', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(7 ,'b7', 'c7', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(7 ,'b7', 'c7', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(8 ,'b8', 'c8', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(8 ,'b8', 'c8', 40,  50,    60,    70,       80.2,    90.2,     'false');

insert into test6 values(9 ,'b9', 'c9', 40,  50,    60,    70,       80.1,    90.1,     'true');

insert into test6 values(9 ,'b9', 'c9', 40,  50,    60,    70,       80.2,    90.2,     'false');

create table test7(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean, k int,l text,m text,n int,o int, p int, q bigint, r float, s double precision, t boolean);


Flink作业开发

1、创建Flink vvp版实例,要保证Flink实例与ADBPG实例处于同一个VPC下;

2、创建SQL作业

3、作业开发

代码参考:

CREATE TEMPORARY TABLE datagen_source2(

 a INT,

 b VARCHAR,

 c CHAR(15),

 d TINYINT,

 e SMALLINT,

 f INT,

 g BIGINT,

 h FLOAT,

 i DOUBLE,

 j BOOLEAN,

 `proctime` AS PROCTIME()

) with (

 'connector' = 'datagen'

);


CREATE TEMPORARY TABLE adbpg_dim2 (

 a INT,

 b VARCHAR,

 c CHAR(15),

 d TINYINT,

 e SMALLINT,

 f INT,

 g BIGINT,

 h FLOAT,

 i DOUBLE,

 j BOOLEAN

) with (

  'connector' = 'adbpg',

  'password' = 'password',

  'tablename' = 'tablename',

  'username' = 'username',

  'url' = 'jdbc:postgresql://url:port/databasename',

  'maxretrytimes' = '2',

  'connectionmaxactive' = '5',

  'targetschema' = 'public',

  'casesensitive' = '0',

  'retrywaittime' = '200',

  'cache' = 'lru',

  'cacheSize'= '1000000',

  'cacheTTLMs' = '2000000000');


CREATE TEMPORARY TABLE adbpg_sink2(

 a INT,

 b VARCHAR,

 c CHAR(15),

 d TINYINT,

 e SMALLINT,

 f INT,

 g BIGINT,

 h FLOAT,

 i DOUBLE,

 j BOOLEAN,

 k INT,

 l VARCHAR,

 m CHAR(15),

 n TINYINT,

 o SMALLINT,

 p INT,

 q BIGINT,

 r FLOAT,

 s DOUBLE,

 t BOOLEAN

) with (

  'connector' = 'adbpg',

  'password' = 'password',

  'tablename' = 'tablename',

  'username' = 'username',

  'url' = 'jdbc:postgresql://url:port/databasename',

  'maxretrytimes' = '2',

  'batchsize' = '100',

  'connectionmaxactive' = '5',

  'conflictmode' = 'ignore',

  'usecopy' = '0',

  'targetschema' = 'public',

  'exceptionmode' = 'ignore',

  'casesensitive' = '0',

  'writemode' = '0',

  'retrywaittime' = '200'

);


insert into adbpg_sink2 select T.a, T.b, T.c, T.d, T.e, T.f, T.g, T.h, T.i, T.j, H.a, H.b, H.c, H.d, H.e, H.f, H.g, H.h, H.i, H.j FROM datagen_source2 AS T JOIN adbpg_dim2 FOR SYSTEM_TIME AS OF T.proctime AS H ON MOD(T.a, 10) = H.a;

4、上传jar包:

https://adbpg-public.oss-cn-beijing.aliyuncs.com/flink-connector-adbpg-1.11.1-jar-with-dependencies.jar


5、运行上线:

点击验证、运行、上线,观察日志和数据库判断是否有异常,是否成功写入数据库。

13.png

维表参数说明

参数名

参数含义

备注

url

ADBPG连接地址

必填,需要填写内网连接地址。

tableName

ADBPG源表名

必填,填写维表对应的ADBPG数据仓库中的表名。

userName

ADBPG用户名

必填。

password

ADBPG密码

必填。

joinMaxRows

左表一条记录连接右表的最大记录数

非必填,表示在一对多连接时,左表一条记录连接右表的最大记录数(默认值为1024)。在一对多连接的记录数过多时,可能会极大的影响流任务的性能,因此您需要增大Cache的内存(cacheSize限制的是左表key的个数)。

maxRetryTimes

单次SQL失败后重试次数

非必填,实际执行时,可能会因为各种因素造成执行失败,比如网络或者IO不稳定,超时等原因,ADBPG维表支持SQL执行失败后自动重试,用maxRetryTimes参数可以设定重试次数。默认值为3。

connectionMaxActive

连接池最大连接数

非必填,ADBPG维表中内置连接池,设置合理的连接池最大连接数可以兼顾效率和安全性,默认值为5。

retryWaitTime

重试休眠时间

非必填,每次SQL失败重试之间的sleep间隔,单位ms,默认值100

targetSchema

查询的ADBPG schema

非必填,默认值public

caseSensitive

是否大小写敏感

非必填,默认值0,即不敏感;填1可以设置为敏感;

cache

缓存策略

目前分析型数据库PostgreSQL版支持以下三种缓存策略:

  • none(默认值):无缓存。
  • lru:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

cacheSize

设置LRU缓存的最大行数

非必填,默认为10000行

cacheTTLMs

缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。

非必填,单位为毫秒。默认不设置此参数,表示不重新加载维表中的新数据。


结果表参数说明

参数

注释说明

是否必选

备注

type

类型

固定值,为adbpg

url

jdbc连接地址

分析型数据库PostgreSQL版数据库的jdbc连接地址 。

格式为:'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'

其中<yourNetworkAddress>为目标分析型数据库PostgreSQL版数据库的主机地址,<PortId>为连接端口,<yourDatabaseName>为连接的数据库。

示例:url=’jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres‘

tableName

表名

无。

username

账号

无。

password

密码

无。

maxRetryTimes

写入重试次数

默认为3。

useCopy

是否采用copy API写入数据

默认为1,表示采用copy API方式写入;

当取值为0时,代表根据writeMode字段采用其他方式写入数据。

batchSize

一次批量写入的条数

默认值为5000。

exceptionMode

当存在写入过程中出现异常时的处理策略

支持以下两种取值:

1)"ignore": 忽略出现导致写入异常的数据;

2)"strict": 日志记录导致写入异常的数据,然后停止任务;

默认取值为"ignore"

conflictMode

当出现主键冲突或者唯一索引冲突时的处理策略

支持以下三种取值:

1)"ignore": 忽略出现导致主键冲突的数据;

2)"strict": 日志记录导致主键冲突的数据,然后停止任务;

3)"update":当出现主键冲突时更新为新值。

4) "upsert": 以insert on conflict方式处理主键冲突。

默认取值为"ignore"

targetSchema

schema名称

默认值为"public"

writeMode

在useCopy字段基础上,更细分的写入方式

默认值为1,代表采用copy API写入数据;

在useCopy字段为0的场景下,可以设定writeMode字段采用其他写入方式:

writeMode=0 :采用insert方式写入数据;

writeMode=2:采用upsert方式写入数据。

upsert含义见文档

注意采用upsert方式写入时需要设定主键字段,设定主键的方式参考示例语句。

/

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
5月前
|
存储 缓存 Cloud Native
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
|
7天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
480 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
3月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
206 8
Flink实时湖仓,为汽车行业数字化加速!
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
78 1
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
101 3
|
4月前
|
存储 数据采集 OLAP
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
饿了么的实时数仓经历了多个阶段的演进。初期通过实时ETL、报表应用、联动及监控构建基础架构,随后形成了涵盖数据采集、加工和服务的整体数据架构。1.0版本通过日志和Binlog采集数据,但在研发效率和数据一致性方面存在问题。2.0版本通过Dataphin构建流批一体化系统,提升了数据一致性和研发效率,但仍面临新业务适应性等问题。最终,饿了么选择Paimon和StarRocks作为实时湖仓方案,显著降低了存储成本并提高了系统稳定性。未来,将进一步优化带宽瓶颈、小文件问题及权限控制,实现更多场景的应用。
461 7
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
|
5月前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8337 15
畅捷通基于Flink的实时数仓落地实践
|
5月前
|
Cloud Native 安全 调度
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
|
5月前
|
运维 Cloud Native 数据库
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作