1. UDF Categories
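| UDF type | Description |
|---|---|
| UDF (User Defined Scalar Function) | User-defined scalar function. Input and output have a one-to-one relationship: it reads one row of data and outputs one value. |
| UDTF (User Defined Table Valued Function) | User-defined table-valued function, for cases where a single function call must output multiple rows. UDTF is the only kind of user-defined function that can return multiple fields. UDTF is not the same as UDT (User Defined Type). |
| UDAF (User Defined Aggregation Function) | User-defined aggregate function. Input and output have a many-to-one relationship: multiple input records are aggregated into one output value. A UDAF can be used together with the SQL GROUP BY clause. For the detailed syntax, see . |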
2. UDF Parameter Types
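The mapping between MaxCompute data types and Java data types is as follows.
Notes:
- The Java type for ARRAY is List, not a Java array.
- Some types, such as VARCHAR, BINARY, and STRUCT, map to ODPS-specific Java classes in com.aliyun.odps.data.
- Java parameter types and return types must be object (boxed) types, so their names start with a capital letter (for example, Long rather than long).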
| MaxCompute Type | Java Type |
|---|---|
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| DECIMAL | java.math.BigDecimal |
| BOOLEAN | java.lang.Boolean |
| STRING | java.lang.String |
| VARCHAR | com.aliyun.odps.data.Varchar |
| BINARY | com.aliyun.odps.data.Binary |
| DATETIME | java.util.Date |
| TIMESTAMP | java.sql.Timestamp |
| ARRAY | java.util.List |
| MAP | java.util.Map |
| STRUCT | com.aliyun.odps.data.Struct |
MaxCompute 2.0 supports Writable types as parameter and return types when you define a Java UDF. The mapping between MaxCompute data types and Java Writable types is as follows.
| MaxCompute Type | Java Writable Type |
|---|---|
| TINYINT | ByteWritable |
| SMALLINT | ShortWritable |
| INT | IntWritable |
| BIGINT | LongWritable |
| FLOAT | FloatWritable |
| DOUBLE | DoubleWritable |
| DECIMAL | BigDecimalWritable |
| BOOLEAN | BooleanWritable |
| STRING | Text |
| VARCHAR | VarcharWritable |
| BINARY | BytesWritable |
| DATETIME | DatetimeWritable |
| TIMESTAMP | TimestampWritable |
| INTERVAL_YEAR_MONTH | IntervalYearMonthWritable |
| INTERVAL_DAY_TIME | IntervalDayTimeWritable |
| ARRAY | N/A |
| MAP | N/A |
| STRUCT | N/A |
The mapping between MaxCompute SQL types and Python 2 types is as follows.

| MaxCompute SQL Type | Python 2 Type |
|---|---|
| BIGINT | int |
| STRING | str |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | int |
| FLOAT | float |
| CHAR | str |
| VARCHAR | str |
| BINARY | bytearray |
| DATE | int |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
The mapping between MaxCompute SQL types and Python 3 types is as follows.

| MaxCompute SQL Type | Python 3 Type |
|---|---|
| BIGINT | int |
| STRING | str (Unicode) |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | datetime.datetime |
| FLOAT | float |
| CHAR | str (Unicode) |
| VARCHAR | str (Unicode) |
| BINARY | bytes |
| DATE | datetime.date |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
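The complex-type rows of the Python 3 table (ARRAY to list, MAP to dict, STRUCT to namedtuple) are the ones that change how evaluate is written. The sketch below shows an evaluate method that consumes all three as ordinary Python values.

It rests on a few assumptions. The class name and the Person struct layout are hypothetical. In MaxCompute the class would also carry an @annotate signature declaring the array, map, and struct argument types; its exact spelling is not reproduced here. The decorator is omitted so the code runs without the odps package.

```python
from collections import namedtuple

# Hypothetical stand-in for a STRUCT<name:STRING, age:BIGINT> argument value.
Person = namedtuple("Person", ["name", "age"])


class ComplexTypeDemo(object):
    """Hypothetical Python 3 UDF body; the @annotate decorator is omitted."""

    def evaluate(self, arr, mapping, person):
        # ARRAY arrives as a list; elements may be None (SQL NULL).
        total = sum(x for x in arr if x is not None)
        # MAP arrives as a dict; sort keys for a deterministic result.
        keys = ",".join(sorted(mapping))
        # STRUCT arrives as a namedtuple, so fields are read by attribute name.
        return "%s:%d:%s" % (person.name, total, keys)


print(ComplexTypeDemo().evaluate([1, 2, None, 4], {"b": 1, "a": 2}, Person("tom", 30)))
# tom:7:a,b
```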
3. How to Use UDFs
For UDF, UDTF, and UDAF, see the reference documentation:
https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb
Java UDF
Advanced UDF usage:
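3.1 Variable-Length Arguments in UDFs
Java:

```java
package com.mrtest.cn;

import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;

import java.util.ArrayList;
import java.util.List;

// "*" accepts any number of arguments; the result is an array of strings.
@Resolve({"*->array<string>"})
public class TestUDF extends UDF {
    public List<String> evaluate(String... s) {
        List<String> list = new ArrayList<String>();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}
```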
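Python:

```python
from odps.udf import annotate


@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        # Sum all arguments; "total" avoids shadowing the built-in sum().
        total = 0
        for num in nums:
            total = num + total
        return total
```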
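3.2 Overloading UDFs
Note: overloads that differ only in the element type of a List (for example, List<Long> versus List<String>) cannot be resolved to distinct methods, because Java generics are subject to type erasure.

```java
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.UDF;

public class UDFExample extends UDF {
    // One argument
    public String evaluate(String a) {
        return "s2s:" + a;
    }

    // Two arguments
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b;
    }

    // Three arguments
    public String evaluate(String a, String b, String c) {
        return "sss2s:" + a + "," + b + "," + c;
    }
}
```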
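Python has no method overloading, so a Python UDF cannot declare several evaluate methods the way the Java UDFExample does. The sketch below is one possible substitute: a single variadic evaluate that dispatches on argument count.

Pairing it with @annotate("*->string"), in the style of the variable-length example in 3.1, is an assumption; the decorator is omitted so the code runs standalone.

```python
class UDFExample(object):
    """Python sketch of the Java UDFExample: dispatch on argument count.
    The @annotate decorator is omitted so the code runs without odps."""

    _PREFIXES = {1: "s2s:", 2: "ss2s:", 3: "sss2s:"}

    def evaluate(self, *args):
        prefix = self._PREFIXES.get(len(args))
        if prefix is None:
            raise ValueError("expected 1 to 3 arguments, got %d" % len(args))
        # Mirror Java string concatenation, which renders null as "null".
        parts = ["null" if a is None else str(a) for a in args]
        return prefix + ",".join(parts)


udf = UDFExample()
print(udf.evaluate("a", "b"))  # ss2s:a,b
```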
3.3 Accessing File and Table Resources from a UDF
Java:

```java
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;

public class UDFResource extends UDF {
    ExecutionContext ctx;
    long fileResourceLineCount;
    long tableResource1RecordCount;
    long tableResource2RecordCount;

    // setup() runs once before evaluate() is called, so resources are loaded here.
    public void setup(ExecutionContext ctx) throws UDFException {
        this.ctx = ctx;
        try {
            // Count the lines of the file resource.
            InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            fileResourceLineCount = 0;
            while (br.readLine() != null) {
                fileResourceLineCount++;
            }
            br.close();

            // Count the records of the first table resource.
            Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
            tableResource1RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource1RecordCount++;
                iterator.next();
            }

            // Count the records of the second table resource.
            iterator = ctx.readResourceTable("table_resource2").iterator();
            tableResource2RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource2RecordCount++;
                iterator.next();
            }
        } catch (IOException e) {
            throw new UDFException(e);
        }
    }

    /**
     * project: example_project
     * table: wc_in2
     * partitions: p2=1,p1=2
     * columns: colc,colb
     */
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b
                + "|fileResourceLineCount=" + fileResourceLineCount
                + "|tableResource1RecordCount=" + tableResource1RecordCount
                + "|tableResource2RecordCount=" + tableResource2RecordCount;
    }
}
```
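Python:

```python
# coding: utf-8
import json  # imported in the original example for use in the processing logic

from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table


@annotate('double -> double')
class Compute(object):
    def __init__(self):
        # Read the text file resource line by line.
        cache_file = get_cache_file('file.txt')
        data_mat = []
        for line in cache_file:
            cur_line = line.strip().split(',')
            # processing logic
        cache_file.close()

        # Read the table resource.
        self.my_dict = {}
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
            # processing logic

    def evaluate(self, input):
        # processing logic
        pass
```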
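The Compute class above loads its resources once in __init__ and reuses them for every evaluate call. get_cache_file and get_cache_table exist only inside MaxCompute, so that pattern cannot be run locally as written.

The sketch below is a testable variant. It assumes the file lines and table records are passed to the constructor instead of being read from the distributed cache. The class name ResourceLookup and its lookup logic are hypothetical.

```python
class ResourceLookup(object):
    """Hypothetical, locally testable version of the load-once pattern.
    file_lines replaces get_cache_file(...) (iterating a file yields lines);
    table_records replaces list(get_cache_table(...))."""

    def __init__(self, file_lines, table_records):
        # Parse each comma-separated line of the file resource once.
        self.data_mat = [line.strip().split(",") for line in file_lines]
        # Build a lookup table keyed on the first column of the table resource.
        self.my_dict = {}
        for record in table_records:
            self.my_dict[record[0]] = record[1]

    def evaluate(self, key):
        # Per-row work is only a dictionary lookup; None if the key is absent.
        return self.my_dict.get(key)


udf = ResourceLookup(["a,1\n", "b,2\n"], [("k1", 10), ("k2", 20)])
print(udf.evaluate("k1"))  # 10
```

Doing the I/O in the constructor keeps evaluate cheap, since it is called once per row.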
3.4 Accessing External Networks from UDFs (VPC, public network, private network)
https://help.aliyun.com/document_detail/187866.html
3.5 Using Third-Party Packages in UDFs
https://help.aliyun.com/document_detail/189752.html
```python
# coding: utf-8
# explode.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
import datetime


def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    # Directories of all archive members, skipping wheel metadata (.dist-info),
    # sorted so the shallowest directory comes first.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name))
                        for f in archive_files if '.dist-info' not in f.name],
                       key=lambda v: len(v))
    # Put the parent of the top-level package directory on the import path.
    sys.path.append(os.path.dirname(dir_names[0]))


@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        include_package_path('chinese-calendar-master.zip')

    def evaluate(self, date_str):
        # try:
        import chinese_calendar
        date_strs = date_str.split("-")
        year_num = int(date_strs[0])
        month_num = int(date_strs[1])
        day_num = int(date_strs[2])
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        result = chinese_calendar.is_workday(date_num)
        return result
        # except:
        #     return True
```
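include_package_path decides which directory goes on sys.path. The sketch below isolates that decision as a pure function so it can be checked without MaxCompute.

It takes the assumption that member_names holds the f.name paths that get_cache_archive reports. The function name pick_package_root and the sample paths in the example are hypothetical. The sketch filters on ".dist-info", the suffix wheel metadata directories actually use.

```python
import os


def pick_package_root(member_names):
    """Hypothetical pure version of the path logic in include_package_path."""
    dir_names = sorted(
        (os.path.dirname(os.path.normpath(name))
         for name in member_names if ".dist-info" not in name),
        key=len,
    )
    # The shortest directory is taken to be the top-level package directory;
    # its parent is what must be on sys.path for "import <package>" to work.
    return os.path.dirname(dir_names[0])


names = [
    "/cache/deps.zip/chinese_calendar/__init__.py",
    "/cache/deps.zip/chinese_calendar/data/holidays.py",
    "/cache/deps.zip/cc.dist-info/METADATA",
]
print(pick_package_root(names))  # /cache/deps.zip
```

The logic only works if the importable package is the shallowest directory in the archive. If the zip also contains files at a shallower level (for example a README next to the package folder), the chosen directory moves up and the import can fail.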
Registering the function

Running the SELECT statement:

```sql
set odps.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;
```
3.6 Developing UDFs with Embedded Code
Java:

```sql
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;

import com.aliyun.odps.udf.UDF;

public class Reverse extends UDF {
    public String evaluate(String input) {
        if (input == null) return null;
        // Append the characters from last to first.
        char[] chars = input.toCharArray();
        StringBuilder ret = new StringBuilder();
        for (int i = chars.length - 1; i >= 0; i--) {
            ret.append(chars[i]);
        }
        return ret.toString();
    }
}
#END CODE;

SELECT foo('abdc');
```
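- The embedded code block can be placed after USING or at the end of the script. A code block placed after USING is scoped only to that CREATE TEMPORARY FUNCTION statement.
- A function created with CREATE TEMPORARY FUNCTION is temporary. It is valid only for the current execution and is not stored in the MaxCompute Meta system.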
Python:

```sql
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate

@annotate("bigint->bigint")
class UDFTest(object):
    def evaluate(self, a):
        return a * a
#END CODE;

SELECT foo(4);
```
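- The indentation of the Python code must follow Python syntax rules.
- When you register a Python UDF, the class name after AS must include the name of the Python source file. You can therefore specify a virtual file name with 'filename'='embedded'.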
3.7 Defining Functions in SQL

```sql
create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;
```

```sql
create sql function my_func(@s STRING)
as if(@s rlike '"git_(m|a)"', 1, 0);
```