Introduction
Hi everyone, I'm ChinaManor, which translates literally to "Chinese code farmer". I hope to be a paver on the road to national rejuvenation and a cultivator in the big data field: ordinary, but unwilling to settle for mediocrity.
Last time I walked you through a real-time Double 11 report analysis with Structured Streaming. If you haven't read it, here's the link:
StructredStreaming+Kafka+Mysql (Spark real-time computing | Tmall Double 11 real-time report analysis)
This time my mentor assigned a new task: same requirements, but implemented with Flink.
Uh oh.
I'm a rookie who's only just started learning Flink, I have no idea what I'm doing!
No way around it, I had to bite the bullet!
First, let's pin down the requirements:
1. Compute in real time the total sales from 00:00 today up to the current moment
2. Compute the Top 3 categories by sales
3. Refresh the statistics once per second
Whether I know what I'm doing or not, step one is always to create a stream:
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// use the unified stream/batch runtime mode
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
Nice~
Next step:
Add the order data as Tuple2<category, amount> (MySource is a custom source; its full definition is in the complete listing at the end):
DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());
Step three, the transformation:
Requirement 1: every second, pre-aggregate each category's sales total, i.e. the per-category total from 00:00 today up to now.
SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS
        .keyBy(t -> t.f0)
        // Note: China is on UTC+08:00 and we want a one-day window that starts
        // at 00:00:00 local time each day, hence of(Time.days(1), Time.hours(-8))
        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        // Note: fire the computation every second
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        // Aggregate (options: the simple aggregations we learned earlier such as sum/reduce,
        // a custom apply, or aggregate(), which lets you specify both how to aggregate
        // and how to collect the aggregated result)
        .aggregate(new MyAggregate(), new MyWindow());
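If the -8 hour offset looks like magic, here is a quick sanity check I put together (it's my own addition, not part of the task) using Flink's own window-start arithmetic, TimeWindow.getWindowStartWithOffset. The timestamp below is 2021-11-11 12:00:00 Beijing time:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Date;

public class WindowOffsetCheck {
    public static void main(String[] args) {
        long oneDayMs = 24 * 60 * 60 * 1000L;
        long offsetMs = -8 * 60 * 60 * 1000L;  // UTC+08:00 => shift the window start by -8h
        long ts = 1636603200000L;              // 2021-11-11 12:00:00 Beijing time
        long start = TimeWindow.getWindowStartWithOffset(ts, offsetMs, oneDayMs);
        // On a UTC+8 JVM this prints Thu Nov 11 00:00:00 CST 2021,
        // i.e. the daily window really begins at local midnight
        System.out.println(new Date(start));
    }
}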
After all that typing I'm sweating buckets~ Better check whether the output is right first, otherwise all that work would be for nothing:
aggregateResult.print();
env.execute();
Phew, it works!
Requirement 2: compute the overall sales total across all categories, plus the Top 3 categories by sales.
aggregateResult.keyBy(c -> c.getDateTime())
        // group by time first, because the sales total and the category Top 3
        // have to be updated/computed every second
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new MyProcessWindowFunction());
Looks like that worked too?! Real-time computing with Flink isn't so hard after all.
Comments included, it's only 76 lines of code.
…
But then I frowned: things were not that simple.
"Blogger, blogger, what about the custom classes? Did you swallow them??"
CategoryPojo.class
/**
 * Holds the aggregated result
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CategoryPojo {
    private String category;   // category name
    private double totalPrice; // total sales for this category
    private String dateTime;   // the "as of" time; strictly this should be event time, but we simplify and use the current system time
}
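A side note: @Data, @AllArgsConstructor and @NoArgsConstructor come from Lombok, which generates the constructors, getters/setters and toString() at compile time. If you would rather not depend on Lombok, a hand-written equivalent looks roughly like this (my sketch, abbreviated):

public static class CategoryPojo {
    private String category;
    private double totalPrice;
    private String dateTime;

    public CategoryPojo() {}

    public CategoryPojo(String category, double totalPrice, String dateTime) {
        this.category = category;
        this.totalPrice = totalPrice;
        this.dateTime = dateTime;
    }

    public String getCategory() { return category; }
    public void setCategory(String category) { this.category = category; }
    public double getTotalPrice() { return totalPrice; }
    public void setTotalPrice(double totalPrice) { this.totalPrice = totalPrice; }
    public String getDateTime() { return dateTime; }
    public void setDateTime(String dateTime) { this.dateTime = dateTime; }

    @Override
    public String toString() {
        return "CategoryPojo(category=" + category + ", totalPrice=" + totalPrice + ", dateTime=" + dateTime + ")";
    }
}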
MyWindow.class
/**
 * interface WindowFunction<IN, OUT, KEY, W extends Window>
 * Custom window function that collects the window's aggregated data
 */
public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
    // FastDateFormat is thread-safe, unlike SimpleDateFormat
    private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    @Override
    public void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
        double totalPrice = 0d;
        for (Double price : input) {
            totalPrice += price;
        }
        CategoryPojo categoryPojo = new CategoryPojo();
        categoryPojo.setCategory(key);
        categoryPojo.setDateTime(df.format(System.currentTimeMillis()));
        categoryPojo.setTotalPrice(totalPrice);
        out.collect(categoryPojo);
    }
}
MyAggregate.class
/**
 * interface AggregateFunction<IN, ACC, OUT>
 * Custom aggregate function that pre-aggregates/accumulates each category's sales
 */
public static class MyAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
    // initialize the accumulator
    @Override
    public Double createAccumulator() {
        return 0d;
    }

    // accumulation step
    @Override
    public Double add(Tuple2<String, Double> value, Double accumulator) {
        return value.f1 + accumulator;
    }

    // final result
    @Override
    public Double getResult(Double accumulator) {
        return accumulator;
    }

    // merge partial results
    @Override
    public Double merge(Double a, Double b) {
        return a + b;
    }
}
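Because AggregateFunction is a plain interface with no Flink runtime dependency, you can exercise MyAggregate's accumulator contract in a throwaway main method. A minimal check I added (the values are made up, and it assumes the nested MyAggregate class above is accessible):

import org.apache.flink.api.java.tuple.Tuple2;

public class AggregateCheck {
    public static void main(String[] args) {
        MyAggregate agg = new MyAggregate();
        Double acc = agg.createAccumulator();         // 0.0
        acc = agg.add(Tuple2.of("男装", 10.5), acc);   // 10.5
        acc = agg.add(Tuple2.of("男装", 4.5), acc);    // 15.0
        Double merged = agg.merge(acc, 5.0);          // 20.0, merging two partial accumulators
        System.out.println(agg.getResult(merged));    // prints 20.0
    }
}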
To compute the Top 3 categories by sales, I used the external comparator approach I learned earlier for sorting:
Data Structures and Algorithms | Bubble Sort | Java external and internal comparators (sorting series)
MyProcessWindowFunction.class
/**
 * abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 */
public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
        Double totalAmount = 0d; // running sales total
        // try sorting with an external comparator
        ArrayList<CategoryPojo> list = new ArrayList<>();
        for (CategoryPojo categoryPojo : categoryPojos) {
            // --1. total sales of all categories so far
            totalAmount += categoryPojo.getTotalPrice();
            // --2. Top 3 categories by sales
            if (list.size() < 3) {
                list.add(categoryPojo);
            } else { // already holding 3: evict the current minimum if the new element is bigger
                CategoryPojo min = Collections.min(list, Comparator.comparingDouble(CategoryPojo::getTotalPrice));
                if (categoryPojo.getTotalPrice() > min.getTotalPrice()) {
                    list.remove(min);
                    list.add(categoryPojo);
                } // if the incoming element is smaller, nothing changes
            }
        }
        list.sort(new Comparator<CategoryPojo>() {
            @Override
            public int compare(CategoryPojo o1, CategoryPojo o2) {
                // Double.compare avoids the truncation bug of casting the double difference to int
                return Double.compare(o1.getTotalPrice(), o2.getTotalPrice());
            }
        });
        // --3. print the results right here
        System.out.println("================================================================================================================================");
        System.out.println("----Current time:----");
        System.out.println(key);
        System.out.println("----Total sales:----");
        System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
        System.out.println("----Top 3 categories by sales:----");
        list.stream()
                .map(c -> {
                    c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
                    return c;
                })
                .sorted((c1, c2) -> Double.compare(c2.getTotalPrice(), c1.getTotalPrice())) // descending
                .forEach(System.out::println);
    }
}
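As an aside, an equivalent way to keep a running Top 3 that avoids the evict-the-minimum bookkeeping is a bounded min-heap. A sketch of my own, assuming the CategoryPojo class defined above:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Bounded min-heap variant of the Top 3 logic (my alternative sketch)
static List<CategoryPojo> top3(Iterable<CategoryPojo> categoryPojos) {
    PriorityQueue<CategoryPojo> heap =
            new PriorityQueue<>(Comparator.comparingDouble(CategoryPojo::getTotalPrice));
    for (CategoryPojo p : categoryPojos) {
        heap.offer(p);       // add the candidate
        if (heap.size() > 3) {
            heap.poll();     // drop the smallest, keeping the 3 largest
        }
    }
    // the heap's iteration order is unspecified; sort descending for display
    List<CategoryPojo> result = new ArrayList<>(heap);
    result.sort(Comparator.comparingDouble(CategoryPojo::getTotalPrice).reversed());
    return result;
}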
Here is the complete code:
package demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Random;

/**
 * @author ChinaManor
 * #Description
 * Desc: today we build the simplest possible mock e-commerce dashboard.
 * Requirements:
 * 1. Compute in real time the total sales from 00:00 today up to now
 * 2. Compute the Top 3 categories by sales
 * 3. Refresh the statistics once per second
 * #Date: 25/6/2021 08:28
 */
public class T4 {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 2.source
        // order data as Tuple2<category, amount>
        DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());

        //TODO 3.transformation
        //-1. pre-aggregate each category's sales total every second: the per-category total from 00:00 today up to now
        SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)
                // Note: China is on UTC+08:00 and we want a one-day window that starts
                // at 00:00:00 local time each day, hence of(Time.days(1), Time.hours(-8))
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                // Note: fire the computation every second
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                // aggregate (simple aggregations such as sum/reduce, a custom apply,
                // or aggregate(), which lets you specify how to aggregate and how to collect the result)
                .aggregate(new MyAggregate(), new MyWindow());

        // print to inspect the pre-aggregated result
        // aggregateResult.print();
        // per-category amounts are aggregated like:
        // category  amount  time
        // 男装       100     2021-11-11 11:11:11
        // 女装       100     2021-11-11 11:11:11
        // 男装       200     2021-11-11 11:11:12
        // 女装       200     2021-11-11 11:11:12

        //TODO 4.sink
        //-2. compute the overall sales total and the Top 3 categories by sales;
        // both have to be updated/computed every second
        // aggregateResult.keyBy(CategoryPojo::getDateTime)
        aggregateResult.keyBy(c -> c.getDateTime())
                // group by time first, because the total and the Top 3 are refreshed every second
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new MyProcessWindowFunction());

        //TODO 5.execute
        env.execute();
    }

    /**
     * abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
     */
    public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
            Double totalAmount = 0d; // running sales total
            // try sorting with an external comparator
            ArrayList<CategoryPojo> list = new ArrayList<>();
            for (CategoryPojo categoryPojo : categoryPojos) {
                // --1. total sales of all categories so far
                totalAmount += categoryPojo.getTotalPrice();
                // --2. Top 3 categories by sales
                if (list.size() < 3) {
                    list.add(categoryPojo);
                } else { // already holding 3: evict the current minimum if the new element is bigger
                    CategoryPojo min = Collections.min(list, Comparator.comparingDouble(CategoryPojo::getTotalPrice));
                    if (categoryPojo.getTotalPrice() > min.getTotalPrice()) {
                        list.remove(min);
                        list.add(categoryPojo);
                    } // if the incoming element is smaller, nothing changes
                }
            }
            list.sort(new Comparator<CategoryPojo>() {
                @Override
                public int compare(CategoryPojo o1, CategoryPojo o2) {
                    // Double.compare avoids the truncation bug of casting the double difference to int
                    return Double.compare(o1.getTotalPrice(), o2.getTotalPrice());
                }
            });
            // --3. print the results right here
            System.out.println("================================================================================================================================");
            System.out.println("----Current time:----");
            System.out.println(key);
            System.out.println("----Total sales:----");
            System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
            System.out.println("----Top 3 categories by sales:----");
            list.stream()
                    .map(c -> {
                        c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
                        return c;
                    })
                    .sorted((c1, c2) -> Double.compare(c2.getTotalPrice(), c1.getTotalPrice())) // descending
                    .forEach(System.out::println);
        }
    }

    /**
     * interface AggregateFunction<IN, ACC, OUT>
     * Custom aggregate function that pre-aggregates/accumulates each category's sales
     */
    public static class MyAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        // initialize the accumulator
        @Override
        public Double createAccumulator() {
            return 0d;
        }

        // accumulation step
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }

        // final result
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        // merge partial results
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    /**
     * interface WindowFunction<IN, OUT, KEY, W extends Window>
     * Custom window function that collects the window's aggregated data
     */
    public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        // FastDateFormat is thread-safe, unlike SimpleDateFormat
        private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            double totalPrice = 0d;
            for (Double price : input) {
                totalPrice += price;
            }
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(key);
            categoryPojo.setDateTime(df.format(System.currentTimeMillis()));
            categoryPojo.setTotalPrice(totalPrice);
            out.collect(categoryPojo);
        }
    }

    /**
     * Holds the aggregated result
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;   // category name
        private double totalPrice; // total sales for this category
        private String dateTime;   // the "as of" time; strictly this should be event time, but we simplify and use the current system time
    }

    /**
     * Custom source that keeps producing order data as Tuple2<category, amount>
     */
    public static class MySource implements SourceFunction<Tuple2<String, Double>> {
        private boolean flag = true;
        private String[] categorys = {"男装", "女装", "童装", "洗护"};
        private Random random = new Random();

        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (flag) {
                // random category and amount
                int index = random.nextInt(categorys.length);
                String category = categorys[index]; // random category
                double price = random.nextDouble() * 100; // note: generates [0,100)
                ctx.collect(Tuple2.of(category, price));
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
Folks, I've finished my exam!
Here were the exam requirements; the new part is reading from Kafka:
1. Read data from Kafka: 5 points
2. Simple processing/splitting of the data: 5 points
3. Choose appropriate data types: 5 points
4. Sales total and per-category order amounts accurate to two decimal places: 5 points
5. Set up reasonable windows and trigger conditions: 10 points
6. Correctly output the sales total, updated once per second: 30 points
7. Output the per-category order amounts in descending order, updated once per second: 30 points
8. Comments written as required: 5 points
9. Code cleanliness and robustness: 5 points
Here is the reference answer:
(I clearly still need to shore up my grasp of these Flink functions~)
package demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;

public class KafkaToFlink {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //TODO 1.source
        // prepare the Kafka connection parameters
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); // cluster address
        props.setProperty("group.id", "flink"); // consumer group id
        // latest: resume from the committed offset if there is one, otherwise start from the newest message
        // earliest: resume from the committed offset if there is one, otherwise start from the earliest message
        props.setProperty("auto.offset.reset", "latest");
        // a background thread checks Kafka's partitions every 5s, enabling dynamic partition discovery
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        // auto-commit (to the default topic; once we learn Checkpoints, offsets live in the Checkpoint and the default topic)
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000"); // auto-commit interval

        // create the FlinkKafkaConsumer/kafkaSource with the connection parameters
        // note: the topic must match the one DataToKafka (below) produces to
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("categories", new SimpleStringSchema(), props);
        // use kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);
        DataStream<Tuple2<String, Double>> sourceKafka = kafkaDS.map(new MapFunction<String, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(String value) throws Exception {
                String[] lines = value.split(":");
                return Tuple2.of(lines[0], Double.valueOf(lines[1]));
            }
        });

        //todo 3.transformation
        //3.1 define a one-day window; the second argument accounts for China's UTC+08:00 being ahead of UTC
        //3.2 define a 1s trigger
        //3.3 aggregate the results
        DataStream<CategoryPojo> tempAggResult = sourceKafka.keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                .aggregate(new TestAggregate(), new TestWindowResult());

        //todo 4. use the aggregated result above to implement the business requirements:
        //4.1 compute in real time the total sales from 00:00 today up to now
        //4.2 compute the per-category sales topN
        //4.3 refresh the statistics once per second
        tempAggResult.keyBy(CategoryPojo::getDateTime)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new TestProcessWindowFunction());

        //todo 5.execute
        env.execute();
    }

    // public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    private static class TestProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String datetime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            double totalPrice = 0D;
            Map<String, Double> map = new TreeMap<String, Double>();
            for (CategoryPojo element : elements) {
                // 4.1 total sales from 00:00 today up to now
                totalPrice += element.totalPrice;
                // 4.2 per-category sales topN
                map.put(element.category, element.totalPrice);
            }
            // round half-up to two decimal places
            double roundPrice = new BigDecimal(totalPrice).setScale(2, RoundingMode.HALF_UP).doubleValue();
            ArrayList<Map.Entry<String, Double>> list = new ArrayList<>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Double>>() {
                @Override
                public int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {
                    return o2.getValue().compareTo(o1.getValue()); // descending
                }
            });
            System.out.println("time: " + datetime + " total: " + roundPrice + "\ntopN: ");
            for (int i = 0; i < list.size(); i++) {
                System.out.println(list.get(i).getKey() + ": " + list.get(i).getValue());
            }
            System.out.println("---------------------------------------");
        }
    }

    // public interface AggregateFunction<IN, ACC, OUT>
    private static class TestAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }

        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }

        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    // public interface WindowFunction<IN, OUT, KEY, W extends Window>
    private static class TestWindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        // formatter turning the current time (the order time on Double 11 day) into a String
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            Double price = input.iterator().next();
            // round half-up to two decimal places
            double totalPrice = new BigDecimal(price).setScale(2, RoundingMode.HALF_UP).doubleValue();
            String dateTime = df.format(System.currentTimeMillis());
            out.collect(new CategoryPojo(category, totalPrice, dateTime));
        }
    }

    /**
     * Holds the aggregated result
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;   // category name
        private double totalPrice; // total sales for this category
        private String dateTime;   // the "as of" time; strictly this should be event time, but we simplify and use the current system time
    }
}
Produce data into Kafka:
package demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class DataToKafka {
    public static void main(String[] args) {
        // 1. prepare the configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // props.put("partitioner.class", "test.KafkaCustomPartitioner"); // optional custom partitioner
        // (the original used the invalid config key "KafkaCustomPartitioner.class"; a custom partitioner isn't needed here)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. create the KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true) {
            // random category and amount
            int index = random.nextInt(categorys.length); // [0, length) => [0, length-1]
            String category = categorys[index]; // random category
            double price = random.nextDouble() * 100; // nextDouble yields [0,1), so *100 gives [0,100)
            // 3. send the data
            kafkaProducer.send(new ProducerRecord<String, String>("categories", category + ":" + price));
            System.out.println(category + ":" + price);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
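Note the producer writes messages shaped like category:price to the categories topic, which is exactly what the map function in KafkaToFlink splits on. A quick standalone check of that parsing (the message value here is made up):

import org.apache.flink.api.java.tuple.Tuple2;

public class ParseCheck {
    public static void main(String[] args) {
        String value = "男装:63.2"; // example message (illustrative value)
        String[] parts = value.split(":");
        Tuple2<String, Double> order = Tuple2.of(parts[0], Double.valueOf(parts[1]));
        System.out.println(order); // prints (男装,63.2)
    }
}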
Summary
The most iconic example is Taobao's Double 11. Every year during the shopping festival, beyond the shopping frenzy, the most eye-catching sight is the Double 11 big screen with its constantly climbing grand total. Across the whole computing pipeline, from an order being placed on Tmall, through data collection, computation, and validation, all the way to the number landing on the Double 11 big screen, the end-to-end latency is squeezed to under 5 seconds, with peak throughput reaching some three hundred thousand orders per second, and multiple redundant stream-computing pipelines make sure nothing goes wrong.
And that's the simplest real-time Double 11 analysis case with Flink ever. If you enjoyed it, please like, favorite, and comment!!!
Thanks to blogger 李胜步 for providing the approach: