public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> { private volatile Boolean flag = true; private Random random = new Random(); private static List<VideoOrder> list = new ArrayList<>(); static { list.add(new VideoOrder("","java",10,0,null)); list.add(new VideoOrder("","spring boot",15,0,null)); } /** * run 方法调用前 用于初始化连接 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { System.out.println("-----open-----"); } /** * 用于清理之前 * @throws Exception */ @Override public void close() throws Exception { System.out.println("-----close-----"); } /** * 产生数据的逻辑 * @param ctx * @throws Exception */ @Override public void run(SourceContext<VideoOrder> ctx) throws Exception { while (flag){ Thread.sleep(1000); String id = UUID.randomUUID().toString().substring(30); int userId = random.nextInt(10); int videoNum = random.nextInt(list.size()); VideoOrder videoOrder = list.get(videoNum); videoOrder.setUserId(userId); videoOrder.setCreateTime(new Date()); videoOrder.setTradeNo(id); System.out.println("产生:"+videoOrder.getTitle()+",价格:"+videoOrder.getMoney()+", 时间:"+ TimeUtil.format(videoOrder.getCreateTime())); ctx.collect(videoOrder); } } /** * 控制任务取消 */ @Override public void cancel() { flag = false; } }
public class FlinkKeyByReduceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(5000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //这是我本机的ip地址 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint")); DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() { @Override public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception { VideoOrder videoOrder = new VideoOrder(); videoOrder.setTitle(value1.getTitle()); videoOrder.setMoney(value1.getMoney() + value2.getMoney()); return videoOrder; } }); reduce.print(); env.execute("job"); } }
在本地测试运行结果,可以看到数据根据订单分组不断的进行滚动计算进入服务器的HDFS查看检查点数据是否存在之后将应用进行打包,上传到服务器进行测试,可以使用Flink的Web页面进行手动提交jar包运行,也可以使用命令进行提交,之后可以看到程序运行过程中的相关日志输出
-s : 指定检查点的元数据的位置,这个位置记录着宕机前程序的计算状态 ./bin/flink run -s /checkpoint/id号/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar
可以看到出现一次close的时候,代表我们的程序以及停止,服务器已经宕机,这个时候订单的计算结果如上图的红色方框。在我们运行了上面那条命令后再次查看日志的数据,从open开始可以看到这次就不是从订单最初的状态开始进行的了,而是从上一次宕机前计算的结果,继续往下计算,到这里Checkponit的实战应用测试就完成了。