开发者社区> 问答> 正文

Flink:如何持久化并恢复ValueState

我使用Flink来丰富输入流

case class Input( key: String, message: String )
预先计算得分

case class Score( key: String, score: Int )
并产生一个输出

case class Output( key: String, message: String, score: Int )
输入和分数流都从Kafka主题中读取,结果输出流也发布到Kafka

val processed = inputStream.connect( scoreStream )

                       .flatMap( new ScoreEnrichmentFunction )
                       .addSink( producer )

使用以下ScoreEnrichmentFunction:

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{

val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

override def flatMap1( input: Input, out: Collector[Output] ): Unit = 
{
    Option( scoreState.value ) match {
        case None => out.collect( Output( input.key, input.message, -1 ) )
        case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )  
    }
}

override def flatMap2( score: Score, out: Collector[Output] ): Unit = 
{
    scoreState.update( score )
} 

}
这很好用。但是,如果我采取安全点并取消Flink作业,则当我从保存点恢复作业时,存储在ValueState中的分数将丢失。

据我了解,似乎需要使用CheckPointedFunction扩展ScoreEnrichmentFunction

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction
但我很难理解如何实现snapshotState和initializeState方法来处理键控状态

override def snapshotState( context: FunctionSnapshotContext ): Unit = ???

override def initializeState( context: FunctionInitializationContext ): Unit = ???
请注意,我使用以下env:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism( 2 )
env.setBufferTimeout( 1 )
env.enableCheckpointing( 1000 )
env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
env.getCheckpointConfig.setCheckpointTimeout( 60000 )
env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )

scala apache-flink 保存点

展开
收起
flink小助手 2018-12-10 10:31:56 6185 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    为检查点和保存点使用单独的目录,这导致保存点目录和FsStateBackend目录不同。

    使用相同的目录

    val backend = new FsStateBackend( "file:/data", true )
    env.setStateBackend( backend )
    并在采取保存点时

    bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data
    解决了这个问题。

    2019-07-17 23:19:04
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载