点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka (正在更新…)
章节内容
上节我们完成了:
Kafka介绍
ZK的基本环境
Kafka下载解压配置
Kafka启动配置
Kafka启动服务
Kafka启动
上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。
kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties
启动之后,我们可以通过 ps 工具看到:
ps aux | grep kafka
返回结果如下图:
sh脚本使用
topics.sh
kakfa-topics.sh 用于管理主题
查看所有
kafka-topics.sh --list --zookeeper h121.wzk.icu:2181
当前执行返回的是空的,因为我们没有任何主题。
创建主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1
执行结果中,我们可以观察到,已经顺利的完成了。
查看主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1
执行结果中,我们可以观察到,已经顺利的完成了。
删除主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1
新建主题(用于测试)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1
producer.sh
kafka-console-producer.sh 用于生产消息
生成数据
kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092
手动生成一批数据来进行测试:
consumer.sh
kafka-console-consumer.sh 用于消费消息
消费数据
kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test
此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。
从头消费
kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning
从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
Java API
架构图
POM
kafka-clients 是 Apache Kafka 提供的一个 Java 库,用于与 Kafka 进行交互。它是 Kafka 的核心组件之一,提供了对 Kafka 生产者和消费者的实现,使得 Java 应用程序可以方便地将数据发送到 Kafka 主题或从中读取数据。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.2</version> </dependency>
Producer(生产者): 生产者负责将消息发送到 Kafka 的指定主题(Topic)。每条消息由一个键值对(key-value pair)组成,Kafka 会根据键对消息进行分区(Partitioning)。
Consumer(消费者): 消费者从 Kafka 的主题中读取消息。消费者组(Consumer Group)允许多个消费者协调工作,共同处理来自主题的消息。
Topic(主题): Kafka 中的逻辑通道,用于存储消息。每个主题可以有多个分区(Partition),消息在分区内是有序的,但在不同分区间无序。
Partition(分区): 主题的物理分片,允许 Kafka 在分布式环境中扩展性能。每个分区可以有一个或多个副本(Replica),其中一个作为 Leader,其他作为 Follower。
常用配置
bootstrap.servers: Kafka broker 的地址列表,生产者和消费者通过这个地址连接到 Kafka 集群。
key.serializer / value.serializer: 生产者消息的键和值的序列化类。
key.deserializer / value.deserializer: 消费者消息的键和值的反序列化类。
acks: 生产者配置,用于指定 Kafka 对消息确认的级别(0, 1, all)。
enable.auto.commit: 消费者配置,是否自动提交偏移量。默认是 true。
auto.offset.reset: 消费者配置,当消费者组没有初始偏移量或偏移量不存在时,如何处理(earliest, latest, none)。
生产者1测试
public class TestProducer01 { public static void main(String[] args) throws Exception { Map<String, Object> configs = new HashMap<>(); configs.put("bootstrap.servers", "h121.wzk.icu:9092"); configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("acks", "1"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs); ProducerRecord<Integer, String> record = new ProducerRecord<>( "wzk_topic_test", 0, 0, "hello world by java!" ); Future<RecordMetadata> future = producer.send(record); future.get(3_000, TimeUnit.SECONDS); producer.close(); } }
生产者1运行
运行结果如下图:
消费者01运行
public class TestConsumer01 { public static void main(String[] args) throws Exception { Map<String, Object> configs = new HashMap<>(); configs.put("bootstrap.servers", "h121.wzk.icu:9092"); configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); configs.put("group.id", "wzk-test"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs); final List<String> topics = Arrays.asList("wzk_topic_test"); consumer.subscribe(topics, new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { collection.forEach(item -> { System.out.println("剥夺的分区: " + item.partition()); }); } @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { collection.forEach(item -> { System.out.println("接收的分区: " + item.partition()); }); } }); final ConsumerRecords<Integer, String> records = consumer.poll(3_000); final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1"); topic1Iterable.forEach(record -> { System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray())); System.out.println("消息的key:" + record.key()); System.out.println("消息的偏移量:" + record.offset()); System.out.println("消息的分区号:" + record.partition()); System.out.println("消息的序列化key字节数:" + record.serializedKeySize()); System.out.println("消息的序列化value字节数:" + record.serializedValueSize()); System.out.println("消息的时间戳:" + record.timestamp()); System.out.println("消息的时间戳类型:" + record.timestampType()); System.out.println("消息的主题:" + record.topic()); System.out.println("消息的值:" + record.value()); }); consumer.close(); } }
消费者01测试
控制台运行截图如下: