开发者社区> 问答> 正文

UDF 怎么根据不同的参数返回不同的类型?

我使用的是 Flink 1.10 Blink Planer。 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。

为什么我想要这个功能: 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 场景2: 我的数据是一个 Json ,问题同上。

在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 @Override public void initialize(LogicalType[] sqlTypes, String[] paramNames) { // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 } 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。 public class TimestampTest extends ScalarFunction {

public Object eval(long timestamp, String pattern, int num) { Timestamp timestamp1 = new Timestamp(timestamp); SimpleDateFormat sdf = new SimpleDateFormat(pattern); if (num < 4) { //返回 STRING 类型 return String.valueOf(timestamp); } if (num < 6) { //返回 BIGINT return timestamp - 100; } if (num < 8) { //返回 DOUBLE double ss = 0.9; return (double) timestamp + ss; } //返回 STRING return sdf.format(timestamp1); } }

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:22:34 1515 0
1 条回答
写回答
取消 提交回答
  • 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。

    但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。

    如果有理解不对之处,敬请指出。

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:47:41
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载