就是我在使用flink 将数据写入kafka 的时候出现
Failed to send data to Kafka: Expiring 10 record(s) for data-CalcFlush-7:120000 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Expiring 10 record(s) for data-CalcFlush1234-7:120000 ms has passed since batch creation
这个问题咋解决呀?
可以参考文章:
https://blog.csdn.net/qq_21383435/article/details/118499398
【Flink】Flink 写入 kafka 报错 Failed to send data to Kafka: Expiring 4 record(s) for 20001 ms has passed
这个问题可能是由于Flink的Kafka生产者在等待Kafka服务器响应时超时所引起的。下面是一些可能的解决方案:
这个错误意味着你的Flink作业在尝试将数据写入Kafka时遇到了超时错误。这可能是由于以下几种情况造成的:
针对以上可能的原因,你可以采取以下措施进行排查和优化:
这个错误信息表明,Flink无法在指定的时间内将数据发送到Kafka,可能是因为网络延迟、Kafka服务器响应慢或者Flink的任务管理器压力过大等原因。
以下是一些可能的解决方案:
增加发送数据的超时时间:你可以尝试增加Flink发送数据给Kafka的超时时间。在Flink的配置文件中,你可以设置stream.timeout
参数来控制这个超时时间。
优化网络环境:如果你的网络环境不稳定,可能会导致数据发送失败。你可以尝试优化你的网络环境,例如使用更稳定的网络连接,或者增加网络带宽。
增加Flink的任务管理器的资源:如果你的任务管理器的压力过大,可能会导致数据发送失败。你可以尝试增加Flink的任务管理器的资源,例如增加CPU和内存。
优化Kafka服务器:如果你的Kafka服务器的压力过大,也可能会导致数据发送失败。你可以尝试优化你的Kafka服务器,例如增加Kafka服务器的资源,或者优化Kafka的服务配置。
使用Flink的内置连接器:Flink提供了内置的Kafka连接器,相比于使用第三方的连接器,它可能会有更好的性能和稳定性。
批次中的记录在创建后120000毫秒(120秒)内没有发送完成,因此Kafka认为这些记录已经过期,并抛出了超时异常。
要解决这个问题,您可以尝试以下方法:
flink.kafka.producer.timeout.ms=30000
CopyCopy
DataStream source = ...;
source
.keyBy(x -> x)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new WindowFunction() {
@Override
public void apply(String key, TimeWindow window, Iterable input, Collector out) {
// 发送数据到Kafka
out.collect("key:" + key);
}
});
这个问题是由于Flink在将数据写入Kafka时,批量发送的数据超过了设定的超时时间(120秒),导致部分数据未能成功发送。为了解决这个问题,你可以尝试以下方法:
1、增加Kafka的超时时间。在Flink的配置文件中,找到flink-conf.yaml
文件,设置taskmanager.network.memory.min
和taskmanager.network.memory.max
参数,增加网络缓冲区的大小。例如,将它们设置为64MB和256MB:
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 256mb
然后重启Flink集群。
2、调整Flink的批处理大小。在Flink的配置文件中,找到flink-conf.yaml
文件,设置execution.batch.size
参数,减小批处理的大小。例如,将其设置为1000:
execution.batch.size: 1000
然后重启Flink集群。
3、如果问题仍然存在,可以考虑使用Flink的异步发送模式。在Flink的配置文件中,找到flink-conf.yaml
文件,设置sink.flush.max-rows
参数,增大异步发送的最大行数。例如,将其设置为10000:
sink.flush.max-rows: 10000
然后重启Flink集群。
CONNECTIONS_MAX_IDLE_MS_CONFIG 默认是9min,调小就可以了。此回答整理自钉群“【②群】Apache Flink China社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。