基于PolarDB的图分析:通过DTS将其它数据库的数据表同步到PolarDB的图

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 本文介绍了使用DTS任务将数据从MySQL等数据源实时同步到PolarDB-PG的图数据库中的步骤.

内容简介和数据格式需求

在图数据库的应用场景中,部分客户会选择先将数据写入到其它数据库中,再进一步将数据同步到图数据库中进行图查询。

PolarDB-PostgreSQL兼容版(简称PolarDB-PG)有着强大的一库统管能力,鼓励用户将多模异构的数据统一放在PolarDB-PG中进行处理。同时,也可以单独使用其多模的能力。

本文以MySQL数据源为例,介绍了通过DTS任务将MySQL写入的数据,通过搭建实时同步链路,同步到PolarDB-PG所管理的图数据库的全过程。

这里我们要求我们的写入的节点和边数据有一列在其类型(label)中唯一的,小于2^48的id。而边数据除唯一id之外,还有两列分别指定其起点和终点对应的节点的id。

对于没有唯一id、或者唯一id不为整数类型的节点和边,通常可以添加一列serial类型的列作为唯一id。如果没有唯一id的帮助,我们将无法同步对数据的更改和删除。这列唯一id可以在数据表中使用serial的特性自动生成而不必手动插入,同时可以选择不将此列id加入到图。

样例场景

我们开通一个PolarDB-MySQL实例和一个PolarDB-PG实例。在PolarDB-MySQL实例上,我们将新增的数据写入到表中,通过DTS任务同步到PolarDB-PostgreSQL的图数据库中。

假设我们要同步的图数据由三部分组成:两个点表A和B,记录图上的点,各自有一个唯一id;一个边表C,记录其起点在A中和终点在B中的唯一ID。要在PolarDB-PostgreSQL中建立一张名为gra的图,包含A,B两种类型的节点和C这种类型的边。

在PolarDB-MySQL中的表定义为

CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_C(id integer, id_a integer, id_b integer);

对于MySQL数据库,在使用DTS前,需要开启binlog功能。

之后,需要先在目标数据库库创建图。首先创建插件

CREATE EXTENSION age;
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname>
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';

然后,使用SQL语句创建图、点和边

SELECT create_graph('gra');
SELECT create_vlabel('gra', 'label_a');
SELECT create_vlabel('gra', 'label_b');
SELECT create_elabel('gra', 'edge_c');

通过DTS将数据同步到PolarDB-PG

DTS控制台中,选择数据同步项目,并选择创建任务

在创建界面,选择源库为MySQL,目标库为PostgreSQL,并均选择 “专线/VPN网关/智能网关”方式接入,

然后根据实际的实例地区,vpc网段,集群地址,端口,用户名和密码。

之后在同步对象选择中,选择对应数据库下,TABLE栏目中的raw_A, raw_B, raw_C三张表。之后一直确认,等待同步完成。

通过触发器将数据同步到图

首先创建如下辅助函数:

CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint
AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
language SQL IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_name = build_age_triggers_for_vertex.table_name;
  sql := $$
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::graphid'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_properties(val $$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_insert
AFTER INSERT ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_update
AFTER UPDATE ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_delete
AFTER DELETE ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_insert;
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_update;
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_name = build_age_triggers_for_edge.table_name;
  sql := $$
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::graphid'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_properties(val $$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_insert
AFTER INSERT ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_update
AFTER UPDATE ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_delete
AFTER DELETE ON $$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_insert;
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_update;
ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;

然后只需要执行

select build_age_triggers_for_vertex('raw_a', 'id', 'gra', 'label_a');
select build_age_triggers_for_vertex('raw_b', 'id', 'gra', 'label_b');
select build_age_triggers_for_edge('raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');

即可构建从同步表到图中的触发器。(注意:请统一使用小写,大小写敏感)

上述触发器只能同步增量数据,对于有存量数据的表,需要在创建上述触发器之后执行

INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A;
INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;

测试验证

在MySQL中,先向同步的表中插入测试数据:

INSERT INTO raw_a values(1,1,1,'2000-01-01');
INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
INSERT INTO raw_c values(1,1,1);

然后使用cypher语言进行图查询,验证数据插入成功:

SELECT * FROM cypher('gra', $$
MATCH (v)
RETURN v
$$) as (v agtype);
------
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
 {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
------
 {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge

接着验证修改属性

-- 在源库上
UPDATE raw_a SET name = '2' WHERE id = 1;

在图中使用SQL进行查询,看到边的属性已被修改

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v:label_a {id:1})
RETURN v
$$) as (v agtype);
-----
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex

最后验证删除数据

-- 在源库上
DELETE FROM raw_c WHERE id = 1;

看到图中的数据也同步地删除了

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
-----

补充和注意事项

  1. 在搭建同步链路的过程中不可以写入数据,否则这部分数据会无法导入图。
  2. 在搭建完成DTS链路后,不能再修改同步表的数据结构(如增加、删除列等操作)。否则可能导致后续无法同步。
  3. 上述辅助函数会将全部的列作为属性加入到图的属性中,如果希望调整加入到图中的属性,可以修改形如_sync_<表名>_row_to_properties函数的定义(可以通过psql的\df+命令等方式查看到函数的sql定义),其常规定义如下面所示,需要修改加入的列或对列的值进行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如如果希望将id_a和id_b两列进行拼接,可以改为select val.id_a::text || val.id_b::text AS id
CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
LANGUAGE SQL;
  1. 在写入时,对于一条边,需要确保其两侧的点已经插入,再写入这条边,否则可能造成图数据库在查询时因为找不到对应的点而产生错误
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
11天前
|
SQL 关系型数据库 分布式数据库
基于PolarDB的图分析:银行金融领域图分析实践
本文介绍了如何使用阿里云PolarDB PostgreSQL版及其图数据库引擎(兼容Apache AGE,A Graph Extension)进行图数据分析,特别针对金融交易欺诈检测场景。PolarDB PostgreSQL版支持图数据的高效处理和查询,包括Cypher查询语言的使用。文章详细描述了从数据准备、图结构创建到具体查询示例的过程,展示了如何通过图查询发现欺诈交易的关联关系,计算交易间的Jaccard相似度,从而进行欺诈预警。
基于PolarDB的图分析:银行金融领域图分析实践
|
14天前
|
SQL 关系型数据库 分布式数据库
夺冠在即 | PolarDB数据库创新设计赛(天池杯)决赛答辩通知
2024年全国大学生计算机系统能力大赛PolarDB数据库创新设计赛(天池杯)于8月21日启动,吸引了200多所高校近千支队伍参赛。经过激烈角逐,60支队伍晋级决赛第一阶段,36支队伍脱颖而出进入现场答辩,将于12月29日在武汉大学争夺最终奖项。决赛要求选手基于PolarDB-PG开源代码部署集群并优化TPCH查询性能。完赛率超90%,成绩表现出明显梯度,前20名均在500秒内完成。评委来自学术界和工业界,确保评选公正。预祝选手们取得优异成绩!
|
29天前
|
Cloud Native 关系型数据库 分布式数据库
PolarDB 分布式版 V2.0,安全可靠的集中分布式一体化数据库管理软件
阿里云PolarDB数据库管理软件(分布式版)V2.0 ,安全可靠的集中分布式一体化数据库管理软件。
|
1月前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
131 9
|
11天前
|
NoSQL 关系型数据库 分布式数据库
PolarDB图数据库快速入门
图数据库(Graph Database)专门存储图数据,适合处理社交网络、知识图谱等复杂关系。它使用图查询语言(如Cypher、Gremlin)进行操作。PolarDB兼容OpenCypher语法,支持创建、查询、更新和删除图数据,包括模式匹配、过滤、MERGE避免重复、可视化工具等功能,简化了图数据的管理和应用。
|
3月前
|
关系型数据库 MySQL 分布式数据库
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶,邀请好友完成更有机会获得​小米Watch S3、小米体重称​等诸多好礼!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
|
4月前
|
关系型数据库 MySQL Serverless
探索PolarDB MySQL版:Serverless数据库的灵活性与性能
本文介绍了个人开发者对阿里云PolarDB MySQL版,特别是其Serverless特性的详细评测体验。评测涵盖了产品初体验、性能观测、Serverless特性深度评测及成本效益分析等方面。尽管试用过程中遇到一些小问题,但总体而言,PolarDB MySQL版表现出色,提供了高性能、高可用性和灵活的资源管理,是个人开发者和企业用户的优秀选择。
|
5月前
|
关系型数据库 MySQL 分布式数据库
PolarDB 与传统数据库的性能对比分析
【8月更文第27天】随着云计算技术的发展,越来越多的企业开始将数据管理和存储迁移到云端。阿里云的 PolarDB 作为一款兼容 MySQL 和 PostgreSQL 的关系型数据库服务,提供了高性能、高可用和弹性伸缩的能力。本文将从不同角度对比 PolarDB 与本地部署的传统数据库(如 MySQL、PostgreSQL)在性能上的差异。
354 1
|
2月前
|
关系型数据库 分布式数据库 数据库
锦鲤附体 | PolarDB数据库创新设计赛,好礼不停!
锦鲤附体 | PolarDB数据库创新设计赛,好礼不停!

相关产品

  • 云原生数据库 PolarDB