环境
- Centos 7.9
- JDK 11
- Kafka 版本:kafka_2.12-3.0.1.tgz
- kafka 配置知识(https://doc.knowstreaming.com/study-kafka/6-operation#611-%E4%BC%98%E9%9B%85%E5%85%B3%E6%9C%BA)
安装步骤
1. 准备
1.1. 服务器三台做集群部署
192.168.3.100 master
192.168.3.101 nameNode01
192.168.3.102 nameNode02
1.2. 关闭防火墙
systemctl stop firewalld.service systemctl disable firewalld.service
1.3. 配置hostname
cat << EOF >> /etc/hosts 192.168.3.100 master 192.168.3.101 nameNode01 192.168.3.102 nameNode02 EOF
2. 安装JDK
yum install -y java-1.8.0-openjdk-devel.x86_64
3. 安装和配置Kafka
3.1. 解压文件
#解压文件到当前目录 tar -xvzf kafka_2.12-3.3.1.tgz -C /data
3.2. 初始化相关路径
mkdir /data/kafka mkdir /data/kafka/zk mkdir /data/kafka/zk/data mkdir /data/kafka/zk/logs mkdir /data/kafka/data mkdir /data/kafka/log/ mkdir /data/kafka/log/kafka
3.3. Zookeeper配置&启动(使用的是Kafka 自带的zk)
3.3.1. zookeeper id编号配置
注意每台机器都myid的值不同,master 1, nameNode01 2,nameNode02 3(后续和zooker中的配置需要匹配上)
touch /data/kafka/zk/data/myid echo "1" > /data/kafka/zk/data/myid
3.3.2. zookeeper配置(conf/zookeeper.properties)
#计时周期时间,zookeeper中使用的基本时间单位, 毫秒值. tickTime=2000 #同步限制,该参数配置leader和follower之间发送消息, 请求和应答的最大时间长度. 此时该参数设置为5, 说明时间限制为5倍tickTime, 即10000ms. syncLimit=5 #初始化限制,zookeeper集群中的包含多台server, 其中一台为leader, 集群中其余的server为follower。 initLimit参数配置初始化连接时, follower和leader之间的最长心跳时间. 此时该参数设置为10, 说明时间限制为5倍tickTime, 即10*2000=20000ms=20s. initLimit=10 #最大客户端连接数 maxClientCnxns=1000 #最小会话超时 minSessionTimeout=4000 #最大会话超时 maxSessionTimeout=60000 dataDir=/data/kafka/zk/data dataLogDir=/data/kafka/zk/logs # the port at which the clients will connect clientPort=2181 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false # admin.serverPort=8080 server.1=master:2888:3888 server.2=nameNode01:2888:3888 server.3=nameNode02:2888:3888
3.3.3. zookeeper 启动&验证
注意要进入kafka解压目录,/data/kafka_2.13-3.0.1
#启动命令 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #关闭命令 bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
查看zookeeper 启动是否成功
bin/kafka-run-class.sh org.apache.zookeeper.client.FourLetterWordMain localhost 2181 srvr
部署成功可以看到(可以看到Mode为leader,另外两个机器的状态flower,这种情况下为集群部署成功)
Zookeeper version: 3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT Latency min/avg/max: 0/0.7383/9 Received: 1471 Sent: 1470 Connections: 1 Outstanding: 0 Zxid: 0x20000015a Mode: leader Node count: 151 Proposal sizes last/min/max: 48/36/866
日志存储目录在kafka_2.13-3.0.1/logs,查看启动日志是否正常
tail -f logs/zookeeper.out
4. kafka配置&启动
4.1. kafka配置(conf/server.properties)
注意每台机器都broker.id各不相同,master 1, nameNode01 2,nameNode02 3(需要跟zookeeper中的myid匹配上),listeners=PLAINTEXT://xxx:9092 是外网的访问地址,通过指定IP方式,这里每台机器都需要根据实际的IP地址进行修改
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# #在执行第一次再平衡之前,group协调员将等待更多消费者加入group的时间。 延迟时间越长意味着重新平衡的可能性越小,但是等待处理开始的时间增加 group.initial.rebalance.delay.ms=0 #broker.id是kafka broker的编号,集群里每个broker的id需不同,只要保证不同,可以为任意数字 broker.id=2 #listeners是监听地址,需要提供外网服务的话,要设置本地的IP地址 listeners=PLAINTEXT://192.168.3.100:9092 #kafka数据存储目录 log.dirs=/data/kafka/data #设置Zookeeper集群地址 zookeeper.connect=master:2181,nameNode01:2181,nameNode02:2181 #为新建Topic的默认Partition数量 num.partitions=1 #follow从leader拉取消息进行同步数据的拉取线程数 num.replica.fetchers=1 #kafka日志存放路径 kafka.log4j.dir=/data/log/kafka #是否自动创建topic,当读写数据时,如果topic没有则会自动创建 auto.create.topics.enable=true #启用自动均衡 auto.leader.rebalance.enable=true #broker在关闭时会向controller报告自己关闭,这样controller可以对broker的下线及时做一些操作,比如partition的重新选举、分区副本的关闭、通知其他的broker元数据变动等 controlled.shutdown.enable=true #broker关闭时向contoller发送报告的重试次数 controlled.shutdown.max.retries=3 #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.session.timeout.ms=6000 #ZooKeeper的连接超时时间 zookeeper.connection.timeout.ms =18000 #topic默认分区的replication个数 ,不能大于集群中broker的个数。 default.replication.factor=2 #broker处理消息的最大线程数,一般情况下不需要去修改,默认3 num.network.threads=3 #broker处理磁盘IO的线程数,数值应该大于你的硬盘数 num.io.threads=8 #socket的发送缓冲区,socket的调优参数SO_SNDBUFF socket.send.buffer.bytes=102400 #socket的接受缓冲区,socket的调优参数SO_RCVBUFF socket.receive.buffer.bytes=102400 #socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 socket.request.max.bytes=104857600 #broker启动的过程中会加载此节点上所有topic的log文件,如果数据量非常大会导致加载时间过长,通过修改处理线程数可以加快log的恢复速度。默认1 num.recovery.threads.per.data.dir=1 # topic的offset的备份份数(消费者offset)。建议设置更高的数字保证更高的可用性 offsets.topic.replication.factor=3 #事务日志Topic副本数 transaction.state.log.replication.factor=3 #指定事务日志Topic的ISR中的最小副本数是多少,用于服务min.insync.replicas,默认值为1 transaction.state.log.min.isr=2 #启用删除主题。 如果此配置已关闭,则通过管理工具删除主题将不起作用。删除topic是影响注册在/admin/delete_topics的监听 delete.topic.enable=true #日志达到删除大小的阈值,-1为不限制 log.retention.bytes=-1 #检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求 log.retention.check.interval.ms=300000 #每个日志文件删除之前保存的时间,单位小时 log.retention.hours=12 #topic 分区的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的,这就是段文件(segment file);一个topic的一个分区对应的所有segment文件称为log。这个设置控制着一个segment文件的最大的大小,如果超过了此大小,就会生成一个新的segment文件 log.segment.bytes=1073741824 #这个设置会强制Kafka去新建一个新的log segment文件,即使当前使用的segment文件的大小还没有超过log.segment.bytes log.roll.hours=168 #分区rebalance检查的频率,由控制器触发,默认300 leader.imbalance.check.interval.seconds=300 #每个broker允许的不平衡的leader的百分比。如果每个broker超过了这个百分比,复制控制器会对分区进行重新的平衡。该值以百分比形式指定,默认10 leader.imbalance.per.broker.percentage=10 #日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好,默认134217728 log.cleaner.dedupe.buffer.size=134217728 #对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据,默认86400000 log.cleaner.delete.retention.ms=86400000 #启用日志清理器进程在服务器上运行。使用了cleanup.policy = compact的主题,包括内部offsets主题,都应该启动该选项。如果被禁用的话,这些话题将不会被压缩,并且会不断增长,默认true log.cleaner.enable=true #控制了log compactor进行clean操作的频率。默认情况下,当log的50%以上已被clean时,就不用继续clean了。此配置可以被覆盖。默认0.5 log.cleaner.min.cleanable.ratio=0.5 #用于日志清理的后台线程的数量,默认1 log.cleaner.threads=1 #kafka允许的最大的一个批次的消息大小。 如果这个数字增加,并且有0.10.2版本以下的消费者,那么消费者的提取大小也必须增加,以便他们可以取得这么大的记录批次。在最新的消息格式版本中,记录总是被组合到一个批次以提高效率。 在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于该情况下的单个记录。可以使用主题级别max.message.bytes来设置每个主题,默认1000012 message.max.bytes=1000000 #当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数(必须确认每一个repica的写数据都是成功的)。 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并且生产者使用“all”选项。 这将确保如果大多数副本没有写入生产者则抛出异常。默认1 min.insync.replicas=1 #erver端处理请求时的I/O线程的数量,不要小于磁盘的数量。默认8 num.io.threads=8 #Offsets topic的分区数量(部署后不应更改),默认50 offsets.topic.num.partitions=50 #仅在未设置“listeners”时使用。 使用listeners来代替。 这个端口来监听和接受连接 #port=9092 #为每个分区设置获取的消息的字节数。 这不是绝对最大值,如果第一个非空分区中的第一个record batch大于此值,那么record batch仍将被返回以确保可以进行。 代理接受的最大记录批量大小通过message.max.bytes(broker config)或max.message.bytes(topic config)进行定义,默认1048576 replica.fetch.max.bytes=1048576 #如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了,默认10000 replica.lag.time.max.ms=10000 #指明了是否能够使不在ISR中replicas follower设置用来作为leader,默认aflse unclean.leader.election.enable=false #用于经纪人之间沟通的监听协议 security.inter.broker.protocol=PLAINTEXT #服务器是否允许自动生成broker.id;如果允许则产生的值会交由reserved.broker.max.id审核,默认false broker.id.generation.enable=false
4.2. kafka启动&验证
bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-stop.sh -daemon config/server.properties
cat logs/kafkaServer.out
#创建topic bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic my-topic --replication-factor 3 --partitions 3 #推送数据测试 bin/kafka-verifiable-producer.sh --topic test --bootstrap-server master:9092 --max-messages 1000 #消费指令测试 sh bin/kafka-consumer-perf-test.sh -topic test --bootstrap-server master:9092 --messages 100
Kafka 部署完成