SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动

一、概述

Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。

二、准备工作

1. 环境准备

  • JDK 1.8+
  • Maven 3.6+
  • MySQL 数据库
  • Apache Flink 1.12+
  • SpringBoot 2.5+

2. 创建 MySQL 数据库和表

CREATE DATABASE test_db;

USE test_db;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
​

三、集成步骤

1. 引入依赖

在 SpringBoot 项目的 pom.xml 中添加必要的依赖:

<dependencies>
    <!-- Spring Boot Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- Flink Dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <!-- Flink CDC Dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
​

2. 配置 Flink CDC

在 SpringBoot 项目中创建 Flink CDC 配置类:

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkCdcConfig {

    @Bean
    public DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {
        MySQLSource<String> source = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")
            .tableList("test_db.users")
            .username("root")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();

        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }
}
​

3. 创建 Flink 作业

在 SpringBoot 项目中创建 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class FlinkJobRunner implements CommandLineRunner {

    private final StreamExecutionEnvironment env;
    private final DataStreamSource<String> mysqlSource;

    public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {
        this.env = env;
        this.mysqlSource = mysqlSource;
    }

    @Override
    public void run(String... args) throws Exception {
        mysqlSource.print();
        env.execute("Flink CDC Job");
    }
}
​

4. 启动 SpringBoot 应用

运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users 表的变动。

四、验证和测试

1. 插入测试数据

向 MySQL 数据库中插入数据:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
​

2. 验证输出

查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。

五、总结

通过以上步骤,我们在 SpringBoot 项目中集成了 Flink CDC,实现了对 MySQL 数据变动的实时追踪。这种方法可以用于构建高效的实时数据处理和分析系统,适用于各种需要数据实时同步和处理的场景。

思维导图

- SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动
  - 准备工作
    - 环境准备
    - 创建 MySQL 数据库和表
  - 集成步骤
    - 引入依赖
    - 配置 Flink CDC
    - 创建 Flink 作业
    - 启动 SpringBoot 应用
  - 验证和测试
    - 插入测试数据
    - 验证输出
  - 总结
​

通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
188 10
Flink CDC YAML:面向数据集成的 API 设计
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
171 17
|
2月前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
156 6
|
2月前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
4月前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,包括版本兼容性、安全性、性能调优等方面。
241 1
|
15天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue实现的留守儿童爱心网站设计与实现(计算机毕设项目实战+源码+文档)
博主是一位全网粉丝超过100万的CSDN特邀作者、博客专家,专注于Java、Python、PHP等技术领域。提供SpringBoot、Vue、HTML、Uniapp、PHP、Python、NodeJS、爬虫、数据可视化等技术服务,涵盖免费选题、功能设计、开题报告、论文辅导、答辩PPT等。系统采用SpringBoot后端框架和Vue前端框架,确保高效开发与良好用户体验。所有代码由博主亲自开发,并提供全程录音录屏讲解服务,保障学习效果。欢迎点赞、收藏、关注、评论,获取更多精品案例源码。
51 10
|
15天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue实现的家政服务管理平台设计与实现(计算机毕设项目实战+源码+文档)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
35 8
|
15天前
|
JavaScript 搜索推荐 Java
基于SpringBoot+Vue实现的家乡特色推荐系统设计与实现(源码+文档+部署)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
28 8
|
15天前
|
JavaScript NoSQL Java
基于SpringBoot+Vue实现的大学生就业服务平台设计与实现(系统源码+文档+数据库+部署等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
44 6
|
15天前
|
JavaScript Java 测试技术
基于Java+SpringBoot+Vue实现的车辆充电桩系统设计与实现(系统源码+文档+部署讲解等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
39 6