Flink获取消费kafka的时候始终获取不到topic列表是啥原因啊?Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:283)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
... 7 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [board_quote_topic].
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:268)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics 报错信息
在使用 Apache Flink 消费 Kafka 时,遇到无法获取 topic 列表的问题,报错信息为 Failed to list subscribed topic partitions。根据提供的错误日志,问题的可能原因包括以下几种:
Kafka Broker 配置问题:
TimeoutException: Timed out waiting for a node assignment 表示在请求描述主题(describeTopics)时,等待 Kafka broker 分配节点超时。这可能是因为 Kafka broker 没有正确启动或者配置错误。
检查 Kafka broker 是否在运行并且可以正常连接。
确认 Kafka broker 的配置文件中 listeners 和 advertised.listeners 设置正确,确保客户端可以访问。
网络连接问题:
确保 Flink 集群与 Kafka 集群之间的网络连接正常,没有防火墙或网络策略阻止通信。
检查 Flink 和 Kafka 的网络配置,确保没有网络延迟或丢包。
Kafka 客户端配置问题:
检查 Flink 应用程序中 Kafka 客户端的配置,特别是 Kafka broker 列表 (bootstrap.servers) 是否正确。
增加超时时间配置,例如 request.timeout.ms 和 session.timeout.ms,以应对可能的网络延迟问题。
Kafka 主题问题:
确认主题 board_quote_topic 是否存在且可以从 Kafka broker 获取到。
使用 Kafka 命令行工具或 Kafka 管理界面检查主题的状态和分区信息。
权限问题:
确认 Flink 应用程序的 Kafka 客户端有权限访问 board_quote_topic 主题。
检查 Kafka 的 ACL 配置,确保 Flink 客户端具有 Describe 权限。
这个问题通常是由于Flink与Kafka之间的网络连接问题导致的。您需要检查以下几个方面:
确认Flink作业与Kafka集群是否在同一个VPC内。
检查Kafka的bootstrap.servers配置是否正确,确保Flink能够连接到Kafka的broker。
确认Kafka的白名单配置,如果使用的是云消息队列Kafka版,确保Flink的IP在白名单内。
检查网络是否有代理、端口转发等设置,确保这些设置允许Flink访问Kafka。
使用Zookeeper命令行工具获取Kafka broker元信息,并验证与Flink的连通性。
可参考网络连接排查。
问题似乎与Kafka broker的配置有关,而不是直接与FlinK SQL任务设置table.exec.state.ttl相关。不过,为了帮助你排查问题,我们可以一起检查一下Kafka broker的配置以及相关的网络和安全设置。
Kafka Broker配置检查
网络连接:
确认Flink应用所在的网络可以访问Kafka broker的地址和端口。
检查网络策略或防火墙设置,确保没有阻止Flink应用与Kafka broker之间的通信。
Kafka ACL (Access Control Lists):
确认Flink应用使用的消费者组或用户具有所需的权限。
可以使用Kafka的kafka-acls.sh脚本来查看和管理ACL。
可能是Kafka Broker配置问题
确认Kafka broker的配置允许来自Flink应用所在网络的连接,没有因网络策略或防火墙设置而被阻止。
检查Kafka是否设置了ACL(访问控制列表),确保Flink应用使用的消费者组或用户具有读取所需主题的权限。
还有检查Topic是否存在: 首先确认Kafka集群中确实存在您所订阅的主题board_quote_topic。可以通过Kafka的命令行工具或是Kafka管理界面来验证。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。