干货 | Debezium实现Mysql到Elasticsearch高效实时同步

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 来自Elasticsearch中文社区的问题——MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中?

image.png

链接

logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。

回到问题本身:如果库表里没有相关字段,该如何处理呢?


本文给出相关探讨和解决方案。


1、 binlog认知

1.1 啥是 binlog?

binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;

其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中;

作用主要有:


1)复制:达到master-slave数据一致的目的。

2)数据恢复:通过mysqlbinlog工具恢复数据。

3)增量备份。

1.2 阿里的Canal实现了增量Mysql同步

image.png

一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。


目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。


综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。


2、基于binlog的同步方式

1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/

2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/


由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。


3、Debezium介绍

Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。

特点:


1)简单。无需修改应用程序。可对外提供服务。

2)稳定。持续跟踪每一行的每一处变动。

3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

4、同步架构

image.png

如图,Mysql到ES的同步策略,采取“曲线救国”机制。

步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。

步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。


5、Debezium实现Mysql到ES增删改实时同步

软件版本:


confluent:5.1.2;

Debezium:0.9.2_Final;

Mysql:5.7.x.

Elasticsearch:6.6.1


5.1 Debezium安装

confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。


Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。


MySQL Connector plugin 压缩包的下载地址:https://debezium.io/docs/install/


注意重启一下confluent,以使得Debezium生效。


5.2 Mysql binlog等相关配置。

Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。

核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。


[mysqld]


server-id         = 223344

log_bin           = mysql-bin

binlog_format     = row

binlog_row_image  = full

expire_logs_days  = 10

1

2

3

4

5

6

7

然后,重启一下Mysql以使得binlog生效。


systemctl start mysqld.service

1

5.3 配置connector连接器。

配置confluent路径目录 : /etc

创建文件夹命令 :


mkdir kafka-connect-debezium

1

在mysql2kafka_debezium.json存放connector的配置信息 :


[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json

{

       "name" : "debezium-mysql-source-0223",

       "config":

       {

            "connector.class" : "io.debezium.connector.mysql.MySqlConnector",

            "database.hostname" : "192.168.1.22",

            "database.port" : "3306",

            "database.user" : "root",

            "database.password" : "XXXXXX",

            "database.whitelist" : "kafka_base_db",

            "table.whitlelist" : "accounts",

            "database.server.id" : "223344",

            "database.server.name" : "full",

            "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",

            "database.history.kafka.topic" : "account_topic",

            "include.schema.changes" : "true" ,

            "incrementing.column.name" : "id",

            "database.history.skip.unparseable.ddl" : "true",

            "transforms": "unwrap,changetopic",

            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

            "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",

            "transforms.changetopic.regex":"(.*)",

            "transforms.changetopic.replacement":"$1-smt"

       }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

注意如下配置:


“database.server.id”,对应Mysql中的server-id的配置。

“database.whitelist” : 待同步的Mysql数据库名。

“table.whitlelist” :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic、

“database.server.name”:逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

坑一:transforms相关5行配置作用是写入数据格式转换。

如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。


这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。


格式转换相关原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/


5.4 启动connector

curl -X POST -H "Content-Type:application/json"

--data @mysql2kafka_debezium.json.json

http://192.168.1.22:18083/connectors | jq

1

2

3

5.5 验证写入是否成功。

查看kafka-topic

kafka-topics --list --zookeeper localhost:2181

1

此处会看到写入数据topic的信息。


注意新写入数据topic的格式:database.schema.table-smt 三部分组成。


本示例topic名称:full.kafka_base_db.account-smt。


消费数据验证写入是否正常

./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning


至此,Debezium实现mysql同步kafka完成。


6、kafka-connector实现kafka同步Elasticsearch

6.1、Kafka-connector介绍

见官网:https://docs.confluent.io/current/connect.html


Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。

连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。


6.2、kafka到ES connector同步配置

配置路径:


/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

1

配置内容:


"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"tasks.max": "1",

"topics": "full.kafka_base_db.account-smt",

"key.ignore": "true",

"connection.url": "http://192.168.1.22:9200",

"type.name": "_doc",

"name": "elasticsearch-sink-test"

1

2

3

4

5

6

7

6.3 kafka到ES启动connector

启动命令


confluent load  elasticsearch-sink-test

-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

1

2

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。


  curl -X GET http://localhost:8083/connectors

1

7、坑复盘。

坑2: 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。

排解思路如下:


1)确认消费的topic是否是写入数据的topic;

2)确认同步的过程中没有出错。可以借助connector如下命令查看。

  curl -X GET http://localhost:8083/connectors-xxx/status

1

坑3: Mysql2ES出现日期格式不能识别。

是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。


坑4: kafka2ES,ES没有写入数据。

排解思路:


1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。

2)通过connetor/status排查出错原因,一步步分析。

8、小结

binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。


对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。


推荐大家使用。大家有好的同步方式也欢迎留言讨论交流。


参考:

[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/

[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn

[3] https://juejin.im/post/5b7c036bf265da43506e8cfd

[4] https://debezium.io/docs/connectors/mysql/#configuration

[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc


image.png

相关文章
|
7天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
1月前
|
关系型数据库 MySQL API
MySQL 历史数据迁移到 Elasticsearch
MySQL 历史数据迁移到 Elasticsearch
61 4
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
70 0
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之MySQL到MySOL的批量实时同步该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之任务无法实时同步MySQL到StarRocks中修改的数据,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
96 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
|
3月前
|
数据可视化 Docker 容器
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
这篇文章提供了通过Docker安装Elasticsearch和Kibana的详细过程和图解,包括下载镜像、创建和启动容器、处理可能遇到的启动失败情况(如权限不足和配置文件错误)、测试Elasticsearch和Kibana的连接,以及解决空间不足的问题。文章还特别指出了配置文件中空格的重要性以及环境变量中字母大小写的问题。
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
|
3月前
|
JSON 自然语言处理 数据库
Elasticsearch从入门到项目部署 安装 分词器 索引库操作
这篇文章详细介绍了Elasticsearch的基本概念、倒排索引原理、安装部署、IK分词器的使用,以及如何在Elasticsearch中进行索引库的CRUD操作,旨在帮助读者从入门到项目部署全面掌握Elasticsearch的使用。
|
3月前
|
Ubuntu Oracle Java
如何在 Ubuntu VPS 上安装 Elasticsearch
如何在 Ubuntu VPS 上安装 Elasticsearch
37 0
下一篇
无影云桌面