Flink史上最简单双十一实时分析案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink史上最简单双十一实时分析案例

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

上期带大家用StructredStreaming做了双十一实时报表分析,没看过的朋友可以看看,这是链接:

StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)

这次导师布置了一个最新任务:需求不变,用Flink完成,

阿这

我是菜鸡,刚学Flink,不懂阿~

没办法,只能硬着头皮上了!

先明确一下需求:

1.实时计算出当天零点截止到当前时间的销售总额

2.计算出各个分类的销售额最大的top3

3.每秒钟更新一次统计结果

不管会不会,上来先创建一个流:

//TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置成流批一体模式        
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

牛批~

下一步:

添加订单数据,Tuple2<分类, 金额>

DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());

第三步转换:

需求一:每秒预聚合各个分类的销售总额:从当天0点开始截止到目前为止的各个分类的销售总额

SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)
                //注意:中国使用UTC+08:00,您需要一天大小的时间窗口,
                //窗口从当地时间的每00:00:00开始,您可以使用{@code of(time.days(1),time.hours(-8))}
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                //注意:下面表示每秒触发计算
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                //聚合(可以使用之前学习的简单聚合:sum/reduce/或自定义聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合结果))
                .aggregate(new MyAggregate(), new MyWindow());

敲了这么久,忙得满头大汉~先看看效果对不对,不对不就白干一场了:

aggregateResult.print();
env.execute();

还好,成功了!

需求二:计算所有分类的销售总额和分类销售额最大Top3

aggregateResult.keyBy(c -> c.getDateTime())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                //先按照时间对数据分组,因为后续要每秒更新/计算销售总额和分类销售额Top3
                .process(new MyProcessWindowFunction());

好像又成功了吧?!Flink实时计算也没那么难

加上注释只有76行代码

眉头一皱,发现事情并没有那么简单

博主,博主还有自定义类呢,被你吞了??

CategoryPojo.class

/**
     * 用于存储聚合的结果
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double totalPrice;//该分类总销售额
        private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

MyWindow .class

/**
     //     * interface WindowFunction<IN, OUT, KEY, W extends Window>
     //     * 自定义窗口函数,实现窗口聚合数据的收集
     //     */
    public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        private FastDateFormat df =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
        @Override
        public void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            double totalPrice =0d;
            for (Double price : input) {
                totalPrice +=price;
            }
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(key);
            categoryPojo.setDateTime(df.format(System.currentTimeMillis()));
            categoryPojo.setTotalPrice(totalPrice);
            out.collect(categoryPojo);
        }
    }

MyAggregate.class

/**
     * interface AggregateFunction<IN, ACC, OUT>
     * 自定义聚合函数,实现各个分类销售额的预聚合/累加
     */
    public static class MyAggregate implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
        //初始化累加器
        @Override
        public Double createAccumulator() {
            return 0d;
        }
        //累加过程
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1+accumulator;
        }
        //累加结果
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }
        //合并结果
        @Override
        public Double merge(Double a, Double b) {
            return a+b;
        }
    }

计算分类销售额最大的Top3,我用的是之前学的外比较器进行排序:

数据结构与算法__冒泡排序__Java外比较器和内比较器(排序专题)

MyProcessWindowFunction.class

/**
     * abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
     */
    public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
            Double totalAmount = 0d;//用来记录销售总额
            //尝试使用外比较器进行排序
            ArrayList<CategoryPojo> list = new ArrayList<>();
            for (CategoryPojo categoryPojo : categoryPojos) {
                //--1.计算截止到目前为止的所有分类的销售总额
                totalAmount += categoryPojo.getTotalPrice();
                //--2. 分类销售额最大的Top3
                if (list.size()<3){
                    list.add(categoryPojo);
                }else {
                    //>=3
                    CategoryPojo first = list.get(0);
                    if (categoryPojo.getTotalPrice()>first.getTotalPrice()){
                        list.remove(first);
                        list.add(categoryPojo);
                    }//进来元素小就不用变
                }
            }
            list.sort(new Comparator<CategoryPojo>() {
                @Override
                public int compare(CategoryPojo o1, CategoryPojo o2) {
                    return (int) (o1.getTotalPrice()-o2.getTotalPrice());
                }
            });
            //--3.直接在这里输出
            System.out.println("================================================================================================================================");
            System.out.println("----当前时间:----");
            System.out.println(key);
            System.out.println("----销售总额:----");
            System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
            System.out.println("----销售额Top3分类:----");
            list.stream()
                    .map(c -> {
                        c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
                        return c;
                    })
                    .sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)
                    .forEach(System.out::println); }}

下面是完整代码:

package demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Random;
/**
 * @author ChinaManor
 * #Description
 * * Desc今天我们就做一个最简单的模拟电商统计大屏的小例子,
 *  * 需求如下:
 *  * 1.实时计算出当天零点截止到当前时间的销售总额
 *  * 2.计算出各个分类的销售额最大的top3
 *  * 3.每秒钟更新一次统计结果
 * #Date: 25/6/2021 08:28
 */
public class T4 {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source
        //订单数据Tuple2<分类, 金额>
        DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());
        //TODO 3.transformation
        //-1.每秒预聚合各个分类的销售总额:从当天0点开始截止到目前为止的各个分类的销售总额
        SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)
                //注意:中国使用UTC+08:00,您需要一天大小的时间窗口,
                //窗口从当地时间的每00:00:00开始,您可以使用{@code of(time.days(1),time.hours(-8))}
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                //注意:下面表示每秒触发计算
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                //聚合(可以使用之前学习的简单聚合:sum/reduce/或自定义聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合结果))
                .aggregate(new MyAggregate(), new MyWindow());
        //输出查看下预聚合的结果
//        aggregateResult.print();
        //按照分类将订单金额进行聚合:
        //分类名称  金额  时间
     /*   //男装  100   2021-11-11 11:11:11
        //女装  100   2021-11-11 11:11:11
        //男装  200   2021-11-11 11:11:12
        //女装  200   2021-11-11 11:11:12*/
        //TODO 4.sink
        //-2.计算所有分类的销售总额和分类销售额最大Top3
        //要求每秒更新/计算所有分类目前的销售总额和分类销售额Top3
//        aggregateResult.keyBy(CategoryPojo::getDateTime)
          aggregateResult.keyBy(c -> c.getDateTime())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                //先按照时间对数据分组,因为后续要每秒更新/计算销售总额和分类销售额Top3
                .process(new MyProcessWindowFunction());
        //TODO 5.execute
        env.execute();
    }
    /**
     * abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
     */
    public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
            Double totalAmount = 0d;//用来记录销售总额
            //尝试使用外比较器进行排序
            ArrayList<CategoryPojo> list = new ArrayList<>();
            for (CategoryPojo categoryPojo : categoryPojos) {
                //--1.计算截止到目前为止的所有分类的销售总额
                totalAmount += categoryPojo.getTotalPrice();
                //--2. 分类销售额最大的Top3
                if (list.size()<3){
                    list.add(categoryPojo);
                }else {
                    //>=3
                    CategoryPojo first = list.get(0);
                    if (categoryPojo.getTotalPrice()>first.getTotalPrice()){
                        list.remove(first);
                        list.add(categoryPojo);
                    }//进来元素小就不用变
                }
            }
            list.sort(new Comparator<CategoryPojo>() {
                @Override
                public int compare(CategoryPojo o1, CategoryPojo o2) {
                    return (int) (o1.getTotalPrice()-o2.getTotalPrice());
                }
            });
            //--3.直接在这里输出
            System.out.println("================================================================================================================================");
            System.out.println("----当前时间:----");
            System.out.println(key);
            System.out.println("----销售总额:----");
            System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
            System.out.println("----销售额Top3分类:----");
            list.stream()
                    .map(c -> {
                        c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
                        return c;
                    })
                    .sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)
                    .forEach(System.out::println); }}
    /**
     * interface AggregateFunction<IN, ACC, OUT>
     * 自定义聚合函数,实现各个分类销售额的预聚合/累加
     */
    public static class MyAggregate implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
        //初始化累加器
        @Override
        public Double createAccumulator() {
            return 0d;
        }
        //累加过程
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1+accumulator;
        }
        //累加结果
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }
        //合并结果
        @Override
        public Double merge(Double a, Double b) {
            return a+b;
        }
    }
    /**
     //     * interface WindowFunction<IN, OUT, KEY, W extends Window>
     //     * 自定义窗口函数,实现窗口聚合数据的收集
     //     */
    public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        private FastDateFormat df =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
        @Override
        public void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            double totalPrice =0d;
            for (Double price : input) {
                totalPrice +=price;
            }
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(key);
            categoryPojo.setDateTime(df.format(System.currentTimeMillis()));
            categoryPojo.setTotalPrice(totalPrice);
            out.collect(categoryPojo);
        }
    }
    /**
     * 用于存储聚合的结果
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double totalPrice;//该分类总销售额
        private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }
    /**
     * 自定义数据源实时产生订单数据Tuple2<分类, 金额>
     */
    public static class MySource implements SourceFunction<Tuple2<String,Double>>{
        private boolean flag =true;
        private String[] categorys ={"男装","女装","童装", "洗护"};
        private Random random =new Random();
        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (flag){
            //随机生成分类和金额
                int index = random.nextInt(categorys.length);
                String category = categorys[index];//随机分类
                double price = random.nextDouble() * 100; //注意生成[0,100)
                ctx.collect(Tuple2.of(category,price));
                Thread.sleep(20);
            }
        }
        @Override
        public void cancel() {
            flag =false;
        }
    }
}

兄弟萌,我考完试了

这是考试的需求,多了从Kafka读取需求:

1、从kafka读取到数据给5分
2、数据简单处理切分给5分
3、给出合适的数据类型给5分
4、销售总额和分类的订单额数据要精确到小数点后两位5分
5、设置合理的窗口和触发情况给10分
6、实现销售总额正确输出,每秒钟更新一次 30分
7、实现各分类的订单额降序输出,每秒钟更新一次 30分
8、是否按照要求写注释 5分
9、代码整洁度、健壮度 5分

这是参考答案:

Flink几个函数这块,我还需要加强~

package demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
public class KafkaToFlink {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //TODO 1.source
        //准备kafka连接参数
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");//集群地址
        props.setProperty("group.id", "flink");//消费者组id
        props.setProperty("auto.offset.reset","latest");//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
        props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
        props.setProperty("enable.auto.commit", "true");//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
        props.setProperty("auto.commit.interval.ms", "2000");//自动提交的时间间隔
        //使用连接参数创建FlinkKafkaConsumer/kafkaSource
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), props);
        //使用kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);
        DataStream<Tuple2<String, Double>> sourceKafka = kafkaDS.map(new MapFunction<String, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(String value) throws Exception {
                String[] lines = value.split(":");
                return Tuple2.of(lines[0], Double.valueOf(lines[1]));
            }
        });
        //todo 3.transformation
        //3.1定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早
        //3.2定义一个1s的触发器
        //3.3聚合结果
        DataStream<CategoryPojo> tempAggResult = sourceKafka.keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                .aggregate(new TestAggregate(), new TestWindowResult());
        //todo 4.使用上面聚合的结果,实现业务需求:
        //4.1.实时计算出当天零点截止到当前时间的销售总额
        //4.2.计算出各个分类的销售topN
        //4.3.每秒钟更新一次统计结果
        tempAggResult.keyBy(CategoryPojo::getDateTime)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new TestProcessWindowFunction());
        //todo 5.execute
        env.execute();
    }
    //public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    private static class TestProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String datetime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            double totalPrice = 0D;
            double roundPrice = 0D;
            Map<String, Double> map = new TreeMap<String, Double>();
            for (CategoryPojo element : elements) {
                //4.1.实时计算出当天零点截止到当前时间的销售总额
                totalPrice += element.totalPrice;
                BigDecimal bigDecimal = new BigDecimal(totalPrice);
                roundPrice = bigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入
                // 4.2.计算出各个分类的销售topN
                map.put(element.category,element.totalPrice);
            }
            ArrayList<Map.Entry<String,Double>>list= new ArrayList<>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Double>>() {
                @Override
                public int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {
                    return o2.getValue().compareTo(o1.getValue());
                }
            });
            System.out.println("时间 : " + datetime + "  总价 : " + roundPrice + "\ntopN: ");
            for (int i = 0; i <list.size(); i++) {
                System.out.println(list.get(i).getKey()+": "+list.get(i).getValue());
            }
            System.out.println("---------------------------------------");
        }
    }
    //public interface AggregateFunction<IN, ACC, OUT>
    private static class TestAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }
    //public interface WindowFunction<IN, OUT, KEY, W extends Window>
    private static class TestWindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        //定义一个时间格式化工具用来将当前时间(双十一那天订单的时间)转为String格式
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        @Override
        public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            Double price = input.iterator().next();
            BigDecimal bigDecimal = new BigDecimal(price);
            double totalPrice = bigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入
            long currentTimeMillis = System.currentTimeMillis();
            String dateTime = df.format(currentTimeMillis);
            CategoryPojo categoryPojo = new CategoryPojo(category, totalPrice, dateTime);
            out.collect(categoryPojo);
        }
    }
    /**
     * 用于存储聚合的结果
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double totalPrice;//该分类总销售额
        private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }
}

造数据到Kafka:

package demo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class DataToKafka {
    public static void main(String[] args) {
        //1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)
            kafkaProducer.send(new ProducerRecord<String, String>("categories",category+":"+price));
            //3、发送数据
            System.out.println(category+":"+price);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

总结

最典型的案例便是淘宝双十一活动,每年双十一购物节,除疯狂购物外,最引人注目的就是双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购买到数据采集,数据计算,数据校验,最终落到双十一大屏上展示的全链路时间压缩在5秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。

以上便是大数据Flink史上最简单双十一实时分析案例喜欢的小伙伴欢迎一键三连!!!

感谢李胜步博主提供的思路:

原文链接


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
63 5
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
64 0
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
121 0
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
206 2
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
43 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
37 0
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
112 0
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
68 0
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
66 1
下一篇
无影云桌面