Maxwell - 增量数据同步工具(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Maxwell - 增量数据同步工具

Maxwell - 增量数据同步工具(1)https://developer.aliyun.com/article/1532368

3.3、Maxwell 进程启动

3.3.1、使用命令行参数启动

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
  • user:连接 mysql 的用户(上面我们已经创建了)
  • password:连接 mysql 的用户密码
  • host: mysql 安装的主机名
  • producer:生产者模式(stdout:控制台,kafka:kafka 集群)

查看 maxwell 输出:

我们可以看到,我们的 insert 语句被 maxwell 转为了 json 被输出到了控制台。

3.3.2、定制化配置文件启动

编辑配置文件 config.properties:

log_level=info
 
producer=stdout
kafka.bootstrap.servers=localhost:9092
 
# mysql login info
host=hadoop102
user=maxwell
password=123456

启动 Maxwell:

4、Maxwell 使用

4.1、监控 MySQL 数据并在控制台打印

       上面我们已经演示过了,也就是首先在 mysql 的配置文件中指定需要打印 binlog 的数据库,然后通过命令或者配置文件来开启 Maxwell 进程:

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

接着我们对被监听的数据库的操作信息就会以 json 的格式打印出来:

4.2、监控 MySQL 数据并输出到 Kafka

       监控 MySQL 数据并输出到 Kafka,这种需求是我们实际工作用的最多的,这也是我们离线数仓和实时数仓所必须的。

4.2.1、实例

1)启动 Zookeeper 和 Kafka

2)启动 Maxwell 监控 binlog

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell

注:这里我们制定了 Kafka 集群地址为 hadoop102:9092 ,主题为 maxwell,虽然主题并不存在,但是 kafka 会自动创建我们指定的主题

3)在 hadoop103 消费 maxwell 主题

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

4)在 MySQL 中增加一条数据再删除:

可以看到我们的数据成功被 Kafka 消费,说明业务数据日志成功传输到 Kafka 。

4.2.2、Kafak 主题数据的分区控制

       上面的案例中,我们不论 MySQL 的那个数据库,哪张表,都被保存到了 Kafka 的 maxwell 主题,这样显然很乱。所以这里我们需要解决一下:

       在生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 Kafka 的一个主题,并且这个主题肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,那就变得至关重要,实现步骤如下:

(1)修改 maxwell 的配置文件,定制化启动 maxwell 进程

log_level=info
 
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
 
# mysql login info
host=hadoop102
user=maxwell
password=123456
 
# kafka 主题
kafka_topic=maxwell3
 
# 默认配好的
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
 
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# 常用的按照库分区、按照表分区,这里我们测试按照库分区
producer_partition_by=database

(2)手动创建 Kafka 多分区主题

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3

查看主题信息:

这样,我们不同数据库的操作记录就会保存到 Kafka 不同的分区当中, 分区可以使得集群负载均衡,还可以提高提高读写效率。

4.3、监控 MySQL 指定表输出控制台

(1)运行 maxwell 来监控 mysql 指定表数据更新

监控 test 库下的 staff 表

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout --filter 'exclude: *.*,include:test.staff'

(2)向 staff 表中插入数据,查看 maxwell 控制台输出

(3)向 ws2 表中插入一条数据,查看

注:表 ws2 并不是我们指定监控的表

结果:maxwell 控制台并没有任何输出

注:还可以设置 include.test.* 来表示监控 test 库下的所有表

4.4、监控 MySQL 指定表全量输出到控制台

       Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增以及变化的,但是 Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据来对 MySQL 中的某张表进行数据初始化,也就是我们常说的全量同步。具体操作如下:

       需求:将 test 库下的 staff 表中的数据全量导入到 maxwell 控制台打印。

我们可以在数据库 maxwell (这个数据库是我们初始化 maxwell 元数据库时创建的)中查看元数据表 positions 的内容:

这里的 server_id 是我们在 mysql 的配置文件 /etc/my.cnf 中配置的;这里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 ;这里的 binlog_position 的值为 2802 代表目前已经读到了第 2802 个字节的位置,意思就说我们现在是从这个偏移量之后开始监听的。

注:也可以通过 SQL:show master status; 来查看当前的 binlog 状态。

1. 向元数据表 maxwell.bootstrap 中插入数据

insert into maxwell.bootstrap(database_name,table_name) values('test','staff');

这就相当于告诉 maxwell,下一次启动 maxwell 作业时,就会调用一个初始化作业,把我们指定的这个 test 数据库下的 staff 表进行一次增量同步。

我们可以看到,通过 bootstrap 表插入数据来完成初始化,这样插入的数据是 json 中 type 字段的值是 "bootstrap-insert" 而不是 "insert"。

再次查看 bootstrap 表:

我们可以发现,is_complete 字段的值从默认值 0 变成了 1,inserted_rows 从 0 变成了 6,后面的 started_at 和 completed_at 字段的值也不再为空了。

2. 使用 maxwell-bootstrap 脚本工具

除了上面的直接向 maxwell 元数据库中的 bootstrap 表插入数据,也可以使用 maxwell 提供的 maxwell-bootstrap 脚本,本质其实是一样的,具体看你觉得哪个省事。

bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties

这样,这个初始化作业就算完成了,当我们下次打开 maxwell 进程时,它并不会再次执行这个初始化任务,因为它已经完成过了。

总结

       至此,Maxwell 的学习也已经结束了,待会吧离线数仓项目里的 maxwell 部分完成。总体来说,这个工具还是很好用且简单的,希望有一天自己也可以用屎山堆砌出这么一个自己的框架!

相关文章
|
6月前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
3月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
745 4
|
4月前
|
运维 监控 Unix
运维必看,Linux 远程数据同步工具详解。
运维必看,Linux 远程数据同步工具详解。
|
4月前
|
关系型数据库 MySQL 大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
339 2
|
4月前
|
存储 JSON NoSQL
【redis数据同步】redis-shake数据同步全量+增量
【redis数据同步】redis-shake数据同步全量+增量
|
6月前
|
SQL Oracle 关系型数据库
多环境数据同步(Navicat工具)
多环境数据同步(Navicat工具)
156 0
|
6月前
|
SQL 关系型数据库 MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
4月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
130 1
|
4月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步

热门文章

最新文章