开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有没有谁知道如何通过 flink api 方式,将jar任务提交到 Yarn 上以Applicati

有没有谁知道如何通过 flink api 方式,将jar任务提交到 Yarn 上以Application运行?就是用本地Java代码替代"flink run-application ..."命令。

展开
收起
芬奇福贵 2023-08-17 16:01:01 63 0
1 条回答
写回答
取消 提交回答
  • 是的,你可以使用 Flink API 将 Flink 程序以 Application 的方式提交到 Yarn 上。下面是一个示例代码,展示了如何使用 Flink API 进行提交:
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.ProgramInvocationException;
    import org.apache.flink.client.yarn.ApplicationIdProvider;
    import org.apache.flink.client.yarn.YarnClusterClient;
    import org.apache.flink.client.yarn.YarnClusterDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.yarn.AbstractYarnClusterDescriptor;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public class SubmitToFlinkYarnApp {

    public static void main(String[] args) throws Exception {
        // 设置 Flink 程序的执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
        // 设置 Flink 程序的参数
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);
    
        // 创建 YarnClusterDescriptor 对象
        YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(new Configuration(),
                YarnClusterDescriptor.DEFAULT_YARN_PROPERTIES_FILE);
    
        // 设置配置信息
        Configuration flinkConfig = new Configuration();
        flinkConfig.addAll(clusterDescriptor.getFlinkConfiguration());
    
        // 设置 Yarn 的配置参数
        clusterDescriptor.setFlinkConfiguration(flinkConfig);
        clusterDescriptor.setLocalJarPath(new Path("/path/to/your/flink-job.jar"));
    
        // 提交 Flink 程序到 Yarn
        PackagedProgram program = new PackagedProgram(new File("/path/to/your/flink-job.jar"));
        try {
            clusterDescriptor.deploySessionCluster(clusterDescriptor.getClusterSpecification(), program, false);
        } catch (ProgramInvocationException e) {
            e.printStackTrace();
        }
    
        // 获取 ApplicationId
        ApplicationIdProvider applicationIdProvider = clusterDescriptor.getClusterClient().getApplicationId();
        ApplicationId applicationId = applicationIdProvider.get();
    
        // 输出 ApplicationId
        System.out.println("Submitted Flink application with id " + applicationId);
    
        // 关闭集群
        clusterDescriptor.close();
    }
    

    }

    上述代码中,我们使用 YarnClusterDescriptor 对象来设置 Yarn 相关的配置信息,然后调用 deploySessionCluster 方法将程序提交到 Yarn 集群中。程序会以 Application 的方式在 Yarn 上运行。
    请确保在代码中替换 /path/to/your/flink-job.jar 的部分为你实际的 Flink 程序的 JAR 文件路径。
    这样,你就可以使用上述代码将 Flink 程序以 Application 的方式提交到 Yarn 上了。

    此答案来自钉钉群“【2】Apache Flink China 社区"

    2023-08-17 20:47:43
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    低代码开发师(初级)实战教程 立即下载
    冬季实战营第三期:MySQL数据库进阶实战 立即下载
    阿里巴巴DevOps 最佳实践手册 立即下载