开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

如何通过 flink api 方式,将jar任务提交到Yarn上以Application运行?

有没有谁知道如何通过 flink api 方式,将jar任务提交到 Yarn 上以Application运行?就是用本地Java代码替代"flink run-application ..."命令。

展开
收起
三分钟热度的鱼 2023-08-22 20:49:08 264 0
1 条回答
写回答
取消 提交回答
  • 是的,你可以使用 Flink API 将 Flink 程序以 Application 的方式提交到 Yarn 上。下面是一个示例代码,展示了如何使用 Flink API 进行提交:
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.ProgramInvocationException;
    import org.apache.flink.client.yarn.ApplicationIdProvider;
    import org.apache.flink.client.yarn.YarnClusterClient;
    import org.apache.flink.client.yarn.YarnClusterDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.yarn.AbstractYarnClusterDescriptor;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public class SubmitToFlinkYarnApp {

    public static void main(String[] args) throws Exception {
        // 设置 Flink 程序的执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
        // 设置 Flink 程序的参数
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);
    
        // 创建 YarnClusterDescriptor 对象
        YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(new Configuration(),
                YarnClusterDescriptor.DEFAULT_YARN_PROPERTIES_FILE);
    
        // 设置配置信息
        Configuration flinkConfig = new Configuration();
        flinkConfig.addAll(clusterDescriptor.getFlinkConfiguration());
    
        // 设置 Yarn 的配置参数
        clusterDescriptor.setFlinkConfiguration(flinkConfig);
        clusterDescriptor.setLocalJarPath(new Path("/path/to/your/flink-job.jar"));
    
        // 提交 Flink 程序到 Yarn
        PackagedProgram program = new PackagedProgram(new File("/path/to/your/flink-job.jar"));
        try {
            clusterDescriptor.deploySessionCluster(clusterDescriptor.getClusterSpecification(), program, false);
        } catch (ProgramInvocationException e) {
            e.printStackTrace();
        }
    
        // 获取 ApplicationId
        ApplicationIdProvider applicationIdProvider = clusterDescriptor.getClusterClient().getApplicationId();
        ApplicationId applicationId = applicationIdProvider.get();
    
        // 输出 ApplicationId
        System.out.println("Submitted Flink application with id " + applicationId);
    
        // 关闭集群
        clusterDescriptor.close();
    }
    

    }

    上述代码中,我们使用 YarnClusterDescriptor 对象来设置 Yarn 相关的配置信息,然后调用 deploySessionCluster 方法将程序提交到 Yarn 集群中。程序会以 Application 的方式在 Yarn 上运行。
    请确保在代码中替换 /path/to/your/flink-job.jar 的部分为你实际的 Flink 程序的 JAR 文件路径。
    这样,你就可以使用上述代码将 Flink 程序以 Application 的方式提交到 Yarn 上了。此回答整理自钉群“【②群】Apache Flink China社区”

    2023-08-22 21:01:05
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Spring Boot2.0实战Redis分布式缓存 立即下载
    CUDA MATH API 立即下载
    API PLAYBOOK 立即下载