咨询一下Flink,RichFunction函数里的open方法的参数Configuration, 怎么传递?
void open(Configuration parameters)
这么写也传递不过去
Configuration conf = new Configuration();
conf.setString("name", "wsy");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
在open()时, 传递进去的是一个空Configuration, 这样是拿不到的
放到全局参数中去拿 ,此回答整理自钉群“【②群】Apache Flink China社区”
在Flink中,open()
方法的参数Configuration
是通过RichFunction
的子类构造函数传递的。您可以通过以下方式传递Configuration
对象:
public class MyRichFunction extends RichFunction {
private Configuration conf;
public MyRichFunction(Configuration conf) {
this.conf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
// 在这里可以使用conf对象进行配置操作
}
// 其他方法...
}
然后,在创建StreamExecutionEnvironment
时,将Configuration
对象传递给MyRichFunction
的构造函数:
Configuration conf = new Configuration();
conf.setString("name", "wsy");
MyRichFunction myRichFunction = new MyRichFunction(conf);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
这样,您就可以在open()
方法中使用传递的Configuration
对象进行配置操作了。
在 Flink 中,open(Configuration parameters)
方法的 parameters
参数是由 Flink 系统自动填充的,用于传递给用户自定义函数运行时的配置信息。如果你想要自定义并传递额外的配置,应该通过 StreamExecutionEnvironment
设置全局配置或者通过 RuntimeContext
在运行时获取。
// 设置全局配置(这些配置会被分发到各个 TaskManager 上,不一定能直接传递给 open 方法)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(new Configuration());
// 添加自定义参数
env.getConfig().getConfiguration().setString("name", "wsy");
// 在 RichFunction 中获取配置
public class MyRichMapFunction extends RichMapFunction<String, String> {
private transient String name;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 从 RuntimeContext 获取全局配置
Configuration globalConfig = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
name = globalConfig.getString("name");
}
// ...
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。