Flink计算PV,UV的案例及问题分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。一个UV可以用很多PV,一个PV也只能对应一个IP没有这些数据的支持,意味着你不知道产品的发展情况,用户获取成本,UV,PV,注册转化率;没有这些数据做参考,你不会知道接下来提供什么建议给领导采纳,也推测不出领导为啥烦忧,那么就么有任何表现的机会。举两个UV计算的场景:1. 实时计算当天零点起,到当前时间的uv。2. 实时计算当天每个小时的UV。0点

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。

UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。
一个UV可以用很多PV,一个PV也只能对应一个IP

没有这些数据的支持,意味着你不知道产品的发展情况,用户获取成本,UV,PV,注册转化率;没有这些数据做参考,你不会知道接下来提供什么建议给领导采纳,也推测不出领导为啥烦忧,那么就么有任何表现的机会。

举两个UV计算的场景:

  1. 实时计算当天零点起,到当前时间的uv。
  2. 实时计算当天每个小时的UV。0点...12点...24点

请问这个用spark streaming如何实现呢?是不是很难有好的思路呢?

今天主要是想给大家用flink来实现一下,在这方面flink确实比较优秀了。

主要技术点就在group by的使用。

下面就是完整的案例:

package org.table.uv;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class ComputeUVDay {

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    tEnv.registerFunction("DateUtil",new DateUtil());
    tEnv.connect(
            new Kafka()
                    .version("0.10")
                    //   "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic("jsontest")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("group.id","test")
                    .startFromLatest()
    )
            .withFormat(
                    new Json()
                            .failOnMissingField(false)
                            .deriveSchema()
            )
            .withSchema(

                    new Schema()
                            .field("rowtime", Types.SQL_TIMESTAMP)
                            .rowtime(new Rowtime()
                                    .timestampsFromField("eventtime")
                                    .watermarksPeriodicBounded(2000)
                            )
                            .field("fruit", Types.STRING)
                            .field("number", Types.INT)
            )
            .inAppendMode()
            .registerTableSource("source");

    // 計算天級別的uv

// Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");

    // 计算小时级别uv
    Table table = tEnv.sqlQuery("select  DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");

    tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
        @Override
        public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
            System.out.println(value.f1.toString());
        }
    });

    System.out.println(env.getExecutionPlan());
    env.execute("ComputeUVDay");
}

}
其中DateUtil类如下:

package org.table.uv;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class DateUtil extends ScalarFunction {

public static String eval(long timestamp){
    String result = "null";
    try {
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        result = sdf.format(new Timestamp(timestamp));
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
public static String eval(long ts, String format) {

    String result = "null";
    try {
        DateFormat sdf = new SimpleDateFormat(format);
        result = sdf.format(ts);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
public static void main(String[] args) {
    String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");
    System.out.println(eval);
}

}
代码里面的案例,是可以用于生产中的吗?

假如数据量小可以直接使用,游戏转让平台每秒数据量大的话,就比较麻烦。因为你看group by后面的维度,只有当天date 这个维度,这样就会导致计算状态超级集中而使得内存占用超大进而引发oom。

这种情况解决办法就是将状态打散,然后再次聚合即可,典型的分治思想。

具体做法作为福利分享给球友吧。

还有一个问题就是由于存在全局去重及分组操作,flink内部必然要维护一定的状态信息,那么这些状态信息肯定不是要一直保存的,比如uv,我们只需要更新今天,最多昨天的状态,这个点之前的状态要删除的,不能让他白白占着内存,而导致任务内存消耗巨大,甚至因oom而挂掉。

StreamQueryConfig streamQueryConfig = tEnv.queryConfig();
streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

tEnv.sqlUpdate(sql,streamQueryConfig);
再有就是能使用事件时间吗?事件时间假如事件严重超时了,比如,我们状态保留时间设置的是两天,两天之后状态清除,那么这时候来了事件时间刚刚好是两天之前的,由于已经没有状态就会重新计算uv覆盖已经生成的值,就导致值错误了,这个问题如何解决呢?

这算是一个疑问吧?

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
64 0
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
121 0
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
42 1
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
43 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
37 0
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
112 0
|
3月前
|
SQL 流计算
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
57 1
|
4月前
|
SQL 网络安全 API
实时计算 Flink版产品使用问题之使用ProcessTime进行窗口计算,并且有4台机器的时间提前了2个小时,会导致什么情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行DWS层的实时聚合计算时,遇到多次更新同一个字段的情况,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
178 1
下一篇
无影云桌面