SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动
一、概述
Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。
二、准备工作
1. 环境准备
- JDK 1.8+
- Maven 3.6+
- MySQL 数据库
- Apache Flink 1.12+
- SpringBoot 2.5+
2. 创建 MySQL 数据库和表
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
三、集成步骤
1. 引入依赖
在 SpringBoot 项目的 pom.xml
中添加必要的依赖:
<dependencies>
<!-- Spring Boot Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Flink Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<!-- Flink CDC Dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
2. 配置 Flink CDC
在 SpringBoot 项目中创建 Flink CDC 配置类:
import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlinkCdcConfig {
@Bean
public DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {
MySQLSource<String> source = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test_db")
.tableList("test_db.users")
.username("root")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
}
}
3. 创建 Flink 作业
在 SpringBoot 项目中创建 Flink 作业:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class FlinkJobRunner implements CommandLineRunner {
private final StreamExecutionEnvironment env;
private final DataStreamSource<String> mysqlSource;
public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {
this.env = env;
this.mysqlSource = mysqlSource;
}
@Override
public void run(String... args) throws Exception {
mysqlSource.print();
env.execute("Flink CDC Job");
}
}
4. 启动 SpringBoot 应用
运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users
表的变动。
四、验证和测试
1. 插入测试数据
向 MySQL 数据库中插入数据:
INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
2. 验证输出
查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。
五、总结
通过以上步骤,我们在 SpringBoot 项目中集成了 Flink CDC,实现了对 MySQL 数据变动的实时追踪。这种方法可以用于构建高效的实时数据处理和分析系统,适用于各种需要数据实时同步和处理的场景。
思维导图
- SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动
- 准备工作
- 环境准备
- 创建 MySQL 数据库和表
- 集成步骤
- 引入依赖
- 配置 Flink CDC
- 创建 Flink 作业
- 启动 SpringBoot 应用
- 验证和测试
- 插入测试数据
- 验证输出
- 总结
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。