【大数据计算引擎】流式计算引擎Flink3

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 【大数据计算引擎】流式计算引擎Flink

8.Flink增量聚合和全窗口函数

8.1.AggregateFunction增量聚合函数

  • 增量聚合函数
aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
  • 滚动窗口聚合案例
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.aggregate(new AggregateFunction<VideoOrderDO, Map<String, Object>, Map<String, Object>>() {
            //初始化累加器
            @Override
            public Map<String, Object> createAccumulator() {
                return new HashMap<>();
            }
            //聚合方式
            @Override
            public Map<String, Object> add(VideoOrderDO value, Map<String, Object> accumulator) {
                if (accumulator.size() == 0) {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney());
                    accumulator.put("num", 1);
                    accumulator.put("createTime", value.getCreateTime());
                } else {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney() + Integer.parseInt(accumulator.get("money").toString()));
                    accumulator.put("num", 1 + Integer.parseInt(accumulator.get("num").toString()));
                    accumulator.put("createTime", value.getCreateTime());
                }
                return accumulator;
            }
            //返回结果
            @Override
            public Map<String, Object> getResult(Map<String, Object> accumulator) {
                return accumulator;
            }
            //合并内容
            @Override
            public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
                return null;
            }
        }).print();
        env.execute("Tumbling Window job");
    }
}

d47e2532fdeb4d189ce95a5ffa663c8a.jpg

8.2.WindowFunction全窗口函数

  • 全窗口函数
apply(new WindowFunction(){ })
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
WindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * apply
 */
public class FlinkWindow2Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.apply(new WindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();
        env.execute("apply Window job");
    }
}


958b3dee8c2e43cfa0de360a9bc01849.jpg

8.3.processWindowFunction全窗口函数

  • 全窗口函数
process(new ProcessWindowFunction(){})
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * process-Window滚动窗口
 */
public class FlinkWindow3Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.process(new ProcessWindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<VideoOrderDO, Map<String, Object>, String, TimeWindow>.Context context, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();
        env.execute("process Window job");
    }
}


d4d4294eb1f146c983f7e8735a3ea666.jpg

窗口函数对比

  • 增量聚合
aggregate(new AggregateFunction(){});
  • 全窗口聚合
apply(new WindowFunction(){})
process(new ProcessWindowFunction(){}) //比WindowFunction功能强大

9.迟到无序数据处理watermark

9.1.Watermark简介和应用

(1)基本概念

  • Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算
  • start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间
  • event-time: 事件发生时间,是事件发生所在设备的当地时间,
  • 比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间
  • Watermarks:可以把他理解为一个水位线,等于evevtTime - delay(比如规定为20分钟),一旦Watermarks大于了某个window的end_time,

就会触发此window的计算,Watermarks就是用来触发1window计算的。

  • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间

推迟窗口触发的时间,实现方式:通过当前窗口中最大的eventTime-延迟时间所得到的Watermark与窗口原始触发时间进行对比,当Watermark大于窗口原始触发时间时则触发窗口执行!!!我们知道,流处理从事件产生,

到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,

就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

c62015c208e843c7b16a70925a15fee9.jpg

那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

(2)Watermark水位线介绍

  • 由flink的某个operator操作生成后,就在整个程序中随event数据流转
  • With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
  • With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
  • 衡量数据是否乱序的时间,什么时候不用等早之前的数据
  • 是一个全局时间戳,不是某一个key下的值
  • 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
  • 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
  • 注意
  • Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
  • 需要经过测试,和业务相关联,得出一个较合适的值即可
  • 触发计算后,其他窗口内数据再到达也被丢弃

9.2.Watermark案例实战

  • 概念很抽象,下面用一个案例给解释下watermark的作用。
  • 需求:每10s分组统计不同视频的成交总价,数据有乱序延迟,允许5秒的时间。

(1)时间工具类

public class TimeUtil {
    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String toDate(Date date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(date.toInstant().atZone(zoneId));
    }
    /**
     * 字符串转日期类型
     * @param time
     * @return
     */
    public static Date strToDate(String time){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
        return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
    }
    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String format(long date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(new Date(date).toInstant().atZone(zoneId));
    }
}

(2)Flink入口函数

/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();
        env.execute("watermark job");
    }
}

(3)测试数据,nc -lk 8888监听8888端口,一条一条的输入

[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:25,10


0531d8e63a2d4cbea40fcc26c96571c3.jpg


d2008403aace4ae1b7309510dc8ae276.jpg

9.3.二次兜底延迟数据处理

  • 超过了watermark的等待后,还有延迟数据到达怎么办?
  • 上一个案例我们发现,第七个窗口本是[0-10s)窗口的数据,
  • 但是[0-10s)窗口的数据已经被统计了,所以数据丢失了,这就需要allowedLateness 来做二次兜底延迟数据处理。
  • 编码很简单,只需要在开窗函数那设置allowedLateness即可
  • 60b28a975ea34c3d9658ccd2957dcf04.jpg
/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();
        env.execute("watermark job");
    }
}
  • 测试

07d07273944c428f9f2d52ae901750b6.jpg

9.4.最后的兜底延迟数据处理

  • watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据。
  • 数据超过了allowedLateness 后,怎么办?这会就用侧输出流 SideOutput,最终的一个兜底。
  • 侧输出流不会在统计到之前的窗口上,类似于独立存储起来,就是说没有被统计的数据会被单独存放在一个容器中,自定义的去进行最终一致性的操作。我们的数据输出会存放到redis或者mysql,侧输出的那一部分也存放在redis或者mysql,这样等统计之后,我们可以手动的讲结果进行整合。
  • 编码实战
  • 3a6e3c5bbf9642bdbd025444ecc6bb6a.jpg
/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //new 一个OutputTag Bean
        OutputTag<Tuple3<String,String,Integer>> lateData = new OutputTag<Tuple3<String,String,Integer>>("lateData"){};
        //根据标题进行分组
        SingleOutputStreamOperator<String> operator = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                //侧输入,最后的兜底数据
                .sideOutputLateData(lateData)
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
                    //滚动窗口,10s一统计,全窗口函数
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                        List<String> eventTimeList = new ArrayList<>();
                        int total = 0;
                        for (Tuple3<String, String, Integer> order : iterable) {
                            eventTimeList.add(order.f1);
                            total = total + order.f2;
                        }
                        String outStr = "分组key:" + key + ",总价:" + total + ",窗口开始时间:" + TimeUtil.format(timeWindow.getStart()) + ",窗口结束时间:" + TimeUtil.format(timeWindow.getEnd()) + ",窗口所有事件时间:" + eventTimeList;
                        collector.collect(outStr);
                    }
                });
        operator.print();
        //侧输出流数据
        operator.getSideOutput(lateData).print();
        env.execute("watermark job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:14:22,10 
java,2022-11-11 23:12:25,10 #设置一个超过1分钟的数据,测试

8b23cbd2b4734c7ab2b07f249152bf89.jpg

9.5.Flink多层保证措施归纳

(1)如何保证在需要的窗口内获取指定的数据?数据有乱序延迟

  • flink采用watermark、allowedLateness()、sideOutputLateData()三个机制来保证获取数据。
  • watermark的作用是防止出现延迟乱序,允许等待一会在触发窗口计算。
  • allowLateness,是将窗口关闭时间在延迟一段时间,允许有一个最大迟到时间,allowLateness的数据会重新触发窗口计算
  • sideOutPut是最后的兜底操作,超过allowLateness后,窗口已经彻底关闭,就会把数据放到侧输出流,侧输出流OutputTag tag = new OutputTag(){},由于泛型擦除的问题,需要重写方法,加花括号。

(2)应用场景:实时监控平台

  • 可以用watermark及时输出数据
  • allowLateness 做短期的更新迟到数据
  • sideOutPut做兜底更新保证数据准确性

(3)总结Flink的机制

  • 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
  • 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
  • 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
  • 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
  • 注意
  • Flink 默认的处理方式直接丢弃迟到的数据
  • sideOutPut还可以进行分流功能
  • DataStream没有getSideOutput方法,SingleOutputStreamOperator才有

(4)版本弃用API

新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了

10.Flink状态State管理和Checkpoint

10.1.Flink的状态State管理简介

(1)什么是State状态

  • 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等
  • 是一个Operator的运行的状态\历史值,是维护在内存中
  • 流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果然后把结果更新到状态里面

(2)有状态和无状态介绍

  • 无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter
  • 有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作

(3)状态管理分类

  • ManagedState
  • Flink管理,自动存储恢复
  • 细分两类
  • Keyed State 键控状态(用的多)
  • 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态
  • 一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化
  • ValueState、ListState、MapState等数据结构
  • Operator State 算子状态(用的少,部分source会用)
  • ListState、UnionListState、BroadcastState等数据结构
  • RawState
  • 用户自己管理和维护
  • 存储结构:二进制数组
  • State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
  • ValueState 简单的存储一个值(ThreadLocal / String)
  • ValueState.value()
  • ValueState.update(T value)
  • ListState 列表
  • ListState.add(T value)
  • ListState.get() //得到一个Iterator
  • MapState 映射类型
  • MapState.get(key)
  • MapState.put(key, value)

10.2.Flink状态State后端存储讲解

从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储和检查点存储的分离 用户可以迁移现有应用程序以使用新 API,⽽不会丢失任何状态或⼀致性。

(1)Flink内置了以下这些开箱即用的state backends :

  • (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
  • 如果没有其他配置,系统将使用 HashMapStateBackend。
  • (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
  • 如果不设置,默认使用 MemoryStateBackend

(2)State状态详解

HashMapStateBackend 保存数据在内部作为Java堆的对象。

  • 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
  • 非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
  • 但是状态大小受集群内可用内存的限制
  • 场景:
  • 具有大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置。

EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据

  • 该数据库(默认)存储在 TaskManager 本地数据目录中
  • 与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
  • RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
  • 但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
  • 场景
  • 具有非常大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置
  • 旧版的状态管理
MemoryStateBackend(内存,不推荐在生产场景使用)
FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)
RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)

(3)配置方式

方式一:可以flink-conf.yaml使用配置键在中配置默认状态后端state.backend。

配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend) 
或实现状态后端工厂StateBackendFactory的类的完全限定类名
#全局配置例子一
# The backend that will be used to store operator state checkpoints
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
#全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

方式二:代码 单独job配置例子

//代码配置一
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//代码配置二
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
  • 备注:使用 RocksDBStateBackend 需要加依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>1.13.1</version>
</dependency>

10.3.Flink的状态State管理实战

  • sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
  • 需求:
  • 根据订单进行分组,统计找出每个商品最大的订单成交额
  • 不用maxBy实现,用ValueState实现
  • 编码实战
/**
 * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
 * @author lixiang
 */
public class FlinkStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("192.168.139.20", 8888);
        DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
        //一定要key by后才可以使用键控状态ValueState
        SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Integer> valueState = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class));
            }
            @Override
            public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
                // 取出State中的最大值
                Integer stateMaxValue = valueState.value();
                Integer currentValue = tuple3.f2;
                if (stateMaxValue == null || currentValue > stateMaxValue) {
                    //更新状态,把当前的作为新的最大值存到状态中
                    valueState.update(currentValue);
                    return Tuple2.of(tuple3.f0, currentValue);
                } else {
                    //历史值更大
                    return Tuple2.of(tuple3.f0, stateMaxValue);
                }
            }
        });
        maxVideoOrder.print();
        env.execute("valueState job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,30
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10


1cc87424018c43fca76b3b75fed7f414.jpg

10.4.Flink的Checkpoint-SavePoint

  • 什么是Checkpoint 检查点
  • Flink中所有的Operator的当前State的全局快照
  • 默认情况下 checkpoint 是禁用的
  • Checkpoint是把State数据定时持久化存储,防止丢失
  • 手工调用checkpoint,叫 savepoint,主要是用于flink集群维护升级等
  • 底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性
  • 开箱即用,Flink 捆绑了这些检查点存储类型:
  • 作业管理器检查点存储 JobManagerCheckpointStorage
  • 文件系统检查点存储 FileSystemCheckpointStorage
  • 配置
//全局配置checkpoints
state.checkpoints.dir: hdfs:///checkpoints/
//作业单独配置checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
//全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints

Savepoint 与 Checkpoint 的不同之处

  • 类似于传统数据库中的备份与恢复日志之间的差异
  • Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
  • Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
  • Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
  • 除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
  • 端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
  • Source
  • 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置
  • 内部
  • 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据
  • Sink:
  • 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

10.5.Flink的Checkpoint代码配置

//两个检查点之间间隔时间,默认是0,单位毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint过程中出现错误,是否让整体任务都失败,默认值为0,表示不容忍任何Checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
//Checkpoint是进行失败恢复,当一个 Flink 应用程序失败终止、人为取消等时,它的 Checkpoint 就会被清除
//可以配置不同策略进行操作
// DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
// RETAIN_ON_CANCELLATION(多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Flink 默认提供 Extractly-Once 保证 State 的一致性,还提供了 Extractly-Once,At-Least-Once 两种模式,
// 设置checkpoint的模式为EXACTLY_ONCE,也是默认的,
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
//设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

11.Flink复杂事件处理CEP

11.1.Flink复杂事件处理CEP介绍

(1)什么是FlinkCEP

  • CEP全称 Complex event processing 复杂事件处理

  • FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
  • 擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用

(2)FlinkCEP用途

  • 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
  • 允许业务定义要从输入流中提取的复杂模式序列

(3)FlinkCEP使用流程

  • 定义pattern
  • pattern应用到数据流,得到模式流
  • 从模式流 获取结果

(4)CEP并不包含在flink中,使用前需要自己导入

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-cep-scala_2.11</artifactId>
     <version>1.7.0</version>
</dependency>

11.2.Flink的复杂事件CEP常见概念

(1)模式(Pattern):定义处理事件的规则

  • 三种模式PatternAPI
  • 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式

组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式

模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组

近邻模式


严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()

宽松近邻:允许中间出现不匹配的事件,API是.followedBy()

非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()

指定时间约束:指定模式在多长时间内匹配有效,API是within

如果您不希望事件类型直接跟随另一个,notNext()

如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()

模式分类

单次模式:接收一次一个事件

循环模式:接收一个或多个事件

(2)其他参数

times:指定固定的循环执行次数

greedy:贪婪模式,尽可能多触发

oneOrMore:指定触发一次或多次

timesOrMore:指定触发固定以上的次数

optional:要么不触发要么触发指定的次数

11.3.Flink复杂事件CEP案例实战

需求:同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题

数据格式 李祥,2022-11-11 12:01:01,-1

/**
 * cep-demo
 * @author lixiang
 */
public class FlinkCEPDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("192.168.139.20",8888);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));
        KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        //定义模式
        Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>>
                        begin("firstTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                })
                .next("secondTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                }).within(Time.seconds(5));
        //匹配检查
        PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<Tuple3<String, String, String>> select = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
                Tuple3<String, String, Integer> firstLoginFail = map.get("firstTimeLogin").get(0);
                Tuple3<String, String, Integer> secondLoginFail = map.get("secondTimeLogin").get(0);
                return Tuple3.of(firstLoginFail.f0, firstLoginFail.f1, secondLoginFail.f1);
            }
        });
        select.print("匹配结果");
        env.execute("CEP job");
    }
}
  • 测试
张三,2022-11-11 12:01:01,-1
李四,2022-11-11 12:01:10,-1
李四,2022-11-11 12:01:11,-1
张三,2022-11-11 12:01:13,-1
李四,2022-11-11 12:01:14,-1
李四,2022-11-11 12:01:15,1
张三,2022-11-11 12:01:16,-1
李四,2022-11-11 12:01:17,-1
张三,2022-11-11 12:01:20,1

12.Flink项目打包插件+部署

12.1.Flink服务端多种部署模式

Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

  • 直接部署启动服务
  • Standalone Cluster集群部署,flink自带集群模式
  • Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
  • Kubernetes 部署
  • Docker部署
  • 53fa7860b2c244a9b9a3cfd323294a66.jpg

12.2.Flink本地模式部署Linux服务器

(1)安装JDK8环境

(1)上传jdk1.8安装包,解压到指目录
tar -xvf jdk-8u181-linux-x64.tar.gz -C /usr/local/
(2)查看解压后的文件
[root@flink ~]# cd /usr/local
[root@flink local]# ls
bin  etc  flink-1.13.1  games  include  jdk1.8.0_181  lib  lib64  libexec  sbin  share  src
(3)jdk1.8.0_181重命名为jdk1.8
mv jdk1.8.0_181/ jdk1.8
(4)配置环境变量
vi /etc/profile
添加配置:
JAVA_HOME=/usr/local/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
(5)刷新配置
source /etc/profile
(6)查看java环境
[root@flink ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
  • 注意:如果不安装jdk环境的话启动flink会报这个错误
Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.

(2)准备flink环境

tar -xvf flink-1.13.1-bin-scala_2.12.tgz -C /usr/local/ 
调整配置文件:conf/flink-conf.yaml
#web ui 端口
rest.port=8081
#调整jobmanager和taskmanager的大小,根据自己的机器进行调整
jobmanager.memory.process.size: 256m
taskmanager.memory.process.size: 256m


931200a761b14a7294202f335c9eb208.jpg

本地模式用到这两个脚本
start-cluster.sh
stop-cluster.sh
启动本地模式:./start-cluster.sh
注意这会可能会报错:
The derived from fraction jvm overhead memory (19.200mb (20132659 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
原因分析:
flink最少需要192M的内存才能启动,jdk1.8之后堆内存采用的是元空间,元空间是根据内存大小自动分配的。如果元空间给的太小,flink将会启动不起来。
调整如下配置:conf/flink-conf.yaml
taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
  • 重新启动,成功

72b2885066154f9f94b4f5315e1b79bb.jpg

  • 访问web UI ip:8081


ffb46c06437847e2a1cb18f6b8d0f9f6.jpg


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
108 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1075 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
24天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
142 56
|
23天前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
296 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
2月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
51 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
103 0
|
2月前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
133 0
|
24天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
196 7
|
24天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
39 2