Kafka

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: kafka是什么 kafka是采用scala语言开发的一个 多分区 、 多副本 且 基于zookeeper协调的 分布式 消息系统。 kafka是 高吞吐、可持久化、可水平扩展、支持流数据等多种特性的分布式流式处理平台 kafka扮演的三大角色:消息系统、存储系统、流式处理平台.

kafka是什么

  • kafka是采用scala语言开发的一个 多分区 、 多副本 且 基于zookeeper协调的 分布式 消息系统。
  • kafka是 高吞吐、可持久化、可水平扩展、支持流数据等多种特性的分布式流式处理平台
  • kafka扮演的三大角色:消息系统、存储系统、流式处理平台

基本概念

  • Producer:
  • Consumer:
  • broker:

docker安装kafka

  • 安装docker-compose
sudo curl -L https://github.com/docker/compose/releases/download/1.16.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose

#daocloud镜像源
#curl -L https://get.daocloud.io/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

chmod +x /usr/local/bin/docker-compose
docker-compose --version

下载zookeeper和kafka镜像

docker pull wurstmeister/kafka:2.12-2.2.0
docker pull wurstmeister/zookeeper:3.4.6

启动zookeeper集群和kafka集群

docker-compose.yml的内容如下

version: '3.1'

services:
  zoo1:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo1
    container_name: zoo1
    #domainname: 
    ports:
      - 2181:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo1/data:/data
      - /usr/local/docker_app/zookeeper/zoo1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo2/data:/data
      - /usr/local/docker_app/zookeeper/zoo2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo3/data:/data
      - /usr/local/docker_app/zookeeper/zoo3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888


  kafka1:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 1
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka1
    hostname: kafka1

  kafka2:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 2
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka2
    hostname: kafka2


  kafka3:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9094:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 3
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka3
    hostname: kafka3

启动集群

#启动集群
docker-compose -f docker-compose.yml up -d

#查看启动状态
docker-compose -f docker-compose.yml ps

如下图
image

测试集群

docker exec -it kafka1 /bin/bash 
kafka-topics.sh -zookeeper zoo1:2181 --create --topic topic-demo --replication-factor 1 --partitions 2
kafka-topics.sh -zookeeper zoo1:2181 --describe --topic topic-demo
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic-demo

docker exec -it kafka1 /bin/bash
kafka-console-producer.sh --broker-list kafka1:9092 --topic topic-demo
>hello
>hello kafka

image
image

使用java客户端连接kafka

  • spring boot项目,pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • java代码
package com.example.demo;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaApplication {
    public static final String brokerList = "kafka1:9092";

    public static final String topic = "topic-demo";
    
    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
        
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello, renchenglin");
        
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        producer.close();
    }
    

}

遇到的坑

在启动kafka时,
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
配置为
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
时,在安装docker的虚拟机上(IP为192.168.31.109)可以正常测试,在宿主机(安装docker的虚拟机的物理机,IP为192.168.31.201)上使用java程序无法访问

后来改为
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
时仍然无法访问,最终在物理机上配置上配置DNS
C:WindowsSystem32driversetchosts 文件中追加如下配置OK

192.168.31.109 kafka1
192.168.31.109 kafka2
192.168.31.109 kafka3

kafka sasl


touch kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};



touch kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};



更改server.properties配置文件:
listeners=SASL_PLAINTEXT://localhost:9092
advertised.host.name=kafka1
advertised.listeners=SASL_PLAINTEXT://192.168.31.109:9092
zookeeper.connect=192.168.31.109:2181
# 使用的认证协议 
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制 
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN   
# 完成身份验证的类 
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
super.users=User:admin


在kafka-console-consumer.sh和kafka-console-producer.sh中添加:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN


touch kafka_zoo_jaas.conf
ZKServer{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};


kafka-server-start.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"


kafka-console-consumer.sh和kafka-console-producer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"



bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)

bin/kafka-server-start.sh config/server.properties &


bin/kafka-console-producer.sh --broker-list 192.168.31.109:9092 --topic test --producer.config config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.109:9092 --topic test --from-beginning --consumer.config config/consumer.properties

相关文章
|
消息中间件 存储 Kafka
Kafka详解
当今数字化世界中,数据的流动变得至关重要。为了满足不断增长的数据需求,企业需要强大而可靠的数据处理工具。Apache Kafka就是这样一个工具,它在数据流处理领域表现出色。本文将详细介绍Apache Kafka,探讨它的核心概念、用途以及如何使用它来构建强大的数据流应用。
|
3月前
|
消息中间件 存储 缓存
kafka(一)
kafka(一)
|
3月前
|
消息中间件 存储 算法
kafka(二)
kafka(二)
|
4月前
|
消息中间件 Kafka
kafka里的acks是什么
【8月更文挑战第3天】kafka里的acks是什么
211 0
|
7月前
|
消息中间件 分布式计算 Java
|
7月前
|
消息中间件 存储 分布式计算
kafka 详细介绍
kafka 详细介绍
|
7月前
|
消息中间件 存储 Java
玩转Kafka—初步使用
玩转Kafka—初步使用
53 0
|
消息中间件 缓存 算法
Kafka为什么这么快?
Kafka 是一个基于发布-订阅模式的消息系统,它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率,即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢?本文将从七个方面来分析 Kafka 的速度优势。
80 1
|
消息中间件 开发框架 Java
113 Kafka介绍
113 Kafka介绍
86 0
|
消息中间件 缓存 Java
Kafka介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。 Kafka是一种高吞吐量的分布式发布订阅消息系统,作为消息中间件来说都起到了系统间解耦、异步、削峰等作用,同时又提供了Kafka streaming插件包在应用端实现实时在线流处理,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息
172 0