我使用Flink来丰富输入流
case class Input( key: String, message: String )
预先计算得分
case class Score( key: String, score: Int )
并产生一个输出
case class Output( key: String, message: String, score: Int )
输入和分数流都从Kafka主题中读取,结果输出流也发布到Kafka
val processed = inputStream.connect( scoreStream )
.flatMap( new ScoreEnrichmentFunction )
.addSink( producer )
使用以下ScoreEnrichmentFunction:
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )
override def flatMap1( input: Input, out: Collector[Output] ): Unit =
{
Option( scoreState.value ) match {
case None => out.collect( Output( input.key, input.message, -1 ) )
case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )
}
}
override def flatMap2( score: Score, out: Collector[Output] ): Unit =
{
scoreState.update( score )
}
}
这很好用。但是,如果我采取安全点并取消Flink作业,则当我从保存点恢复作业时,存储在ValueState中的分数将丢失。
据我了解,似乎需要使用CheckPointedFunction扩展ScoreEnrichmentFunction
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction
但我很难理解如何实现snapshotState和initializeState方法来处理键控状态
override def snapshotState( context: FunctionSnapshotContext ): Unit = ???
override def initializeState( context: FunctionInitializationContext ): Unit = ???
请注意,我使用以下env:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism( 2 )
env.setBufferTimeout( 1 )
env.enableCheckpointing( 1000 )
env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
env.getCheckpointConfig.setCheckpointTimeout( 60000 )
env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )
scala apache-flink 保存点
为检查点和保存点使用单独的目录,这导致保存点目录和FsStateBackend目录不同。
使用相同的目录
val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )
并在采取保存点时
bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data
解决了这个问题。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。