【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山

简介: 本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。

本文内容来自YashanDB官网,原文内容请见 https://www.yashandb.com/newsinfo/7396983.html?templateId=1718516

概述

本文主要介绍通过flink cdc实现oracle数据实时同步到崖山,支持全量和增量,DML支持新增、修改和删除。

环境

JDK版本:11

Flink版本:1.18.1

flink-sql-connector-oracle-cdc版本:3.1.1

flink-connector-yashandb版本:1.18.1.1

Streampark版本:2.1.4

YMP版本:23.2.1.5

源Oracle版本:11.2.0.2.0

目标YashanDB版本:23.2.2.100

操作步骤

Oracle启用日志归档

Step1:以DBA权限登录Oracle数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

Step2:启用日志归档

-- 确认归档日志是否已开启,未开启则需开启

archive log list;

-- 查看db_recovery_file_dest参数

show parameter db_recovery_file_dest;

-- 设置数据库恢复文件目标大小为10G

alter system set db_recovery_file_dest_size = 10G;

-- 设置数据库恢复文件目标路径

alter system set db_recovery_file_dest = '/u01/app/oracle/fast_recovery_area' scope=spfile;

-- 立即关闭数据库

shutdown immediate;

-- 以mount模式启动数据库

startup mount;

-- 启用数据库归档日志模式

alter database archivelog;

-- 打开数据库,允许用户访问

alter database open;

-- 再次确认归档日志是否已开启

archive log list;

用户赋权

Step1:创建表空间

-- 创建一个名为"logminer_tbs"的表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINERTBS;

 

-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。

GRANT CREATE SESSION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。

-- GRANT SET CONTAINER TO flinkuser;

 

-- 允许"flinkuser"用户查询V$DATABASE视图,该视图包含有关数据库实例的信息。

GRANT SELECT ON V_$DATABASE TO flinkuser;

 

-- 允许"flinkuser"用户执行任何表的闪回操作。

GRANT FLASHBACK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何表的数据。

GRANT SELECT ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。

GRANT SELECT_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何事务。

GRANT SELECT ANY TRANSACTION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。

-- GRANT LOGMINING TO flinkuser;

 

-- 允许"flinkuser"用户创建表。

GRANT CREATE TABLE TO flinkuser;

 

-- 允许"flinkuser"用户锁定任何表。

GRANT LOCK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户修改任何表。

GRANT ALTER ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户创建序列。

GRANT CREATE SEQUENCE TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。

GRANT EXECUTE ON DBMS_LOGMNRD TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOG视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGHISTORY视图,该视图包含有关数据库历史日志文件的信息。

GRANT SELECT ON V$LOGHISTORY TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRLOGS视图,该视图包含有关LogMiner日志文件的信息。

GRANT SELECT ON V$LOGMNRLOGS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRCONTENTS视图,该视图包含LogMiner日志文件的内容。

GRANT SELECT ON V$LOGMNRCONTENTS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRPARAMETERS视图,该视图包含有关LogMiner的参数信息。

GRANT SELECT ON V$LOGMNRPARAMETERS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGFILE视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOGFILE TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVEDLOG视图,该视图包含已归档的数据库日志文件的信息。

GRANT SELECT ON V$ARCHIVEDLOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVE_DESTSTATUS视图,该视图包含有关归档目标状态的信息。

GRANT SELECT ON V$ARCHIVE_DEST_STATUS TO flinkuser;

启用增量日志记录

-- 为数据库启用增强日志记录:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

迁移oracle元数据到YashanDB

可通过崖山迁移平台YMP进行迁移,迁移范围只需选择“元数据迁移1 ”和“元数据迁移2”即可,“数据迁移”不用选。

image2024-7-2_16-27-56

安装flink

Step1:创建flink安装用户

adduser -d /home/flink -m flink

passwd flink

flink

Step2:授权

chown -R flink:flink /data/flink

Step3:设置免密

cd ~

ssh-keygen # 一直按回车,按默认设置创建密钥对

ssh-copy-id flink@192.168.133.18

Step4:解压flink安装包

cd /data/flink

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

Step5:修改flink-conf.yaml配置:

cd /data/flink/flink-1.8.1/conf

vi flink-conf.yaml

1) xxx.bind-host和xxx.bind-address都设置成0.0.0.0

2)taskmanager.numberOfTaskSlots修改为和CPU核数一致:

taskmanager.numberOfTaskSlots: 8

3) 去掉注释并修改checkpoint和savepoints路径配置:

state.checkpoints.dir: file:///data/flink/flink-checkpoints

state.savepoints.dir: file:///data/flink/flink-savepoints

4) 去掉注释并修改classloader.resolve-order配置:

classloader.resolve-order: parent-first

Step6:安装flink-oracle-cdc和flink-connector-yashandb相关的jar包到flink

cp /tmp/flink/flink-sql-connector-oracle-cdc-3.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/ojdbc8-19.3.0.0.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/xdb-19.3.0.0.jar.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/flink-connector-yashandb-1.18.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/yashandb-jdbc-1.7.1.jar /data/flink/flink-1.18.1/lib

Step7:设置环境变量

vi ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar

export FLINK_HOME=/data/flink/flink-1.18.1

export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH

source ~/.bashrc

Step8:启动flink

cd /data/flink/flink-1.8.1/bin

./start-cluster.sh

生成flinksql文件

Step1:解压flinksql生成工具gen-flinksql-1.0-bin.zip

unzip gen-flinksql-1.0-bin.zip

Step2:修改gen-flinksql/conf/jdbc.properties配置文件

source.oracle.url = jdbc:oracle:thin:@//192.168.133.18:1521/xe

source.oracle.user = flinkuser

source.oracle.password = flinkpw

source.oracle.schema = SEARCHUSER #此处为需要同步的源库名

sink.yashandb.url = jdbc:yasdb://192.168.133.18:1688/yashandb

sink.yashandb.user = SEARCHUSER

sink.yashandb.password = yasdb_123

sink.yashandb.schema = SEARCHUSER #此处为需要同步的目标库名

Step3:执行生成flinksql文件命令:

cd gen-flinksql/bin

./gen-flinksql.sh oracle2yashandb /data/flink

执行完成后,会在/data/flink目录生成以schema命名的flink sql文件:SEARCHUSER.sql

安装streampark

Step1:解压streampark安装包

cd /data/flink

tar -zxvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz

Step2:启动streampark

cd /data/flink/apache-streampark_2.12-2.1.4-incubating-bin/bin

./startup.sh

访问地址:http://192.168.133.18/10000

admin/streampark

Step3:配置Flink Home

进入菜单setting - > Flink Home,点击Add New按钮:

image2024-7-2_16-48-48

Step4:配置Flink Cluster

进入菜单setting - > Flink Cluster,点击Add New按钮:

image2024-7-2_16-50-27

创建实时同步任务

Step1:进入菜单Apache Flink -> Application,Add New一个任务,Excution Mode选standalone,然后再选择对应的Flink Version和Flink Cluster,FlinkSQL输入gen-flinksql工具生成的sql内容,最后输入Job Name点submit按钮进行保存;

image2024-7-2_16-55-58

Step2:在任务列表界面Release Job进行job发布,再点Start Job按钮启动同步任务;

image2024-7-2_16-58-8

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
9月前
|
SQL DataX HIVE
【YashanDB知识库】DataX迁移Hive到崖山分布式
本文来自YashanDB官网,介绍通过DataX将Hive数据迁移到YashanDB的实现方法。源环境为Hive 3.1.3,目标为YashanDB 23.2.3.100。文章提供了Hive与YashanDB的建表脚本、数据类型映射及DataX配置示例,包含reader和writer插件参数设置,并通过`datax.py`执行同步任务。内容详尽展示了数据迁移的全流程。
【YashanDB知识库】DataX迁移Hive到崖山分布式
|
9月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
|
9月前
|
Java 数据库连接
【YashanDB知识库】Springboot启动找不到崖山jdbc驱动的问题处理
本文来自YashanDB官网,主要解决SpringBoot应用启动时出现“找不到崖山JDBC驱动”的问题,尽管lib目录下已有yashandb-jdbc-1.6.9.jar文件。错误信息显示`java.lang.ClassNotFoundException: com.yashandb.jdbc.Driver`。解决方案为:通过`which java`等命令定位Java安装路径,将驱动jar包复制到JRE的`lib/ext`目录下,最后重启应用即可。
|
9月前
|
SQL Oracle 关系型数据库
|
9月前
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
9月前
|
JSON 分布式计算 DataX
【YashanDB知识库】使用DataX工具迁移yashan数据到maxcompute
本文介绍使用崖山适配的DataX工具进行数据库迁移的方法,包括单表迁移和批量表迁移。单表迁移需配置json文件并执行同步命令;批量迁移则通过脚本自动化生成json配置文件并完成数据迁移,最后提供数据比对功能验证迁移结果。具体步骤涵盖连接信息配置、表清单获取、json文件生成、数据迁移执行及日志记录,确保数据一致性。相关工具和脚本简化了复杂迁移过程,提升效率。
|
9月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
9月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
本文来自YashanDB官网,介绍如何处理Oracle客户端sql*plus中使用@@调用同级目录SQL脚本的场景。崖山数据库23.2.x.100已支持@@用法,但旧版本可通过Python脚本批量重写SQL文件,将@@替换为绝对路径。文章通过Oracle示例展示了具体用法,并提供Python脚本实现自动化处理,最后调整批处理脚本以适配YashanDB运行环境。
|
4月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
511 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

热门文章

最新文章

推荐镜像

更多