开发者社区> 问答> 正文

MaxCompute快速入门:编写UDF



MaxCompute 的 UDF 包括:UDF,UDAF,UDTF 三种函数。通常情况下,此三种函数被统称为 UDF。如果您使用 Maven,可以从 Maven 库 中搜索 odps-sdk-udf 获取不同版本的 Java SDK。
相关配置信息如下所示:

  1. <dependency>
  2.     <groupId>com.aliyun.odps</groupId>
  3.     <artifactId>odps-sdk-udf</artifactId>
  4.     <version>0.20.7-public</version>
  5. </dependency>

注意
  • groupId,artifactId,version 信息请以在 Maven 库中查询到的信息为准 。

  • UDF 目前只支持 Java 语言接口,如果您想编写 UDF 程序,可以通过 添加资源 的方式将 UDF 代码上传到项目空间中,使用 注册函数 语句创建 UDF。

  • 本章节中会分别给出 UDF,UDAF,UDTF 的代码示例,运行 UDF 的示例请参见 UDF 开发插件介绍

  • Java 和 MaxCompute 的数据类型对应关系,请参见 参数与返回值类型


UDF 示例


下面将为您介绍实现字符小写转换功能的 UDF 完整流程。

操作步骤


  1. 编写代码。
    按照 MaxCompute UDF 框架的规定,实现函数功能,并进行编译。代码实现,示例如下:package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.udf.UDF;
  3. public final class Lower extends UDF {
  4. public String evaluate(String s) {
  5. if (s == null) { return null; }
  6. return s.toLowerCase();
  7. }
  8. }

将这个 Jar 包命名为 my_lower.jar 。
添加资源。
在运行 UDF 之前,必须指定引用的 UDF 代码。您编写的代码通过资源的形式添加到 MaxCompute 中。Java UDF必须被打成 Jar 包,以 Jar 资源添加到 MaxCompute 中,UDF 框架会自动加载 Jar 包,运行用户自定义的 UDF。MaxCompute MapReduce 也用到了资源这一特有概念,详情请参见 MapReduce
执行如下命令:
  1. add jar my_lower.jar;
  2. -- 如果存在同名的资源请将这个 jar 包重命名,
  3. -- 并注意修改下面示例命令中相关 jar 包的名字;
  4. -- 又或者直接使用-f选项覆盖原有的 jar 资源

注册 UDF 函数。
您的 Jar 包被上传后,使得 MaxCompute 有条件自动获取代码并运行。但此时仍然无法使用这个 UDF,因为 MaxCompute中并没有关于这个 UDF 的任何信息。因此需要您在 MaxCompute 中注册一个唯一的函数名,并指定这个函数名与哪个 Jar资源的哪个函数对应。关于如何注册 UDF,请参见 注册函数
运行如下命令:
  1. CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;

在 SQL 中使用此函数:
  1. select test_lower('A') from my_test_table;


UDAF 示例


UDAF 的注册方式与 UDF 基本相同,使用方式与内建函数中的 聚合函数 相同。计算平均值的 UDAF 的代码示例,如下所示:
  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.io.LongWritable;
  3. import com.aliyun.odps.io.Text;
  4. import com.aliyun.odps.io.Writable;
  5. import com.aliyun.odps.udf.Aggregator;
  6. import com.aliyun.odps.udf.UDFException;
  7. /**
  8. * project: example_project
  9. * table: wc_in2
  10. * partitions: p2=1,p1=2
  11. * columns: colc,colb,cola
  12. */
  13. public class UDAFExample extends Aggregator {
  14.   @Override
  15.   public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
  16.     LongWritable result = (LongWritable) arg0;
  17.     for (Writable item : arg1) {
  18.       Text txt = (Text) item;
  19.       result.set(result.get() + txt.getLength());
  20.     }
  21.   }
  22.   @Override
  23.   public void merge(Writable arg0, Writable arg1) throws UDFException {
  24.     LongWritable result = (LongWritable) arg0;
  25.     LongWritable partial = (LongWritable) arg1;
  26.     result.set(result.get() + partial.get());
  27.   }
  28.   @Override
  29.   public Writable newBuffer() {
  30.     return new LongWritable(0L);
  31.   }
  32.   @Override
  33.   public Writable terminate(Writable arg0) throws UDFException {
  34.     return arg0;
  35.   }
  36. }


UDTF 示例


UDTF 的注册和使用方式与 UDF 相同。代码示例如下:
  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9.   @Override
  10.   public void process(Object[] args) throws UDFException {
  11.     String a = (String) args[0];
  12.     Long b = (Long) args[1];
  13.     for (String t: a.split("\\s+")) {
  14.       forward(t, b);
  15.     }
  16.   }
  17. }

MaxCompute 提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。详情请参见 创建自定义函数

展开
收起
行者武松 2017-10-23 15:17:47 2428 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载