可以实现的,可以使用flink sql Filter进行实现
给你看个flink sql 过滤列子吧:
public class UdtfJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTabelEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
KafkaTabelSource kafkaTabelSource = new KafkaTabelSource();
streamTabelEnv.registerTableSource("kafkaDataStream", kafkaTabelSource);//使用自定义TableSource
//注册自定义函数定义三个关键字:"KeyWord","WARNING","illegal"
streamTabelEnv.registerFunction("CountKEY", new KyeWordCount(new String[]{"KeyWord","WARNING","illegal"}));
//编写SQL
Table wordWithCount = streamTabelEnv.sqlQuery("SELECT key,COUNT(countv) AS countsum FROM kafkaDataStream LEFT JOIN LATERAL TABLE(CountKEY(response)) as T(key, countv) ON TRUE GROUP BY key");
//直接输出Retract流
streamTabelEnv.toRetractStream(wordWithCount, Row.class).print();
streamTabelEnv.execute("BLINK STREAMING QUERY");
}
}
Filter:DataStream -> DataStream,每个数据元执行布尔函数,只保存函数返回 true 的数据元。过滤掉零值的过滤器:
dataStream.filter { _ != 0 }
使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。
为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。
自定义聚合函数(UDAGGs)将一个表聚合为一个标量值。
聚合函数适合用于累计的工作,假设您有一个包含饮料数据的表。该表由三列组成:id、name和price,共计5行。想象一下,你需要找到所有饮料的最高价格。执行max()聚合。您需要检查5行中的每一行,结果将是单个数值。
用户定义的聚合函数是通过扩展AggregateFunction类来实现的。AggregateFunction的工作原理如下。首先,它需要一个累加器,这个累加器是保存聚合中间结果的数据结构。通过调用AggregateFunction的createAccumulator()方法来创建一个空的累加器。随后,对每个输入行调用该函数的accumulator()方法来更新累加器。处理完所有行之后,将调用函数的getValue()方法来计算并返回最终结果。
用户定义的表聚合函数(UDTAGGs)将一个表(具有一个或多个属性的一个或多个行)聚合到具有多行和多列的结果表。
使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。
为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。 定义一个过滤字符串 记下关键字 的自定义表函数
首先是肯定可用使用sql关键字的操作,然后说的事件的话,最好是这个事件操作的时候有字段的变化,然后根据具体的字段变化来过滤掉这个事件的处理
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。