Flink Mysql CDC结合Doris flink connector实现数据实时入库

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Apache doris通过扩展支持通过 Flink 读写 doris 数仓中的数据表,目前 doris 支持 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x目前Flink doris connector目前控制入库通过两个参数:1. sink.batch.size :每多少条写入一次,默认100条2. sink.batch.interval :每个多少秒写入一下,默认1秒这两参数同时起作用,那个条件先到就触发写doris表操作。

Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星


Apache doris通过扩展支持通过 Flink 读写 doris 数仓中的数据表,


目前 doris 支持 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x


目前Flink doris connector目前控制入库通过两个参数:


  1. sink.batch.size  :每多少条写入一次,默认100条


  1. sink.batch.interval :每个多少秒写入一下,默认1秒


这两参数同时起作用,那个条件先到就触发写doris表操作,


注意:


这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置


enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。


Flink Doris Connector 编译


在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。


在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。


针对Flink 1.13.x版本适配问题

<properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可


使用示例


通过flink cdc实现mysql binlog日志数据的消费,然后通过flink doris connector sql实时导入mysql数据到doris表数据中


这个代码已经提交到apache doris的示例代码库里


org.apache.doris.demo.flink.FlinkConnectorMysqlCDCDemo

注意: 由于Flink doris connector jar包不在Maven中央仓库中,需要单独编译并添加到你项目的classpath中。参考Flink doris connector的编译和使用: Flink doris connector


  1. 首先Mysql 要开启 binlog


具体如何打开binlog请自行搜索或到Mysql官方文档查询


  1. 安装Flink,Flink的安装和使用这里不做介绍,只是在开发环境中给出代码示例


  1. 创建Mysql数据库表
CREATE TABLE `test` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
  1. 创建doris表


CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 DUPLICATE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );
  1. 创建Flink Mysql CDC


tEnv.executeSql(
  "CREATE TABLE orders (\n" +
  "  id INT,\n" +
  "  name STRING\n" +
  ") WITH (\n" +
  "  'connector' = 'mysql-cdc',\n" +
  "  'hostname' = 'localhost',\n" +
  "  'port' = '3306',\n" +
  "  'username' = 'root',\n" +
  "  'password' = 'zhangfeng',\n" +
  "  'database-name' = 'demo',\n" +
  "  'table-name' = 'test'\n" +
  ")");
  1. 创建Flink Doris Table 映射表


tEnv.executeSql(
  "CREATE TABLE doris_test_sink (" +
  "id INT," +
  "name STRING" +
  ") " +
  "WITH (\n" +
  "  'connector' = 'doris',\n" +
  "  'fenodes' = '10.220.146.10:8030',\n" +
  "  'table.identifier' = 'test_2.doris_test',\n" +
  "  'sink.batch.size' = '2',\n" +
  "  'username' = 'root',\n" +
  "  'password' = ''\n" +
  ")");
  1. 执行插入操作


tEnv.executeSql("INSERT INTO doris_test_sink select id,name from orders");




相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
24天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
52 9
|
3月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
627 2
Flink CDC:新一代实时数据集成框架
|
3月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
568 14
Flink CDC 在货拉拉的落地与实践
|
19天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
34 1
|
22天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
36 4
|
2月前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
66 3
Mysql(4)—数据库索引
|
28天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
160 1
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
88 2
|
1月前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
122 4
|
16天前
|
运维 关系型数据库 MySQL
安装MySQL8数据库
本文介绍了MySQL的不同版本及其特点,并详细描述了如何通过Yum源安装MySQL 8.4社区版,包括配置Yum源、安装MySQL、启动服务、设置开机自启动、修改root用户密码以及设置远程登录等步骤。最后还提供了测试连接的方法。适用于初学者和运维人员。
125 0