开发者社区> 问答> 正文

Pandas UDF处理过的数据sink问题

使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下 

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],  result_type=DataTypes.ROW(  [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),  DataTypes.FIELD('aveBuy', DataTypes.INT())),  func_type='pandas')  def orderCalc(code, amount): 

df = pd.DataFrame({'code': code, 'amount': amount}) 

pandas 数据处理后输入另一个dataframe output 

return (output['buyQtl'], output['aveBuy']) 

定义了csv的sink如下 

create table csvSink (  buyQtl BIGINT,  aveBuy INT   ) with (  'connector.type' = 'filesystem',  'format.type' = 'csv',  'connector.path' = 'e:/output'  ) 

然后进行如下的操作: 

result_table = t_env.sql_query("""  select orderCalc(code, amount)  from some_source  group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount  """)  result_table.execute_insert("csvSink") 

在执行程序的时候提示没法入库 

py4j.protocol.Py4JJavaError: An error occurred while calling  o98.executeInsert. 

: org.apache.flink.table.api.ValidationException: Column types of query  result and sink for registered table  'default_catalog.default_database.csvSink' do not match. 

Cause: Different number of columns. 

Query schema: [EXPR$0: ROW<buyQtl BIGINT, aveBuy INT >] 

Sink schema: [buyQtl: BIGINT, aveBuy: INT] 

at  org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx  ception(DynamicSinkUtils.java:304) 

at  org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply  ImplicitCast(DynamicSinkUtils.java:134) 

是UDF的输出结构不对吗,还是需要调整sink table的结构?*来自志愿者整理的flink邮件归档

展开
收起
又出bug了-- 2021-12-02 11:25:08 764 0
1 条回答
写回答
取消 提交回答
  • 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。

    你可以尝试将sql语句改成以下形式:

    select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) from some_source group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount

    此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”*来自志愿者整理的FLINK邮件归档

    2021-12-02 14:10:23
    赞同 展开评论 打赏
问答标签:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
中文:即学即用的Pandas入门与时间序列分析 立即下载
即学即用的Pandas入门与时间序列分析 立即下载
PyODPS架构以及DataFrame实现原理 立即下载