(8)FlinkSQL自定义UDF

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink提供了自定义函数的基础能力,在需要满足特殊业务场景需求时,根据自身需要按需定制自己的UDF 下面将简单演示一个UDF的定义和UDF的使用过程:

Flink提供了自定义函数的基础能力,在需要满足特殊业务场景需求时,根据自身需要按需定制自己的UDF 下面将简单演示一个UDF的定义和UDF的使用过程:
(1)定义一个UDF

package com.udf;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Created by lj on 2022-07-25.
 */
public class TestUDF extends ScalarFunction {
    public String eval(String value) {
        return value + "_udf";
    }
}

(2)使用UDF

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);

/*
        // 1. 直接调用自定义udf 函数
        //        table.select(call(myFunction.class,$("id"))).execute().print();
        // 2. 先注册在使用
        tableEnv.createTemporarySystemFunction("MyLength",myFunction.class);
        //2.1 在使用注册的自定义函数 名称为MyLength
        //        table.select(call("MyLength",$("id"))).execute().print();
        // 2.2 采用sql 的方式进行使用自定义函数
            tableEnv.sqlQuery("select id, MyLength(id) from "+table).execute().print();
* */

        tableEnv.createTemporarySystemFunction("MyLength",TestUDF.class);
        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id as componentname, " +                //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +
                        "MyLength(cast(COUNT(ts) as string)) as testudf " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '10' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

        tableEnv.toRetractStream(result, Row.class).print("toRetractStream");       //缩进模式

        env.execute();
    }

(3)应用效果
image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在使用udf遇到报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
SQL Java 流计算
Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
【1月更文挑战第1天】【1月更文挑战第2篇】Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
337 0
|
6月前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之在sql里调用自定义的udf时,设置一次同时处理的数据行数,是并行执行还是串行执行的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
SQL Java 应用服务中间件
Apache Doris 自定义C++ UDF之流程详解(1)
Apache Doris 自定义C++ UDF之流程详解(1)
269 0
|
7月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
消息中间件 SQL Java
Flink报错问题之调用udf时报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
SQL 消息中间件 Kafka
Flink报错问题之SQL作业中调用UDTF报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
SQL 消息中间件 分布式数据库
Flink报错问题之flink 1.11指定rowtime字段报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 缓存 API
FlinkSQL函数
自定义函数分类 函数基本使用 UDF实现要点 UDTF, UDAF, UDATF 实现要点
FlinkSQL函数
|
SQL 分布式计算 Java
如何在 hive udf 中访问配置数据-踩坑记录,方案汇总与对比-udf中可以写sql吗?
如何在 hive udf 中访问配置数据-踩坑记录,方案汇总与对比-udf中可以写sql吗?