kafka常用指令总结

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/82752988 文章目录kafka指令1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/82752988

文章目录

kafka指令

1.常用指令

1.1 Adding and removing topics

add:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
       --partitions 20 --replication-factor 3 --config x=y

remove:

1.2 Modifying topics

add partition:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
       --partitions 40

add configes:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

To remove a config:

> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x

delete config (不使用):

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

1.3 balancing leadership

 bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

前提是需要加上配置

 auto.leader.rebalance.enable=true

1.4 checking consumer position

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0

1.5 Expanding your cluster(水平扩展集群)

  • –generate: 在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移动到新代理。此选项仅提供了一种方便的方法,可在给定主题和目标代理列表的情况下生成分区重新分配计划。
  • –execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。这可以是由管理员手工制作的自定义重新分配计划,也可以使用–generate选项提供
  • –verify: 在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行中

1.由于该工具接受主题的输入列表作为json文件,因此首先需要确定要移动的主题并创建json文件,如下所示 -

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}

2.一旦json文件准备就绪,使用分区重新分配工具生成候选分配 -

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

3.该工具生成一个候选分配,将所有分区从主题foo1,foo2移动到代理5,6。 但请注意,此时分区移动尚未开始,它只是告诉您当前的分配和建议的新分配。 应保存当前分配,以防您想要回滚它。 新的分配应保存在json文件中(例如expand-cluster-reassignment.json),以便使用–execute选项输入工具,如下所示 -

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

4.检查状态

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

1.6 自定义分区分配和迁移

1.创建分区文件 custom-reassignment.json

cat custom-reassignment.json
{
	"version": 1,
	"partitions": [{
		"topic": "foo1",
		"partition": 0,
		"replicas": [5, 6]
	}, {
		"topic": "foo2",
		"partition": 1,
		"replicas": [2, 3]
	}]
}

2.使用json文件执行分区操作

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

3.确认状态

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

1.7 Increasing replication factor(新增副本数)

逻辑和上两个一致 通过json文件 手动修改副本数据位置

提供json文件:

> cat increase-replication-factor.json
{
	"version": 1,
	"partitions": [{
		"topic": "foo",
		"partition": 0,
		"replicas": [5, 6, 7]
	}]
}

1.8 启动 消费 等指令

1.启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
2.启动kafka

bin/kafka-server-start.sh config/server.properties &

3.停止kafka

bin/kafka-server-stop.sh

4.停止zookeeper

bin/zookeeper-server-stop.sh

5.创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

6.展示topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

7.描述topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

8.生产者:

bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic

9.消费者:

bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin

目录
相关文章
|
8月前
|
消息中间件 Kafka
Kafka 指令
指令持续更新
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
144 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
90 3
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
213 0
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
65 3
|
5月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?

热门文章

最新文章