flink1.11 在TableEnvironment环境中注册并使用自定义的Aggregate Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)
org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet. at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) at com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
// 以下是代码 // main EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build();
TableEnvironment tEnv = TableEnvironment.create(envSettings);
// 注册source table, jdbc table source tEnv.executeSql("CREATE TABLE wx_event_log (....) with ('connect.type'='jdbc'),....");
// 注册sink table,csv table sink tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with ('connect.type'='filesystem','format.type'='csv',.....)");
// 注册agg function tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());
Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log where msg_type='text' and create_time between '2020-03-20' and '2020-03-21'");
table2.groupBy($("from_user"))
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) .select($("from_user"),$("first_send_msg_today")) .executeInsert("wx_data_statistics");
// 自定义agg function类 public class FirstSendMsgFunc extends AggregateFunction<LocalDateTime,CountDTO> {
public void accumulate(CountDTO acc, LocalDateTime createTime) { if (acc.getDateTime() == null) { acc.setDateTime(createTime); } else if (acc.getDateTime().isAfter(createTime)) { acc.setDateTime(createTime); } }
@Override public LocalDateTime getValue(CountDTO acc) { return acc.getDateTime(); }
@Override public CountDTO createAccumulator() { return new CountDTO(); } }
// accumulate pojo 类 public class CountDTO implements Serializable {
private Integer count;
private LocalDateTime dateTime;
public Integer getCount() { return count; }
public void setCount(Integer count) { this.count = count; }
public LocalDateTime getDateTime() { return dateTime; }
public void setDateTime(LocalDateTime dateTime) { this.dateTime = dateTime; } }
*来自志愿者整理的flink邮件归档
1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持 AggregateFunction。 你说 StreamTableEnvironment 可以,我估计你用的是 StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。