Overview and data format requirements
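In graph database scenarios, some customers first write their data to another database and then sync it into a graph database for graph queries.
PolarDB for PostgreSQL (PolarDB-PG for short) is built to manage many kinds of workloads in a single database, and it encourages users to keep multi-model, heterogeneous data together in PolarDB-PG for processing. Its multi-model capabilities can also be used on their own.
Using a MySQL data source as an example, this article walks through the whole process of setting up a real-time sync pipeline with a DTS task that replicates data written to MySQL into a graph database managed by PolarDB-PG.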
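We require that the vertex and edge data being written each have an id column that is unique within its type (label) and less than 2^48. Besides this unique id, edge data has two more columns that hold the ids of the edge's start and end vertices.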
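The 2^48 limit presumably follows from how Apache AGE encodes a graphid: the label's internal id occupies the high bits and the per-label entry id the low 48 bits, which is also what the age_name_to_idx_start helper later in this article computes with << 48. As a worked example, assuming label_a receives internal label id 3 (consistent with the ids in the test output):

```latex
\mathrm{graphid} = \mathrm{label\_id} \cdot 2^{48} + \mathrm{id}, \qquad 0 \le \mathrm{id} < 2^{48} = 281474976710656

3 \cdot 2^{48} + 1 = 844424930131968 + 1 = 844424930131969
```

An id of 2^48 or more would spill into the label bits and produce a graphid belonging to a different label.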
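For vertices and edges that have no unique id, or whose unique id is not an integer, you can usually add a serial column to act as the unique id. Without a unique id, updates and deletes cannot be synced. The values of this column can be generated automatically by the serial mechanism instead of being inserted by hand, and you can choose not to include the column in the graph.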
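A minimal MySQL sketch of adding such a surrogate id, using a hypothetical table raw_d and a hypothetical column sync_id. MySQL's SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE; the sketch uses a signed BIGINT instead, on the assumption that this keeps the type DTS creates on the PolarDB-PG side compatible with the bigint parameter of the generated _sync_..._row_to_id functions. MySQL allows only one AUTO_INCREMENT column per table.

```sql
-- Hypothetical source table without a usable integer id
CREATE TABLE raw_d(name text, `desc` text);

-- Existing rows are numbered automatically when the column is added;
-- AUTO_INCREMENT requires the column to be indexed, hence UNIQUE
ALTER TABLE raw_d ADD COLUMN sync_id BIGINT NOT NULL AUTO_INCREMENT UNIQUE;

-- New rows receive a sync_id without it being set explicitly
INSERT INTO raw_d(name, `desc`) VALUES ('x', 'y');
```

When building the triggers later, sync_id would then be passed as the table_id_col argument for raw_d.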
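Sample scenario
We provision a PolarDB-MySQL instance and a PolarDB-PG instance. New data is written to tables on the PolarDB-MySQL instance and synced by a DTS task into the graph database in PolarDB-PG.
Suppose the graph data to be synced consists of three parts: two vertex tables, A and B, which record the vertices of the graph and each have a unique id; and one edge table, C, which records for each edge the unique id of its start vertex in A and of its end vertex in B. In PolarDB-PG we want to create a graph named gra that contains two vertex types, A and B, and one edge type, C.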
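The tables are defined in PolarDB-MySQL as follows (id is declared as the primary key so that it is guaranteed to be unique):
CREATE TABLE raw_a(id integer PRIMARY KEY, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_b(id integer PRIMARY KEY, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_c(id integer PRIMARY KEY, id_a integer, id_b integer);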
For a MySQL source, binlog must be enabled before DTS can be used.
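One way to check the binlog settings on the source is to query the server variables. The target values in the comments reflect the usual DTS requirements for MySQL sources (row-based binlog with full row images); treat them as assumptions and confirm them against the DTS documentation for your instance type. On PolarDB-MySQL, binlog is typically switched on through the loose_polar_log_bin cluster parameter in the console rather than in a configuration file.

```sql
-- Run on the MySQL source
SHOW VARIABLES LIKE 'log_bin';           -- DTS expects ON
SHOW VARIABLES LIKE 'binlog_format';     -- DTS expects ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- DTS expects FULL
```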
Next, create the graph in the target database. First, create the extension:
CREATE EXTENSION age;
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
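To confirm that the extension is installed and that the new database defaults are in effect, something like the following can be run. Settings made with ALTER DATABASE ... SET only apply to new sessions, so reconnect before running the SHOW commands.

```sql
-- Is AGE installed, and which version?
SELECT extname, extversion FROM pg_extension WHERE extname = 'age';

-- In a new session, ag_catalog should be on the search path
-- and age should be listed as a preloaded library
SHOW search_path;
SHOW session_preload_libraries;
```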
Then create the graph and its vertex and edge labels with SQL:
SELECT create_graph('gra');
SELECT create_vlabel('gra', 'label_a');
SELECT create_vlabel('gra', 'label_b');
SELECT create_elabel('gra', 'edge_c');
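The internal id each label receives forms the high bits of every graphid in that label, so it can help to look it up in the AGE catalog. This sketch assumes the ag_graph.graphid column that the helper functions in this article also rely on. For a freshly created graph, the default labels _ag_label_vertex and _ag_label_edge come first, so label_a, label_b and edge_c would typically get ids 3, 4 and 5, which matches the ids in the test output.

```sql
SELECT l.name, l.id, l.kind
FROM ag_catalog.ag_label AS l
JOIN ag_catalog.ag_graph AS g ON l.graph = g.graphid
WHERE g.name = 'gra'
ORDER BY l.id;
```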
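Syncing data to PolarDB-PG with DTS
In the DTS console, go to Data Synchronization and create a task.
On the configuration page, select MySQL as the source database and PostgreSQL as the destination database, and choose "Express Connect, VPN Gateway, or Smart Access Gateway" as the access method for both.
Then fill in the region, VPC CIDR block, cluster endpoint, port, username, and password of your actual instances.
Next, in the object selection step, select the three tables raw_a, raw_b, and raw_c under TABLE in the corresponding database. Confirm the remaining steps and wait for the synchronization to complete.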
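The trigger helpers in the next section look up columns in information_schema by exact, lowercase table name, so it may be worth confirming how DTS created the tables on the PolarDB-PG side before continuing. A minimal check:

```sql
-- Run on the PolarDB-PG target
SELECT table_schema, table_name, column_name, data_type
FROM information_schema.columns
WHERE table_name IN ('raw_a', 'raw_b', 'raw_c')
ORDER BY table_schema, table_name, ordinal_position;
```

If this returns no rows, the tables were probably created with a different letter case or under different names, and the helpers would not find their columns.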
Syncing data into the graph with triggers
First, create the following helper functions:
-- Base graphid of a label: the label's internal id shifted into the high 16 bits
CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint AS
'SELECT id::bigint << 48 FROM ag_catalog.ag_label
  WHERE kind = kind_name AND name = label_name
    AND graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE;

-- Generates the id and properties helpers, the trigger function and the three triggers for a vertex table
CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS bool AS $outer$
DECLARE
  column_names text;
  sql text;
BEGIN
  -- Every column of the source table becomes a property: "val.<column>, ..."
  SELECT string_agg(format('val.%I', column_name), ', ' ORDER BY ordinal_position)
    INTO column_names
    FROM information_schema.columns
   WHERE columns.table_name = build_age_triggers_for_vertex.table_name;

  sql := $$
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS
      'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label || $$'') + id)::text::graphid'
      LANGUAGE SQL;
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_properties(val $$ || table_name || $$) RETURNS agtype AS
      'SELECT row_to_json((select x FROM (select $$ || column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$() RETURNS trigger AS $inner$
    BEGIN
      IF TG_OP = 'INSERT' THEN
        INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties)
        VALUES (_sync_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"),
                _sync_$$ || table_name || $$_row_to_properties(NEW));
        RETURN NEW;
      ELSIF TG_OP = 'UPDATE' THEN
        UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$"
           SET properties = _sync_$$ || table_name || $$_row_to_properties(NEW)
         WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
        RETURN NEW;
      ELSIF TG_OP = 'DELETE' THEN
        DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$"
         WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
        RETURN OLD;
      END IF;
      RETURN NULL;
    END;
    $inner$ LANGUAGE plpgsql;
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_insert AFTER INSERT ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_update AFTER UPDATE ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_delete AFTER DELETE ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_insert;
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_update;
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$ LANGUAGE plpgsql;

-- Same as above for an edge table; the start and end vertex ids are mapped with the vertex tables' id helpers
CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(table_name text, table_id_col text,
                                                       start_table_name text, start_id_col text,
                                                       end_table_name text, end_id_col text,
                                                       graph_name text, graph_label text)
RETURNS bool AS $outer$
DECLARE
  column_names text;
  sql text;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ' ORDER BY ordinal_position)
    INTO column_names
    FROM information_schema.columns
   WHERE columns.table_name = build_age_triggers_for_edge.table_name;

  sql := $$
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS
      'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label || $$'') + id)::text::graphid'
      LANGUAGE SQL;
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$_row_to_properties(val $$ || table_name || $$) RETURNS agtype AS
      'SELECT row_to_json((select x FROM (select $$ || column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
    CREATE OR REPLACE FUNCTION _sync_$$ || table_name || $$() RETURNS trigger AS $inner$
    BEGIN
      IF TG_OP = 'INSERT' THEN
        INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties)
        VALUES (_sync_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"),
                _sync_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"),
                _sync_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"),
                _sync_$$ || table_name || $$_row_to_properties(NEW));
        RETURN NEW;
      ELSIF TG_OP = 'UPDATE' THEN
        UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$"
           SET start_id = _sync_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"),
               end_id = _sync_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"),
               properties = _sync_$$ || table_name || $$_row_to_properties(NEW)
         WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
        RETURN NEW;
      ELSIF TG_OP = 'DELETE' THEN
        DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$"
         WHERE id = _sync_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
        RETURN OLD;
      END IF;
      RETURN NULL;
    END;
    $inner$ LANGUAGE plpgsql;
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_insert AFTER INSERT ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_update AFTER UPDATE ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    CREATE OR REPLACE TRIGGER _sync_$$ || table_name || $$_delete AFTER DELETE ON $$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || table_name || $$();
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_insert;
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_update;
    ALTER TABLE $$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$ LANGUAGE plpgsql;
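To make the dynamic SQL easier to follow, here is roughly what build_age_triggers_for_vertex('raw_a', 'id', 'gra', 'label_a') generates for the raw_a table, with the trigger function and triggers omitted. It is reconstructed by hand from the template above rather than captured from a real run, so details such as identifier quoting may differ slightly. The id function adds the row id to the label's base value from age_name_to_idx_start, and the properties function turns every column of the row into one agtype map.

```sql
-- Maps a raw_a row id to its graphid in label_a
CREATE OR REPLACE FUNCTION _sync_raw_a_row_to_id(id bigint) RETURNS graphid AS
  'SELECT (age_name_to_idx_start(''gra'', ''v'', ''label_a'') + id)::text::graphid'
  LANGUAGE SQL;

-- Converts a whole raw_a row into an agtype property map
CREATE OR REPLACE FUNCTION _sync_raw_a_row_to_properties(val raw_a) RETURNS agtype AS
  'SELECT row_to_json((select x FROM (select val.id, val.name, val."desc", val.time_created) x))::text::agtype'
  LANGUAGE SQL;
```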
Then simply run
SELECT build_age_triggers_for_vertex('raw_a', 'id', 'gra', 'label_a');
SELECT build_age_triggers_for_vertex('raw_b', 'id', 'gra', 'label_b');
SELECT build_age_triggers_for_edge('raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
to build the triggers that sync the tables into the graph. (Note: use lowercase names throughout; the names are case-sensitive.)
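The helpers set the triggers to ENABLE ALWAYS. In PostgreSQL, ordinary triggers do not fire while a session runs with session_replication_role = replica, whereas ALWAYS triggers fire in every mode; this presumably matters because replication tools such as DTS may apply changes in replica mode. A minimal check that the nine triggers exist and are set to ALWAYS (tgenabled = 'A'):

```sql
SELECT tgrelid::regclass AS table_name, tgname, tgenabled
FROM pg_trigger
WHERE tgrelid IN ('raw_a'::regclass, 'raw_b'::regclass, 'raw_c'::regclass)
  AND NOT tgisinternal
ORDER BY 1, 2;
```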
The triggers above only sync incremental changes. For tables that already contain data, run the following after creating the triggers:
INSERT INTO "gra"."label_a" (id, properties)
SELECT _sync_raw_a_row_to_id(raw_a.id), _sync_raw_a_row_to_properties(raw_a) FROM raw_a;
INSERT INTO "gra"."label_b" (id, properties)
SELECT _sync_raw_b_row_to_id(raw_b.id), _sync_raw_b_row_to_properties(raw_b) FROM raw_b;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties)
SELECT _sync_raw_c_row_to_id(raw_c.id), _sync_raw_a_row_to_id(raw_c.id_a), _sync_raw_b_row_to_id(raw_c.id_b), _sync_raw_c_row_to_properties(raw_c) FROM raw_c;
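After the backfill, a rough consistency check is to compare the row count of each synced table with that of its label table. AGE stores each label as an ordinary table in a schema named after the graph, so plain SQL can count them:

```sql
SELECT 'label_a' AS label,
       (SELECT count(*) FROM raw_a)           AS source_rows,
       (SELECT count(*) FROM "gra"."label_a") AS graph_rows
UNION ALL
SELECT 'label_b', (SELECT count(*) FROM raw_b), (SELECT count(*) FROM "gra"."label_b")
UNION ALL
SELECT 'edge_c',  (SELECT count(*) FROM raw_c), (SELECT count(*) FROM "gra"."edge_c");
```

Matching counts do not prove the properties are identical, but a mismatch usually points to rows written while the pipeline was being set up or to a backfill that ran twice.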
Testing and verification
In MySQL, first insert test data into the synced tables:
INSERT INTO raw_a VALUES (1, 1, 1, '2000-01-01');
INSERT INTO raw_b VALUES (1, 1, 1, 1, '2000-01-01');
INSERT INTO raw_c VALUES (1, 1, 1);
Then query the graph in Cypher to confirm that the data was inserted:
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN v $$) as (v agtype);
------
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
{"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex

SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
------
{"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge
The edge properties shown here were produced with the customized _sync_raw_c_row_to_properties described in the notes at the end of this article, which concatenates id_a and id_b into id; with the generated default, the edge would carry id, id_a, and id_b.
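The ids in this output can be decoded into a label id and a row id by reversing the 48-bit split. This is plain bigint arithmetic and does not need AGE; for the three ids above it yields label ids 3, 4 and 5, each with row id 1.

```sql
SELECT v.gid,
       v.gid >> 48                     AS label_id,  -- high bits: internal label id
       v.gid & ((1::bigint << 48) - 1) AS row_id     -- low 48 bits: source row id
FROM (VALUES (844424930131969::bigint),
             (1125899906842625::bigint),
             (1407374883553281::bigint)) AS v(gid);
```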
Next, verify that property updates are synced:
-- On the source database
UPDATE raw_a SET name = '2' WHERE id = 1;
Query the graph again; the vertex property has been updated:
-- In the graph
SELECT * FROM cypher('gra', $$ MATCH (v:label_a {id:1}) RETURN v $$) as (v agtype);
-----
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
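Finally, verify that deletes are synced:
-- On the source database
DELETE FROM raw_c WHERE id = 1;
The edge is deleted from the graph as well, and the query returns no rows:
-- In the graph
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
-----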
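Additional notes and caveats
- Do not write data to the source tables while the sync pipeline is being set up; otherwise that data may not make it into the graph.
- Once the DTS pipeline is in place, do not change the schema of the synced tables (for example, by adding or dropping columns), or later syncing may break. Among other things, the generated _sync_<table name>_row_to_properties functions hard-code the column list that existed when they were built.
- The helper functions above add every column to the graph as a property. To adjust which properties go into the graph, modify the definition of the corresponding _sync_<table name>_row_to_properties function (you can view its SQL definition with, for example, the \df+ command in psql). The generated definition for raw_c looks like this:
CREATE OR REPLACE FUNCTION _sync_raw_c_row_to_properties(val raw_c) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype' LANGUAGE SQL;
To change the included columns or transform their values, edit the select val.id, val.id_a, val.id_b part. For example, to concatenate id_a and id_b, change it to select val.id_a::text || val.id_b::text AS id.
- When writing data, make sure both endpoint vertices of an edge have been inserted before the edge itself; otherwise graph queries may fail because the corresponding vertices cannot be found.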
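Two of these notes can be followed up with plain SQL on the PolarDB-PG side, assuming the table and label names used in this article. The first query lists edges whose start or end vertex is missing from the graph. Besides the insert-ordering issue, this can also happen when a vertex row is deleted while edges still point to it, because the delete trigger removes only the vertex. The second statement is a sketch for refreshing the properties of existing vertices after _sync_raw_a_row_to_properties has been redefined, since the triggers only apply the new definition to rows that change afterwards.

```sql
-- Edges whose start or end vertex is missing from the graph
SELECT e.id, e.start_id, e.end_id
FROM "gra"."edge_c" AS e
LEFT JOIN "gra"."label_a" AS a ON a.id = e.start_id
LEFT JOIN "gra"."label_b" AS b ON b.id = e.end_id
WHERE a.id IS NULL OR b.id IS NULL;

-- Re-apply a redefined properties function to vertices that already exist
UPDATE "gra"."label_a" AS v
SET properties = _sync_raw_a_row_to_properties(r)
FROM raw_a AS r
WHERE v.id = _sync_raw_a_row_to_id(r.id);
```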