通过 KoP 将 Kafka 应用迁移到 Pulsar
版权声明:原文出自 https://github.com/streamnative/kop ,由 Redisant 进行整理和翻译
[TOC]
什么是 KoP
KoP(Pulsar on Kafka)通过在 Pulsar Broker 上引入 Kafka 协议处理程序,为 Apache Pulsar 带来原生 Apache Kafka 协议支持。 通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,而无需修改代码。 这使 Kafka 应用程序能够利用 Pulsar 的强大功能,例如:
- 通过企业级多租户简化运营
- 使用rebalance-free架构简化操作
- 使用 Apache BookKeeper 分层存储
- 使用 Pulsar Functions 进行Serverless事件处理
KoP 作为 Pulsar 协议处理插件,在 Pulsar broker 启动时加载。 它通过在 Apache Pulsar 上提供原生 Kafka 协议支持,帮助减少人们采用 Pulsar 实现业务的障碍。
通过整合两个流行的事件流生态系统,KoP 解锁了新的用例。 您可以利用每个生态系统的优势,使用 Apache Pulsar 构建一个真正统一的事件流平台,以加速实时应用程序和服务的开发。
KoP 利用 Pulsar 已有的组件(例如主题发现、分布式日志库 - ManagedLedger、游标等)在 Pulsar 上实现了 Kafka wire 协议。
下图说明了 KoP 是如何在 Pulsar 中实现的:
安装 KoP
如果您有 Apache Pulsar 集群,则可以通过直接下载 KoP 协议处理程序并将其安装到 Pulsar Broker,在现有 Pulsar 集群上启用 Kafka-on-Pulsar。 它需要三个步骤:
- 下载 KoP 协议处理程序,然后将其复制到您的 Pulsar
protocols
目录。 - 在 Pulsar
broker.conf
或standalone.conf
文件中设置 KoP 协议处理程序的配置。 - 重启 Pulsar broker 以加载 KoP 协议处理程序。
然后你可以启动你的Broker并使用 KoP。 以下是每个步骤的详细说明。
下载 KoP 协议处理程序
您可以在这里直接下载 KoP 协议处理程序。
chen_ubuntu@LAPTOP-IH0640SI:~/start_pulsar/apache-pulsar-2.10.3/protocols$ ls
pulsar-protocol-handler-kafka-2.10.3.3.nar
配置 KoP
将 .nar
文件复制到 Pulsar protocols
目录后,您需要通过在 Pulsar 配置文件 broker.conf
或 standalone.conf
中添加配置来配置 Pulsar broker 以插件形式运行 KoP 协议处理程序。
在
broker.conf
或standalone.conf
文件中设置 KoP 协议处理程序的配置。messagingProtocols=kafka protocolHandlerDirectory=./protocols allowAutoTopicCreationType=partitioned narExtractionDirectory=./unpacked
属性名 默认值 建议值 messagingProtocols kafka protocolHandlerDirectory ./protocols Location of KoP NAR file allowAutoTopicCreationType non-partitioned partitioned narExtractionDirectory /tmp/pulsar-nar Location of unpacked KoP NAR file 默认情况下,
allowAutoTopicCreationType
设置为未分区。 由于主题在 Kafka 中默认是分区的,因此最好避免为 Kafka 客户端创建非分区主题,除非 Kafka 客户端需要与现有的非分区主题进行交互。默认情况下,
/tmp/pulsar-nar
目录位于/tmp
目录下。 如果我们将 KoP NAR 文件解包到/tmp
目录,一些类可能会被系统自动删除,这将产生一个ClassNotFoundException
或NoClassDefFoundError
错误。 因此,建议将narExtractionDirectory
选项设置为其他路径。设置 Kafka listeners
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 kafkaListeners=PLAINTEXT://127.0.0.1:9092 # This config is not required unless you want to expose another address to the Kafka client. # If it’s not configured, it will be the same with `kafkaListeners` config by default kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
kafkaListeners
是一个以逗号分隔的侦听器列表以及 Kafka 绑定以进行侦听的主机/IP 和端口。kafkaAdvertisedListeners
是一个以逗号分隔的侦听器列表及其主机/IP 和端口。如下设置偏移量管理,因为 KoP 的偏移量管理取决于 “Broker Entry Metadata”。 KoP 2.8.0 或更高版本需要它。
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
禁止删除非活动主题。 这不是必需的,但在 KoP 中非常重要。 目前,Pulsar 会删除分区主题的非活动分区,而不会删除分区主题的元数据。 在这种情况下,KoP 无法创建丢失的分区。
brokerDeleteInactiveTopicsEnabled=false
启动 Pulsar
./bin/pulsar-daemon standalone
测试 KoP
- 使用 Kafka Assistant 连接到 KoP
- 创建主题并发送一些消息
- 使用 Pulsar Assistant 连接到 Pulsar Broker 并接收消息