开发者社区> 问答> 正文

求助flink Buffer pool is destroyed异常

hi, 大家好! 今天运行flink时抛出了Buffer pool is destroyed异常,数据源是kafka;消费前kafka队列中堆积了8G左右的数据。详细报错如下:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at java.util.ArrayList.forEach(ArrayList.java:1257)

at ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.map.TelemetryEventMapper.lambda$map$ad330a96$1(TelemetryEventMapper.java:74)

at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)

at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)

at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)

at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

... 25 more

Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.watermark.TelemetryEventWatermarker$1.processElement(TelemetryEventWatermarker.java:30)

at ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.watermark.TelemetryEventWatermarker$1.processElement(TelemetryEventWatermarker.java:24)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

... 31 more

Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.

at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)

at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:236)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:229)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:101)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

... 40 more

有没有大佬遇到过类似的问题,给点建议。 谢谢。

*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 14:08:38 1614 0
1 条回答
写回答
取消 提交回答
  • 一般情况下是内存太小了,导致的问题*来自志愿者整理的flink

    2021-12-07 15:23:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载