1. 环境信息
类型 | 版本/描述 |
docker | 20.10.9 |
Postgresql | 10.6 |
初始化账号密码:postgres/postgres 普通用户:test1/test123 数据库:test_db |
|
flink | 1.13.6 |
2. 安装
step1: 拉取 PostgreSQL 10.6 版本的镜像:
docker pull postgres:10.6
step2:创建并启动 PostgreSQL
容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres
,并将 pgoutput
插件加载到 PostgreSQL
实例中:
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
step3: 查看容器是否创建成功:
docker ps | grep postgres-10.6
3. 配置
step1:docker进去Postgresql数据的容器:
docker exec -it postgres-10.6 bash
step2:编辑postgresql.conf
配置文件:
vi /var/lib/postgresql/data/postgresql.conf
配置内容如下:
# 更改wal日志方式为logical(方式有:minimal、replica 、logical ) wal_level = logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 max_wal_senders = 20 # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用) wal_sender_timeout = 180s
step3:重启容器:
docker restart postgres-10.6
连接数据库,如果查询一下语句,返回logical
表示修改成功:
SHOW wal_level;
4. 新建用户并赋权
使用创建容器时的账号密码(postgres/postgres
)登录Postgresql数据库。
先创建数据库和表:
-- 创建数据库 test_db CREATE DATABASE test_db; -- 连接到新创建的数据库 test_db \c test_db -- 创建 t_user 表 CREATE TABLE "public"."t_user" ( "id" int8 NOT NULL, "name" varchar(255), "age" int2, PRIMARY KEY ("id") );
新建用户并且给用户权限:
-- pg新建用户 CREATE USER test1 WITH PASSWORD 'test123'; -- 给用户复制流权限 ALTER ROLE test1 replication; -- 给用户登录数据库权限 GRANT CONNECT ON DATABASE test_db to test1; -- 把当前库public下所有表查询权限赋给用户 GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
5. 发布表
-- 设置发布为true update pg_publication set puballtables=true where pubname is not null; -- 把所有表进行发布 CREATE PUBLICATION dbz_publication FOR ALL TABLES; -- 查询哪些表已经发布 select * from pg_publication_tables;
更改表的复制标识包含更新和删除的值:
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性) ALTER TABLE t_user REPLICA IDENTITY FULL; -- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置) select relreplident from pg_class where relname='t_user';
6. flink sql
-- 源表定义 CREATE TABLE `table_source_pg` ( id BIGINT, name STRING, age INT ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '10.194.183.120', 'port' = '30028', 'username' = 'test1', 'password' = 'test123', 'database-name' = 'test_db', 'schema-name' = 'public', 'table-name' = 't_user', 'decoding.plugin.name' = 'pgoutput' ) -- 目标表表定义 CREATE TABLE `table_sink_mysql` ( id BIGINT, name STRING, age INT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.194.183.120:30306/test', 'username' = 'root', 'password' = 'root', 'table-name' = 't_user_copy' ) -- insert语句 INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)
7. 命令汇总
-- pg新建用户 CREATE USER test1 WITH PASSWORD 'test123'; -- 给用户复制流权限 ALTER ROLE ODPS_ETL replication; -- 给用户数据库权限 GRANT CONNECT ON DATABASE test_db to test1; -- 设置发布开关 update pg_publication set puballtables=true where pubname is not null; -- 把所有表进行发布 CREATE PUBLICATION dbz_publication FOR ALL TABLES; -- 查询哪些表已经发布 select * from pg_publication_tables; -- 给表查询权限 grant select on TABLE aa to ODPS_ETL; -- 给用户读写权限 grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to bd_test; -- 把当前库所有表查询权限赋给用户 GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL; -- 把当前库以后新建的表查询权限赋给用户 alter default privileges in schema public grant select on tables to ODPS_ETL; -- 更改复制标识包含更新和删除之前值 ALTER TABLE test0425 REPLICA IDENTITY FULL; -- 查看复制标识 select relreplident from pg_class where relname='test0425'; -- 查看solt使用情况 SELECT * FROM pg_replication_slots; -- 删除solt SELECT pg_drop_replication_slot('zd_org_goods_solt'); -- 查询用户当前连接数 select usename, count(*) from pg_stat_activity group by usename order by count(*) desc; -- 设置用户最大连接数 alter role odps_etl connection limit 200;
附: