开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请教一下、flink on yarn模式下如何让提交的Application任务中加载的一些配置文

请教一下、flink on yarn模式下如何让提交的Application任务中加载的一些配置文件及jar包打到hdfs的/user/flink/.flink的目录下

展开
收起
十一0204 2023-04-11 09:12:20 562 0
3 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在阿里云Flink on YARN模式下,可以通过以下步骤将应用程序依赖的配置文件和jar包上传到HDFS的/user/flink/.flink目录下:

    1. 将需要上传的配置文件和jar包打包成一个zip文件,例如app-dependencies.zip

    2. 在Flink on YARN的配置文件中,将yarn.application.classpath参数设置为./app-dependencies.zip。这样,Flink on YARN会将app-dependencies.zip文件上传到HDFS的/user/flink/.flink目录下,并将其添加到应用程序的classpath中。

    3. 在应用程序中,可以通过以下方式访问上传的配置文件和jar包:

      • 配置文件:可以使用Flink的ExecutionEnvironment.readTextFile()ExecutionEnvironment.readCsvFile()等方法读取HDFS上的配置文件。

      • jar包:可以使用Flink的ExecutionEnvironment.registerCachedFile()方法将HDFS上的jar包注册为本地缓存文件,然后使用ExecutionEnvironment.getRegisteredCachedFile()方法获取缓存文件的路径。

    2023-04-26 22:38:04
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    Flink on Yarn模式下可以通过以下步骤来实现将配置文件和JAR包打包到HDFS的/user/flink/.flink目录下:

    将配置文件和JAR包上传至HDFS中,例如上传到/user/flink/config和/user/flink/lib目录中。

    提交Flink Application任务时,通过-yt 参数指定YARN的配置文件所在目录。

    在Flink Application中,获取HDFS的/user/flink/.flink目录路径,将配置文件和JAR包放到该目录下即可。

    下面是具体步骤:

    将配置文件和JAR包上传至HDFS中,例如上传到/user/flink/config和/user/flink/lib目录中。 hdfs dfs -mkdir -p /user/flink/config hdfs dfs -put flink-conf.yaml /user/flink/config hdfs dfs -mkdir -p /user/flink/lib hdfs dfs -put flink-custom-connector.jar /user/flink/lib 在提交Flink Application任务的命令中,通过-yt 参数指定YARN的配置文件所在目录,例如yarn-client.sh命令如下: bin/yarn-session.sh -n 2 -tm 1024 -s 2 -d -jm 1024 -tm_sp 1024 -yt /path/to/yarn/conf 在Flink Application中,获取HDFS的/user/flink/.flink目录路径,将配置文件和JAR包放到该目录下即可,例如: class MyFlinkApp { public static void main(String[] args) throws Exception { // 获取Flink配置对象 Configuration flinkConf = GlobalConfiguration.loadConfiguration();

        // 获取YARN配置对象
        YarnConfiguration yarnConf = new YarnConfiguration();
        flinkConf.addAll(yarnConf);
    
        // 获取HDFS客户端
        FileSystem fs = FileSystem.get(yarnConf);
    
        // 获取当前用户的HDFS目录
        String userHomeDir = fs.getHomeDirectory().toString();
    
        // 获取Flink的配置目录
        String flinkConfDir = flinkConf.getString(CoreOptions.FLINK_CONF_DIR);
    
        // 获取Flink的lib目录
        String flinkLibDir = flinkConfDir + "/../lib";
    
        // HDFS上的Flink配置目录
        String flinkHdfsConfDir = userHomeDir + "/.flink/config";
        Path flinkHdfsConfPath = new Path(flinkHdfsConfDir);
        if (!fs.exists(flinkHdfsConfPath)) {
            fs.mkdirs(flinkHdfsConfPath);
        }
    
        // HDFS上的Flink lib目录
        String flinkHdfsLibDir = userHomeDir + "/.flink/lib";
        Path flinkHdfsLibPath = new Path(flinkHdfsLibDir);
        if (!fs.exists(flinkHdfsLibPath)) {
            fs.mkdirs(flinkHdfsLibPath);
        }
    
        // 上传配置文件到HDFS
        Path flinkConfPath = new Path(flinkConfDir);
        if (fs.exists(flinkConfPath)) {
            FileStatus[] fileStatuses = fs.listStatus(flinkConfPath);
            for (FileStatus fileStatus : fileStatuses) {
                Path confFilePath = fileStatus.getPath();
                String confFileName = confFilePath.getName();
                Path targetPath = new Path(flinkHdfsConfDir + "/" + confFileName);
                fs.copyFromLocalFile(confFilePath, targetPath);
            } 
        }
    
        // 上传JAR包到HDFS
        Path flinkLibPath = new Path(flinkLibDir);
        if (fs.exists(flinkLibPath)) {
            FileStatus[] fileStatuses = fs.listStatus(flinkLibPath);
            for (FileStatus fileStatus : fileStatuses) {
                Path libFilePath = fileStatus.getPath();
                String libFileName = libFilePath.getName();
                Path targetPath = new Path(flinkHdfsLibDir + "/" + libFileName);
                fs.copyFromLocalFile(libFilePath, targetPath);
            }
        }
    
        // 设置用户的Flink配置目录和lib目录
        flinkConf.set(CoreOptions.FLINK_CONF_DIR, flinkHdfsConfDir);
        flinkConf.set(CoreOptions.FLINK_LIB_DIR, flinkHdfsLibDir);
    
        // 创建Flink流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // ...
    }
    

    } 通过以上步骤,即可将配置文件和JAR包打包到HDFS的/user/flink/.flink目录下,供Flink Application任务使用。

    2023-04-17 17:10:28
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。
    • 可以使用--yarn-files参数将需要加载的配置文件及jar包打到HDFS的/user/flink/.flink目录下。具体操作步骤如下:

      1. 将需要加载的配置文件及jar包放到一个目录中(例如conf文件夹和lib文件夹)。

      2. 使用以下命令将这个目录上传到HDFS的/user/flink/.flink目录下:

      hadoop fs -put <path_to_folder> /user/flink/.flink/
      
      1. 在提交Application任务时,加上--yarn-files参数,并指定需要上传的文件或目录的路径(如果有多个文件或目录需要上传,可以使用逗号分隔):
      ./bin/flink run -m yarn-cluster -yn 2 --yarn-files hdfs:///user/flink/.flink/conf,hdfs:///user/flink/.flink/lib/application.jar -c com.example.ApplicationMain /path/to/application.jar
      

      上述命令将会将/user/flink/.flink/conf文件夹和/user/flink/.flink/lib文件夹中的所有文件及/application.jar打包成一个jar包,然后上传到Yarn集群上,并在执行Application任务时将其加入classpath中。

    2023-04-11 11:05:40
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载