1、下载
wget https://mirror-hk.koddos.net/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
2、解压
tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0
配置文件:
broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,新版没有找到
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
3、启动KAFKA环境
# Start the ZooKeeper service# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话并运行:
# Start the Kafka broker service$ bin/kafka-server-start.sh config/server.properties
将一些事件写入主题
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event
将一些事件写入主题
写
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event
读
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
添加配置
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
删除配置
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
删除topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
Kafka群集将自动检测任何代理关闭或故障,并为该计算机上的分区选择新的领导者。无论服务器发生故障还是为了维护或配置更改而有意将其关闭,都会发生这种情况。对于后一种情况,Kafka支持一种更优雅的机制来停止服务器,而不仅仅是杀死服务器。当服务器正常停止时,它具有两个优化功能,它将利用以下优势:
- 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即,验证日志尾部所有消息的校验和)。日志恢复需要时间,因此可以加快有意重启的速度。
- 在关闭之前,它将把服务器所领导的所有分区迁移到其他副本。这将使领导者转移更快,并将每个分区不可用的时间减少到几毫秒。
只要服务器停止运行(不是通过强行终止),就将自动同步日志,但是受控的领导者迁移需要使用特殊的设置:
controlled.shutdown.enable=true
请注意,只有在代理上托管的所有分区都具有副本(即,复制因子大于1并且这些副本中至少有一个处于活动状态)时,受控关闭才会成功。这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。
每当代理停止或崩溃时,该代理分区的领导权就会转移到其他副本。重新启动代理后,它将仅是其所有分区的关注者,这意味着它将不用于客户端读取和写入。
为了避免这种不平衡,Kafka提出了首选副本的概念。如果分区的副本列表为1,5,9,则首选节点1作为节点5或9的引导者,因为它在副本列表中较早。默认情况下,Kafka集群将尝试恢复对已还原副本的领导权。此行为配置有:
auto.leader.rebalance.enable=true
您也可以将其设置为false,但是随后您需要通过运行以下命令来手动将领导力还原到已还原的副本中:
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_ho
有时查看消费者的位置很有用。我们有一个工具,可以显示所有使用者在使用者组中的位置以及他们在日志末尾之后的位置。要在名为my-group的使用者组上运行此工具并消耗名为my-topic的主题,该工具将如下所示:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1 my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1 my-topic