目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return json.dumps(message)
st_env
.form_path("source")
.select("drop_fields(message,'x')")
.insert_into("sink")
message 格式: {“a”:"1","x","2"}
报错参数类型不匹配: Actual:(java.lang.String, java.lang.String) Expected:(org.apache.flink.table.dataformat.BinaryString)
新手入门,请多指教,感谢。
*来自志愿者整理的flink邮件归档
The input types should be as following:
input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。