Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL

简介: 本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。

Flink-CDC 项目地址:
https://github.com/ververica/flink-cdc-connectors

本教程的演示基于 Docker 环境,都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

该 Docker Compose 中包含的容器有:

  • MySQL:商品表 products 和 订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders
  • Postgres:物流表 shipments 将存储在该数据库中;
  • Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch;
  • Kibana:用来可视化 ElasticSearch 的数据。

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

注:本教程接下来用到的容器相关的命令也都需要在 docker-compose.yml 所在目录下执行。

1.2 下载 Flink 和所需要的依赖包

  1. 下载 Flink 1.13.2 [1] 并将其解压至目录 flink-1.13.2
  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/

[1] https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz

1.3 准备数据

1.3.1 在 MySQL 数据库中准备数据

  1. 进入 MySQL 容器:

    docker-compose exec mysql mysql -uroot -p123456
  2. 创建数据库和表 productsorders,并插入数据:

    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

1.3.2 在 Postgres 数据库中准备数据

  1. 进入 Postgres 容器:

    docker-compose exec postgres psql -h localhost -U postgres
  2. 创建表 shipments,并插入数据:

    -- PG
    CREATE TABLE shipments (
       shipment_id SERIAL NOT NULL PRIMARY KEY,
       order_id SERIAL NOT NULL,
       origin VARCHAR(255) NOT NULL,
       destination VARCHAR(255) NOT NULL,
       is_arrived BOOLEAN NOT NULL
     );
     ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
     ALTER TABLE public.shipments REPLICA IDENTITY FULL;
     INSERT INTO shipments
     VALUES (default,10001,'Beijing','Shanghai',false),
            (default,10002,'Hangzhou','Shanghai',false),
            (default,10003,'Shanghai','Hangzhou',false);

二、启动 Flink 集群和 Flink SQL CLI

  1. 使用下面的命令跳转至 Flink 目录下:

    cd flink-1.13.2
  2. 使用下面的命令启动 Flink 集群:

    ./bin/start-cluster.sh

    启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

    image-20211117195311158

  3. 使用下面的命令启动 Flink SQL CLI

    ./bin/sql-client.sh

    启动成功后,可以看到如下的页面:

img

三、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔 3 秒做一次 checkpoint。

-- Flink SQL                   Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQLFlink SQL> CREATE TABLE products (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'localhost',    'port' = '3306',    'username' = 'root',    'password' = '123456',    'database-name' = 'mydb',    'table-name' = 'products'  );Flink SQL> CREATE TABLE orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (   'connector' = 'mysql-cdc',   'hostname' = 'localhost',   'port' = '3306',   'username' = 'root',   'password' = '123456',   'database-name' = 'mydb',   'table-name' = 'orders' );Flink SQL> CREATE TABLE shipments (   shipment_id INT,   order_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH (   'connector' = 'postgres-cdc',   'hostname' = 'localhost',   'port' = '5432',   'username' = 'postgres',   'password' = 'postgres',   'database-name' = 'postgres',   'schema-name' = 'public',   'table-name' = 'shipments' );

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中。

-- Flink SQLFlink SQL> CREATE TABLE enriched_orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   product_name STRING,   product_description STRING,   shipment_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (     'connector' = 'elasticsearch-7',     'hosts' = 'http://localhost:9200',     'index' = 'enriched_orders' );

四、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQLFlink SQL> INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id;

启动成功后,可以访问 http://localhost:8081/#/job/running 在 Flink Web UI 上看到正在运行的 Flink Streaming Job,如下图所示:

img

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders

kibana-create-index-pattern

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。

kibana-detailed-orders

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana 中显示的订单数据也将实时更新。

  1. 在 MySQL 的 orders 表中插入一条数据:

    --MySQLINSERT INTO ordersVALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
  2. 在 Postgres 的 shipment 表中插入一条数据:

    --PGINSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);
  3. 在 MySQL 的 orders 表中更新订单的状态:

    --MySQLUPDATE orders SET order_status = true WHERE order_id = 10004;
  4. 在 Postgres 的 shipment 表中更新物流的状态:

    --PGUPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
  5. 在 MYSQL 的 orders 表中删除一条数据:

    --MySQLDELETE FROM orders WHERE order_id = 10004;

    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

    kibana-detailed-orders-changes

五、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

在 Flink 所在目录 flink-1.13.2 下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh

六、总结

在本文中,我们以一个简单的业务场景展示了如何使用 Flink CDC 快速构建 Streaming ETL。希望通过本文,能够帮助读者快速上手 Flink CDC ,也希望 Flink CDC 能满足你的业务需求。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

img


近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
消息中间件 存储 传感器
241 0
|
9月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
186 3
|
9月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
644 2
|
9月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
986 0
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
789 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
10月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1252 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
3月前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
3月前
|
SQL 关系型数据库 MySQL
Mysql数据恢复—Mysql数据库delete删除后数据恢复案例
本地服务器,操作系统为windows server。服务器上部署mysql单实例,innodb引擎,独立表空间。未进行数据库备份,未开启binlog。 人为误操作使用Delete命令删除数据时未添加where子句,导致全表数据被删除。删除后未对该表进行任何操作。需要恢复误删除的数据。 在本案例中的mysql数据库未进行备份,也未开启binlog日志,无法直接还原数据库。
|
3月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
802 152
|
3月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多