Apache Doris Binlog Load使用方法及示例

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 下面会介绍一些Apache Doris Binlog Load使用方法及示例。

1. 安装配置 Mysql

  1. 安装Mysql
    快速使用Docker安装配置Mysql,具体参照下面的连接

    如果是在物理机上安装可以参考下面的连接:

    进入 Docker 容器或者物理机上修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,
log_bin=mysql_bin
binlog-format=Row
server-id=1
  1. 然后重启Mysql


systemctl restart mysqld
  1. 创建 Mysql 表


create database demo;
 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `sex` TINYINT(1) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB

2. 安装配置Canal


  1. 解压Canal到指定目录:
tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
  1. 在conf文件夹下新建目录并重命名,作为instance的根目录,目录名你可以自己命名便于识别即可


例如我这里的命名是和我的数据库库名一致:demo

vi conf/demo/instance.properties
  1. 下面给出的是一个我的示例配置:


这里面的参数说明请参考Canal官方文档:QuickStart

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12115
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.220.146.11:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=zhangfeng
canal.instance.dbPassword=zhangfeng800729)(*Q
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=demo\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.topic=
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  1. 启动Canal
sh bin/startup.sh

注意:canal instance user/passwd

1.1.5 版本,在canal.properties里加上这两个配置

canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

默认密码为canal/canal,canal.passwd的密码值可以通过select password(“xxx”) 来获取

  1. 验证是否启动成功


tail -200f logs/demo/demo.log
  1. image.png

3.开始同步数据


3.1 创建Doris目标表


用户需要先在Doris端创建好与Mysql端对应的目标表


Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。


开启Batch Delete的方法可以参考help alter table中的批量删除功能。

CREATE TABLE `doris_mysql_binlog_demo` (
  `id` int NOT NULL,
  `sex` TINYINT(1),
  `name` varchar(20),
  `address` varchar(255) 
) ENGINE=OLAP
UNIQUE KEY(`id`,sex)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`sex`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
-- enable batch delete
ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创建同步作业


3.1.1 Create Sync Job 语法说明


Name: ‘CREATE SYNC JOB’ Description:


数据同步(Sync Job)功能,支持用户提交一个常驻的数据同步作业,通过从指定的远端地址读取Binlog日志,增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。


目前数据同步作业只支持对接Canal,从Canal Server上获取解析好的Binlog数据,导入到Doris内。


用户可通过 SHOW SYNC JOB 查看数据同步作业状态。


语法:

CREATE SYNC [db.]job_name
 (
    channel_desc, 
    channel_desc
    ...
 )
binlog_desc
  1. job_name
    同步作业名称,是作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。


  1. channel_desc作业下的数据通道,用来描述mysql源表到doris目标表的映射关系。语法:


FROM mysql_db.src_tbl INTO des_tbl
 [partitions]
 [columns_mapping]
  1. mysql_db.src_tbl
    指定mysql端的数据库和源表。
  2. des_tbl
    指定doris端的目标表,只支持Unique表,且需开启表的batch delete功能(开启方法请看help alter table的’批量删除功能’)。
  3. partitions
    指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
    示例:
PARTITION(p1, p2, p3)
  1. column_mapping

指定mysql源表和doris目标表的列之间的映射关系。如果不指定,FE会默认源表和目标表的列按顺序一一对应。


不支持 col_name = expr 的形式表示列。
示例:

假设目标表列为(k1, k2, v1),
 改变列k1和k2的顺序
 COLUMNS(k2, k1, v1)
 忽略源数据的第四列
 COLUMNS(k2, k1, v1, dummy_column)
  1. binlog_desc用来描述远端数据源,目前仅支持canal一种。语法:
FROM BINLOG
 (
     "key1" = "value1", 
     "key2" = "value2"
 )
  1. Canal 数据源对应的属性,以canal.为前缀
  1. canal.server.ip: canal server的地址
  2. canal.server.port: canal server的端口
  3. canal.destination: instance的标识
  4. canal.batchSize: 获取的batch大小的最大值,默认8192
  5. canal.username: instance的用户名
  6. canal.password: instance的密码
  7. canal.debug: 可选,设置为true时,会将batch和每一行数据的详细信息都打印出来 Examples:
  1. 简单为 test_dbtest_tbl 创建一个名为 job1 的数据同步作业,连接本地的Canal服务器,对应Mysql源表 mysql_db1.tbl1
CREATE SYNC `test_db`.`job1`
 (
     FROM `mysql_db1`.`tbl1` INTO `test_tbl `
 )
 FROM BINLOG 
 (
     "type" = "canal",
     "canal.server.ip" = "127.0.0.1",
     "canal.server.port" = "11111",
     "canal.destination" = "example",
     "canal.username" = "",
     "canal.password" = ""
 );
  1. test_db 的多张表创建一个名为 job1 的数据同步作业,一一对应多张Mysql源表,并显式的指定列映射。
CREATE SYNC `test_db`.`job1` 
 (
     FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
     FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
 ) 
 FROM BINLOG 
 (
     "type" = "canal", 
     "canal.server.ip" = "xx.xxx.xxx.xx", 
     "canal.server.port" = "12111", 
     "canal.destination" = "example",  
     "canal.username" = "username", 
     "canal.password" = "password"
 );

3.1.2 开始同步mysql表里数据到Doris


注意:

创建同步任务之前,首先要在fe.conf里配置enable_create_sync_job=true,这个默认是false不启用,否则就不能创建同步任务

CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo
) 
FROM BINLOG 
(
    "type" = "canal", 
    "canal.server.ip" = "10.220.146.10", 
    "canal.server.port" = "11111", 
    "canal.destination" = "demo",  
    "canal.username" = "canal", 
    "canal.password" = "canal"
);

3.1.3 查看同步任务


SHOW SYNC JOB from test_2;

image.png


3.1.4 查看表里的数据


select * from doris_mysql_binlog_demo;

image.png


3.1.5 删除数据


我们在Mysql 数据表里删除数据,然后看Doris表里的变化


delete from test_cdc where id in (12,13)

我们在去看Doris表里,id是12,13这两条数据已经被删除


image.png


3.1.6 多表同步


多表同步只需要像下面这样写法就可以了


CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
)




相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
8天前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
7天前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
14天前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
15天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
61 11
|
25天前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
31 1
|
23天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
86 2
|
25天前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
34 3
|
25天前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
34 2
|
26天前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
25天前
|
SQL 运维 分布式计算
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
37 1
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决