[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: [实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋

概述


企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。

SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务。


Kafka Connect For SelectDB Cloud


Kafka Connect 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以定义 Connectors 来将大量数据迁入迁出Kafka。


SelectDB提供了 Sink Connector 插件,可以将Kafka Topic中的JSON数据保存到SelectDB数据库中。


架构

51.png在业务场景中,通常会通过Debezium Connector将数据库的变更数据实时写入Kafka,或者调用API往Kafka中推送JSON格式数据,使用SelectDB Connector即可将这些数据同步到SelectDB数据库中。


工作原理


Kafka Connector 通过以下过程订阅 Kafka topic 的数据,并将数据 sink 到 SelectDB 中。

52.png

SelectDB Connector 通过内部的 task 一对一或一对多的消费对应 topic partition 的数据。当达到阈值(时间或内存或消息数量)时,connector 会将该批次的 records 生成一个临时文件,并上传至 SelectDB 的对象储存中。


当临时文件数达到 50 个或 connector 向 Kafka 集群预提交已成功消费的 offset 时(默认 10s ),将对象存储中临时文件的通过 Copy-Into 的操作,导入至对应的 table 中。


Exactly-Once


Exactly-Once 语义是指即使在机器或应用出现故障的情况下,也不会重复处理数据或者丢失数据。


Selectdb-kafka-connector 通过 Kafka 集群与 SelectDB 实现 Exactly_Once,具体原理如下:


kafka-connector 在初始化时会主动向 Selectdb 获取当前所在 partition 已提交的 last_offset。


从 kafka 消费数据,只有当前 record 的 offset 大于从 Selectdb 获取的 last_offset 后,才能被正常消费。当消费的 record 达到阈值,会生成一个以 last_offset 命名的临时文件,并将该文件上传至对象存储中。

53.png

在 kafka 调用执行 preCommit 时,会将对象存储中的数据由 copy-into 操作导入至 SelectDB 中,此时 SelectDB会记录已提交成功的 last_offset。

若此时 Kafka-connector 执行 copy-into 失败,则会从 Kafka 中获取当前 partition 上一次执行成功的 offset,继续消费,从而保证数据不丢不重。

60.png

成功执行 copy-into 后,向 kafka 提交记录当前 partition 已成功消费的 offset。


若此时 kafka-connector 意外挂掉,重启该 task 或其他 task 在 kafka 的分区自平衡机制下继续消费该 partition。通过初始化阶段可获取到 SelectDB 中已提交成功的 last_offset,继续消费,直至下一个 preCommit 阶段再向 kafka 提交成功消费的 offset。

60.png

使用场景


环境准备


下载并解压

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties 
bin/kafka-server-start.sh -daemon  config/server.properties

快速同步JSON数据


在业务场景中,Kafka中会存放业务写入的数据流,通常格式为JSON(对象/数组),使用SelectDB Sink Connector可以快速的同步数据到SelectDB数据库中。


配置SelectDB Sink

name=selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
topics=test_topic
selectdb.topic2table.map=test_topic:test_tbl
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=47057
selectdb.query.port=30523
selectdb.user=admin
selectdb.password=password
selectdb.database=test_db
selectdb.cluster=cluster_name
#配置convert
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
#配置死信队列,可选
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.replication.factor=1

启动Kafka Connect

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

使用Debezium数据同步MySQL数据到SelectDB


在很多业务场景中,经常需要从业务数据库中实时同步数据,在这个时候就需要使用数据库的变更数据捕获(Change Data Capture,简称 CDC)机制。


而Debezium是基于Kafka Connect的CDC工具,可以对接 MySQL、PostgreSQL、SQL Server、Oracle、MongoDB 等多种数据库,把数据库的数据持续以统一的格式发送到 Kafka 的Topic中,以供下游Sink端进行实时消费。


这里以MySQL为例


下载Debezium

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

配置Debezium Source

name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=123456
database.server.id=1
# kafka中的该client的唯一标识
database.server.name=test
#需要同步的数据库,默认是同步所有数据库
database.include.list=db
database.history.kafka.bootstrap.servers=localhost:9092
#用于存储数据库表结构变化的 Kafka topic
database.history.kafka.topic=dbhistory
transforms=unwrap
#参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#记录删除事件
transforms.unwrap.delete.handling.mode=rewrite

配置好之后,Kafka中默认的Topic名称格式是 SERVER_NAME.DATABASE_NAME.TABLE_NAME

注:其他Debezium配置可参考


https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties


配置SelectDB Sink

name=selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
topics=test.db.table
selectdb.topic2table.map=test.db.table:test_tbl
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=57338
selectdb.query.port=15392
selectdb.user=admin
selectdb.password=password
selectdb.database=test
selectdb.cluster=cluster_name
#配置convert
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#配置死信队列,可选
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.replication.factor=1

同步到SelectDB时,需要先提前创建好库表。


启动Kafka Connect

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties

启动后,可以观察日志 logs/connect.log 是否启动成功。


使用效果


在调研的使用场景中,使用 kafka 同步上游 JSON 数据。这里数据维持以每秒 10w 条的超高频导入,在 8c16g 的机器上,仅部署单节点 kafka 集群,同时在 topic 中配置 20 个 partition,以 distributed 模式启动 connector。在实际处理过程中,topic 中的总体消息平均积压在 120w 条左右,单个 partition 积压 6w 条消息,表现相当优秀。

666.png

总结


整体来看,Kafka-SelectDB Connector 打通了从 kafka 直接导入数据至 SelectDB 的数据链路,降低了通过 Flink 作为中间数据同步组件的链路复杂度;通过 Exactly once 实现数据的一次性精确导入,确保了数据的准确性;通过以 Kafka 集群作为载体,在超高频的数据导入场景中,性能表现非常优秀。

目录
相关文章
|
4月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
68 0
|
1月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
54 8
|
2月前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
3月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
|
3月前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
324 1
|
3月前
|
消息中间件 算法 Java
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
153 5
|
4月前
|
消息中间件 JSON Kafka
【十九】初学Kafka并实战整合SpringCloudStream进行使用
【十九】初学Kafka并实战整合SpringCloudStream进行使用
85 1
【十九】初学Kafka并实战整合SpringCloudStream进行使用
|
4月前
|
消息中间件 NoSQL Kafka
云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作
该方案描述了一个大数据ETL流程,其中阿里云Kafka消息根据内容触发函数计算(FC)函数,执行针对MongoDB的增、删、改操作。