开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink sql 可不可以实现 过滤某种操作事件

flink sql 可不可以实现 过滤某种操作事件

展开
收起
雪哥哥 2022-11-13 20:18:52 2121 0
7 条回答
写回答
取消 提交回答
  • 学无止境!

    可以实现

    2022-11-30 11:41:07
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    可以实现的,可以使用flink sql Filter进行实现

    2022-11-28 10:33:04
    赞同 展开评论 打赏
  • 资深技术专家,全网粉丝10W+。主攻技术开发,擅长分享、写文、测评。

    给你看个flink sql 过滤列子吧:

    public class UdtfJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment streamTabelEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
            KafkaTabelSource kafkaTabelSource = new KafkaTabelSource();
            streamTabelEnv.registerTableSource("kafkaDataStream", kafkaTabelSource);//使用自定义TableSource
            //注册自定义函数定义三个关键字:"KeyWord","WARNING","illegal"
            streamTabelEnv.registerFunction("CountKEY", new KyeWordCount(new String[]{"KeyWord","WARNING","illegal"}));
            //编写SQL
            Table wordWithCount = streamTabelEnv.sqlQuery("SELECT key,COUNT(countv) AS countsum FROM kafkaDataStream LEFT JOIN LATERAL TABLE(CountKEY(response)) as T(key, countv) ON TRUE GROUP BY key");
            //直接输出Retract流
            streamTabelEnv.toRetractStream(wordWithCount, Row.class).print();
            streamTabelEnv.execute("BLINK STREAMING QUERY");
        }
    }
    
    
    2022-11-24 20:26:58
    赞同 展开评论 打赏
  • Filter:DataStream -> DataStream,每个数据元执行布尔函数,只保存函数返回 true 的数据元。过滤掉零值的过滤器:

    dataStream.filter { _ != 0 }
    
    2022-11-23 17:07:16
    赞同 展开评论 打赏
  • 十年摸盘键,代码未曾试。 今日码示君,谁有上云事。

    使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。

    为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。

    自定义聚合函数(UDAGGs)将一个表聚合为一个标量值。

    聚合函数适合用于累计的工作,假设您有一个包含饮料数据的表。该表由三列组成:id、name和price,共计5行。想象一下,你需要找到所有饮料的最高价格。执行max()聚合。您需要检查5行中的每一行,结果将是单个数值。

    用户定义的聚合函数是通过扩展AggregateFunction类来实现的。AggregateFunction的工作原理如下。首先,它需要一个累加器,这个累加器是保存聚合中间结果的数据结构。通过调用AggregateFunction的createAccumulator()方法来创建一个空的累加器。随后,对每个输入行调用该函数的accumulator()方法来更新累加器。处理完所有行之后,将调用函数的getValue()方法来计算并返回最终结果。

    用户定义的表聚合函数(UDTAGGs)将一个表(具有一个或多个属性的一个或多个行)聚合到具有多行和多列的结果表。

    2022-11-23 14:51:03
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。

    为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。 定义一个过滤字符串 记下关键字 的自定义表函数

    2022-11-23 11:49:56
    赞同 展开评论 打赏
  • 网站:http://ixiancheng.cn/ 微信订阅号:小马哥学JAVA

    首先是肯定可用使用sql关键字的操作,然后说的事件的话,最好是这个事件操作的时候有字段的变化,然后根据具体的字段变化来过滤掉这个事件的处理

    2022-11-22 19:35:08
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载