有没有谁知道如何通过 flink api 方式,将jar任务提交到 Yarn 上以Application运行?就是用本地Java代码替代"flink run-application ..."命令。
是的,你可以使用 Flink API 将 Flink 程序以 Application 的方式提交到 Yarn 上。下面是一个示例代码,展示了如何使用 Flink API 进行提交:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.yarn.ApplicationIdProvider;
import org.apache.flink.client.yarn.YarnClusterClient;
import org.apache.flink.client.yarn.YarnClusterDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.yarn.AbstractYarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class SubmitToFlinkYarnApp {
public static void main(String[] args) throws Exception {
// 设置 Flink 程序的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 设置 Flink 程序的参数
ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
// 创建 YarnClusterDescriptor 对象
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(new Configuration(),
YarnClusterDescriptor.DEFAULT_YARN_PROPERTIES_FILE);
// 设置配置信息
Configuration flinkConfig = new Configuration();
flinkConfig.addAll(clusterDescriptor.getFlinkConfiguration());
// 设置 Yarn 的配置参数
clusterDescriptor.setFlinkConfiguration(flinkConfig);
clusterDescriptor.setLocalJarPath(new Path("/path/to/your/flink-job.jar"));
// 提交 Flink 程序到 Yarn
PackagedProgram program = new PackagedProgram(new File("/path/to/your/flink-job.jar"));
try {
clusterDescriptor.deploySessionCluster(clusterDescriptor.getClusterSpecification(), program, false);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
// 获取 ApplicationId
ApplicationIdProvider applicationIdProvider = clusterDescriptor.getClusterClient().getApplicationId();
ApplicationId applicationId = applicationIdProvider.get();
// 输出 ApplicationId
System.out.println("Submitted Flink application with id " + applicationId);
// 关闭集群
clusterDescriptor.close();
}
}
上述代码中,我们使用 YarnClusterDescriptor 对象来设置 Yarn 相关的配置信息,然后调用 deploySessionCluster 方法将程序提交到 Yarn 集群中。程序会以 Application 的方式在 Yarn 上运行。
请确保在代码中替换 /path/to/your/flink-job.jar 的部分为你实际的 Flink 程序的 JAR 文件路径。
这样,你就可以使用上述代码将 Flink 程序以 Application 的方式提交到 Yarn 上了。此回答整理自钉群“【②群】Apache Flink China社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。