一文带你秒懂 Kafka工作原理!

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Apache Kafka 是一个高吞吐量、低延迟的分布式消息系统,广泛应用于实时数据处理、日志收集和消息队列等领域。它最初由LinkedIn开发,2011年成为Apache项目。Kafka支持消息的发布与订阅,具备高效的消息持久化能力,适用于TB级数据的处理。

思维导图

image.png

kafka简介

Apache Kafka 是一个分布式流处理平台,广泛应用于实时数据处理、日志收集、消息队列等领域。

Kafka 是一个高吞吐量、低延迟的分布式消息系统,它最初由 LinkedIn 开发,并在 2011 年成为 Apache 项目。Kafka 主要用于构建实时数据管道和流应用程序,它能够发布和订阅消息流,持久化消息以便后续处理。

其主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能高吞吐率。

  • 即使在非常廉价的机器上也能做到单机支持每秒100K条消息的传输

  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输,同时支持离线数据处理和实时数据处理

为什么要用消息系统?

Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?

  • 解耦:允许我们独立修改队列两边的处理过程而互不影响。

  • 冗余:有些情况下,我们在处理数据的过程会失败造成数据丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险, 确保你的数据被安全的保存直到你使用完毕。

  • 峰值处理能力:不会因为突发的流量请求导致系统崩溃,消息队列能够使服务顶住突发的访问压力, 有助于解决生产消息和消费消息的处理速度不一致的情况

  • 异步通信:消息队列允许用户把消息放入队列但不立即处理它, 等待后续进行消费处理。

用机油装箱举个例子。

image.png

所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。

引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交给了你做的系统去做用户画像分析。

image.png

按照刚刚前面提到的消息系统的作用,我们知道了消息系统其实就是一个模拟缓存,且仅仅是起到了缓存的作用而并不是真正的缓存,数据仍然是存储在磁盘上面而不是内存。

kafka基础知识

Topic 主题

kafka 学习了数据库里面的设计,在里面设计了topic(主题),这个东西类似于关系型数据库的表。

image.png

此时我需要获取中国移动的数据,那就直接监听 TopicA 即可。

Partition 分区

kafka还有一个概念叫Partition(分区),分区具体在服务器上面表现起初就是一个目录,一个主题下面有多个分区,这些分区会存储到不同的服务器上面,或者说,其实就是在不同的主机上建了不同的目录。这些分区主要的信息就存在了.log文件里面。跟数据库里面的分区差不多,是为了提高性能。

image.png

至于为什么提高了性能,很简单,多个分区多个线程,多个线程并行处理肯定会比单线程好得多

Topic 和 partition 像是 HBASE 里的 table 和 region 的概念,table 只是一个逻辑上的概念,真正存储数据的是 region,这些 region 会分布式地存储在各个服务器上面,对应于kafka,也是一样,Topic 也是逻辑概念,而 partition 就是分布式存储单元。

这个设计是保证了海量数据处理的基础。我们可以对比一下,如果 HDFS 没有 block 的设计,一个 100T 的文件也只能单独放在一个服务器上面,那就直接占满整个服务器了,引入 block后,大文件可以分散存储在不同的服务器上。

注意:

  • 分区会有单点故障问题,所以我们会为每个分区设置副本数

  • 分区的编号是从0开始的

Producer 生产者

往消息系统里面发送数据的就是生产者

image.png

Consumer 消费者

从 kafka 里读取数据的就是消费者

image.png

Message 消息

kafka 里面的我们处理的数据叫做消息

kafka的集群架构

创建一个 TopicA 的主题,3个分区分别存储在不同的服务器,也就是 broker 下面。Topic 是一个逻辑上的概念,并不能直接在图中把 Topic 的相关单元画出

image.png

需要注意:kafka在0.8版本以前是没有副本机制的,所以在面对服务器宕机的突发情况时会丢失数据,所以尽量避免使用这个版本之前的kafka

Replica 副本

kafka 中的 partition 为了保证数据安全,所以每个 partition 可以设置多个副本。

此时我们对分区 0,1,2 分别设置 3 个副本(其实设置两个副本是比较合适的)

image.png

而且其实每个副本都是有角色之分的,它们会选取一个副本作为 leader,而其余的作为follower,我们的生产者在发送数据的时候,是直接发送到 leader partition 里面,然后follower partition 会去 leader 那里自行同步数据,消费者消费数据的时候,也是从leader那去消费数据的。

image.png

Consumer Group 消费者组

我们在消费数据时会在代码里面指定一个 group.id,这个 id 代表的是消费组的名字,而且这个 group.id 就算不设置,系统也会默认设置。

conf.setProperty("group.id","tellYourDream")

我们所熟知的一些消息系统一般来说会这样设计,就是只要有一个消费者去消费了消息系统里面的数据,那么其余所有的消费者都不能再去消费这个数据。可是 kafka 并不是这样,比如现在 consumerA 去消费了一个 topicA 里面的数据。

consumerA:    
    group.id = a
consumerB:    
    group.id = a
consumerC:    
    group.id = b
consumerD:    
    group.id = b

再让 consumerB 也去消费 TopicA 的数据,它是消费不到了,但是我们在 consumerC中重新指定一个另外的 group.id,consumerC 是可以消费到 topicA 的数据的。而consumerD 也是消费不到的,所以在 kafka 中,不同组可有唯一的一个消费者去消费同一主题的数据。

所以消费者组就是让多个消费者并行消费信息而存在的,而且它们不会消费到同一个消息,如下,consumerA,B,C是不会互相干扰的

consumer group:a    
      consumerA    
      consumerB    
      consumerC

image.png

如图,因为前面提到过了消费者会直接和leader建立联系,所以它们分别消费了三个leader,所以一个分区不会让消费者组里面的多个消费者去消费,但是在消费者不饱和的情况下,一个消费者是可以去消费多个分区的数据的。

Controller

熟知一个规律:在大数据分布式文件系统里面,95%的都是主从式的架构,个别是对等式的架构,比如 ElasticSearch。

kafka也是主从式的架构,主节点就叫controller,其余的为从节点,controller是需要和zookeeper 进行配合管理整个kafka集群。

kafka和zookeeper如何配合工作

kafka严重依赖于zookeeper集群(所以之前的zookeeper文章还是有点用的)。所有的broker在启动的时候都会往zookeeper进行注册,目的就是选举出一个controller,这个选举过程非常简单粗暴,就是一个谁先谁当的过程,不涉及什么算法问题。

那成为controller之后要做啥呢,它会监听zookeeper里面的多个目录,例如有一个目录/brokers/,其他从节点往这个目录上注册(就是往这个目录上创建属于自己的子目录而已)自己,这时命名规则一般是它们的id编号,比如/brokers/0,1,2

注册时各个节点必定会暴露自己的主机名,端口号等等的信息,此时controller就要去读取注册上来的从节点的数据(通过监听机制),生成集群的元数据信息,之后把这些信息都分发给其他的服务器,让其他服务器能感知到集群中其它成员的存在。

此时模拟一个场景,我们创建一个主题(其实就是在zookeeper上/topics/topicA这样创建一个目录而已),kafka会把分区方案生成在这个目录中,此时controller就监听到了这一改变,它会去同步这个目录的元信息,然后同样下放给它的从节点,通过这个方法让整个集群都得知这个分区方案,此时从节点就各自创建好目录等待创建分区副本即可。这也是整个集群的管理机制。

kafka的优势

Kafka性能好在什么地方?

顺序写

操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写,如果是机械硬盘,寻址就需要较长的时间。

kafka的设计中,数据其实是存储在磁盘上面,一般来说,会把数据存储在内存上面性能才会好。但是kafka用的是顺序写,追加数据是追加到末尾,磁盘顺序写的性能极高,在磁盘个数一定,转数达到一定的情况下,基本和内存速度一致随机写的话是在文件的某个位置修改数据,性能会较低。

零拷贝

先来看看非零拷贝的情况

image.png

可以看到数据的拷贝从内存拷贝到 kafka 服务进程那块,又拷贝到socket缓存那块,整个过程耗费的时间比较高,kafka 利用了 Linux 的 sendFile 技术(NIO),省去了进程切换和一次数据拷贝,让性能变得更好。

image.png

日志分段存储

Kafka规定了一个分区内的.log文件最大为1G,做这个限制目的是为了方便把.log加载到内存去操作

00000000000000000000.index00000000000000000000.log00000000000000000000.timeindex

00000000000005367851.index00000000000005367851.log00000000000005367851.timeindex

00000000000009936472.index00000000000009936472.log00000000000009936472.timeindex

Kafka的网络设计

kafka的网络设计和Kafka的调优有关,这也是为什么它能支持高并发的原因

image.png

首先客户端发送请求全部会先发送给一个Acceptor,broker里面会存在3个线程(默认是3个),这3个线程都是叫做processor,Acceptor不会对客户端的请求做任何的处理,直接封装成一个个socketChannel发送给这些processor形成一个队列,发送的方式是轮询,就是先给第一个processor发送,然后再给第二个,第三个,然后又回到第一个。消费者线程去消费这些socketChannel时,会获取一个个request请求,这些request请求中就会伴随着数据。

线程池里面默认有8个线程,这些线程是用来处理request的,解析请求,如果request是写请求,就写到磁盘里。读的话返回结果。processor会从response中读取响应数据,然后再返回给客户端。这就是Kafka的网络三层架构。

所以如果我们需要对kafka进行增强调优,增加processor并增加线程池里面的处理线程,就可以达到效果。request和response那一块部分其实就是起到了一个缓存的效果,是考虑到processor们生成请求太快,线程数不够不能及时处理的问题。

所以这就是一个加强版的reactor网络线程模型。

Kafka 安装与配置

安装 Kafka

使用包管理器(如 yum)安装

# 安装 Java
sudo yum install java-1.8.0-openjdk-devel -y

# 下载并解压 Kafka
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

使用 Docker 安装

# 启动 Kafka 和 ZooKeeper
docker-compose up -d

docker-compose.yml 文件内容如下:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

配置 Kafka

kafka 配置文件位于 config/server.properties。主要配置项包括:

  • broker.id:Broker 的唯一标识。
  • listeners:Broker 监听的地址和端口。
  • log.dirs:日志存储的目录。
  • zookeeper.connect:ZooKeeper 集群的地址。
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

启动 Kafka

# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties

Kafka 使用教程

创建主题

使用kafka-topics.sh脚本(在Windows上是kafka-topics.bat)来创建主题。以下是一个基本命令的例子:

kafka-topics.sh --create --topic my-topic-name --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --create: 表示我们想要创建一个新主题。
  • --topic my-topic-name: 指定要创建的主题名称。
  • --bootstrap-server localhost:9092: 指定Kafka broker的地址。
  • --partitions 3: 设置主题的分区数量。分区的数量决定了该主题能够并行处理的消息量。
  • --replication-factor 1: 设置副本因子。这是为了确保即使某些节点失败,数据仍然安全。对于生产环境,推荐设置为大于1的值。

生产消息

使用kafka-console-producer.sh脚本(在Windows上是kafka-console-producer.bat)来启动一个控制台生产者,该生产者可以从标准输入读取消息并将其发送到指定的主题。 下面是一个基本命令的例子:

kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic-name
  • --broker-list localhost:9092: 指定Kafka broker的地址列表。如果你有多个broker,你可以在这里提供它们的地址,用逗号分隔。
  • --topic my-topic-name: 指定要发送消息的目标主题名称。

输入消息后按 Enter 键发送:

Hello Kafka

消费消息

使用kafka-console-consumer.sh脚本(在Windows上是kafka-console-consumer.bat)来启动一个控制台消费者,该消费者可以从指定的主题中读取消息并将其输出到标准输出(即命令行界面)。 下面是一个基本命令的例子:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic-name --from-beginning
  • --bootstrap-server localhost:9092: 指定Kafka broker的地址列表。
  • --topic my-topic-name: 指定要消费消息的目标主题名称。
  • --from-beginning: 这个参数告诉消费者从最早的消息开始读取。如果不加这个参数,默认情况下消费者将只接收从它启动之后发送到主题的新消息。

Java 客户端示例

生产者示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SimpleKafkaProducer {
   

    public static void main(String[] args) {
   
        // 设置生产者配置属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者实例
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
   

            // 构造消息记录
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-name", "key", "Hello, Kafka!");

            // 发送消息并等待结果(同步)
            try {
   
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Message sent to topic:%s partition:%d offset:%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            } catch (ExecutionException | InterruptedException e) {
   
                e.printStackTrace();
            }

            // 或者异步发送消息并提供回调函数
            producer.send(record, new Callback() {
   
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
   
                    if (exception == null) {
   
                        System.out.printf("Message sent to topic:%s partition:%d offset:%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
   
                        exception.printStackTrace();
                    }
                }
            });

            // 强制发送所有消息并关闭生产者
            producer.flush();
        }
    }
}

消费者示例

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
   

    public static void main(String[] args) {
   
        // 设置消费者配置属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker地址
        props.put("group.id", "test-group"); // 消费者组ID
        props.put("enable.auto.commit", "true"); // 自动提交偏移量
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者实例
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
   

            // 订阅主题
            consumer.subscribe(Collections.singletonList("my-topic-name"));

            // 开始循环消费消息
            while (true) {
   
                // 拉取消息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // 遍历记录集中的每条记录
                for (ConsumerRecord<String, String> record : records) {
   
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
相关文章
|
2月前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
8月前
|
消息中间件 存储 负载均衡
kafka底层原理分析
kafka底层原理分析
123 2
|
3月前
|
消息中间件 缓存 分布式计算
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
47 2
|
3月前
|
消息中间件 缓存 大数据
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
63 3
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
248 0
|
5月前
|
消息中间件 Kafka 数据库
深入理解Kafka的数据一致性原理及其与传统数据库的对比
【8月更文挑战第24天】在分布式系统中,确保数据一致性至关重要。传统数据库利用ACID原则保障事务完整性;相比之下,Kafka作为高性能消息队列,采用副本机制与日志结构确保数据一致性。通过同步所有副本上的数据、维护消息顺序以及支持生产者的幂等性操作,Kafka在不牺牲性能的前提下实现了高可用性和数据可靠性。这些特性使Kafka成为处理大规模数据流的理想工具。
117 6
|
5月前
|
消息中间件 存储 SQL
Kafka架构及其原理
Kafka架构及其原理
156 1
|
5月前
|
消息中间件 存储 缓存
这么酷的Kafka,背后的原理了解一下又不会死!
这么酷的Kafka,背后的原理了解一下又不会死!
216 2
|
6月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(五):消息存储
深入理解Kafka核心设计及原理(五):消息存储
179 8
|
6月前
|
消息中间件 存储 Kafka
深入理解Kafka核心设计及原理(四):主题管理
深入理解Kafka核心设计及原理(四):主题管理
96 8

热门文章

最新文章