TiDB实时同步数据到PostgreSQL(一) ---- 搭建kafka集群

简介: TiDB实时同步数据到PostgreSQL的第一篇,主要介绍kafka集群的搭建。

业务系统早期采用LNMP架构,随着业务发展,数据量逐渐增加,主要业务表数据数据量均已过亿,加之产品设计原因,无法界定什么数据是历史数据,无法做冷热分离;业务又比较复杂,经常需要多张大表join查询;由于一些业务的特殊性以及架构原因,分库分表也很困难。几年前将数据库从MySQL迁移到TiDB,由于使用云服务器,又因为种种原因无法大量增加硬件,经常出现个别大用户一个大查询导致数据库雪崩,虽然可以通过设置一些参数限制SQL执行时间,但这样又导致一些大用户连统计分析业务都无法正常完成。也考虑过使用TiFlash,但经过测试,同等硬件、同样的数据,对于我们大部分业务查询,PostgreSQL的性能更胜一筹,本着适合自己的才是最好的这一原则,最终选择将TiDB的数据实时同步到PostgreSQL,TiDB完成业务,由PostgreSQL完成财务、统计分析等查询业务。

TiDB还是非常不错的,通过TiCDC结合canal-json协议将数据库变更数据同步到kafka,再通过自行开发的同步软件消费kafka数据将数据同步到PostgreSQL中。所以万里长征的第一步就是要搭建kafka集群,曾经也想偷懒整个单机kafka,但发现单机kafka极其不稳定,经常莫明其妙就崩了,生产环境中是绝对不能容忍这种情况发生的,所以一定要集群!!!

这里使用三台服务器(192.168.0.1、192.168.0.2、192.168.0.3),分别安装zookeeper跟kafka,有条件的话建议zookeeper与kafka分开安装。这里使用kafka 2.7.0以及kafka自带的zookeeper 3.5.8为例说明kafka集群的搭建过程。

一、下载及解压kafka

(本例假设将kafka安装到/home/test/server/kafka目录下)

wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar zxvf kafka_2.13-2.7.0.tgz

二、配置zookeeper

cd /home/test/server/kafka/conf
cat zookeeper.properties
# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0# Disable the adminserver by default to avoid port conflicts.# Set the port to something non-conflicting if choosing to enable thisadmin.enableServer=false# admin.serverPort=8080tickTime=2000initLimit=10syncLimit=5server.0=192.168.0.1:2888:3888
server.1=192.168.0.2:2888:3888
server.2=192.168.0.3:2888:3888

这里主要设置dataDir、dataLogDir、clientPort、server.0、server.1、server.2几个选项,其他的先用默认值。其余两台服务器的设置相同。

然后分别在三台服务器的/tmp/zookeeper/data目录下创建myid文件做为节点标识(分别对应配置文件中的server.0、server.1、server.2):

echo"0" > /tmp/zookeeper/data/myid #192.168.0.1echo"1" > /tmp/zookeeper/data/myid #192.168.0.2echo"2" > /tmp/zookeeper/data/myid #192.168.0.3

三、启动zookeeper

cd /home/test/server/kafka
#先不带-daemon参数直接启动,观察控制台输出是否有错,服务是否正常启动bin/zookeeper-server-start.sh config/zookeeper.properties
#正常启动无误后,可使用如下命令bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

四、配置kafka

kafka的配置文件有三个,分别是:producer.properties、consumer.properties、server.properties

cat /home/test/server/kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.broker.id=0#三台服务器的id值不能想同,分别取0、1、2# The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.#   FORMAT:#     listeners = listener_name://host_name:port#   EXAMPLE:#     listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.0.1:9092  #本机ip地址# A comma separated list of directories under which to store log fileslog.dirs=/tmp/kafka-logs  #消息队列数据会保存在这里,这里偷懒放在/tmp目录下,生产环境可别这样做!!!# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168#这个值是队列数据保存时间
cat /home/test/server/kafka/config/producer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
cat /home/test/server/kafka/config/consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092

五、启动kafka

cd /home/test/server/kafka
bin/kafka-server-start.sh -daemon config/server.properties

启动完成后可用通过ps -ef检查zookeeper、kafka是否正常运行。


六、kafka集群管理


前台启动broker

bin/kafka-server-start.sh <path>/server.properties

Ctrl + C 关闭

后台启动broker

bin/kafka-server-start.sh -daemon <path>/server.properties

关闭broker

bin/kafka-server-stop.sh


七、kafkaTopic管理


创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topicname

删除topic

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topicname

查询topic列表

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查询topic详情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topicname

修改topic

bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic topicname


八、Kafka Consumer-Groups管理


查询消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname

重设消费者组位移

最早处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
最新处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
某个位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000

删除消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname


九、kafka脚本工具


producer脚本

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
参数含义:
--compression-codec lz4  压缩类型
--request-required-acks all acks的值
--timeout 3000  linger.ms的值
--message-send-max-retries 10   retries的值
--max-partition-memory-bytes batch.size值

consumer脚本

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--consumer-property group.id=old-consumer-group
指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--partition 0

kafka-run-class脚本

kafka-run-class.sh kafka.tools.ConsoleConsumer   就是 kafka-console-consumer.sh
kafka-run-class.sh kafka.tools.ConsoleProducer   就是 kafka-console-producer.sh

获取topic当前消息数

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1

--time -1表示最大位移 --time -2表示最早位移

查询_consumer_offsets

bin/kafka-simple-consumer-shell.sh --topic _consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter"


十、MirrorMaker


跨机房灾备工具

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB


相关文章
|
3月前
|
安全 Oracle 关系型数据库
【赵渝强老师】基于PostgreSQL的MPP集群:Greenplum
Greenplum是基于PostgreSQL的MPP架构分布式数据库,由Master、Segment和Interconnect组成,支持海量数据并行处理。本文介绍其架构及集群安装配置全过程。
233 1
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
411 4
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
463 2
|
8月前
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
724 1
|
9月前
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
9月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
12月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
2770 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
9月前
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
282 1

热门文章

最新文章

推荐镜像

更多