请问一下各位大佬,java 写的flink程序提交到yarn集群,application name都
flink1.13.3源码:
flink-yarn项目下的YarnClusterDescriptor类
session、per-job、yarn-application 三种模式下,YARN 应用的名称都是写死的,除非自己改源码重新编译 jar 包。-ynm 参数是无效的,我也烦死了。启动了很多 YARN 应用之后,完全不知道哪个是哪个。
为啥不用 application-type 来区分三种启动模式?太无语了。
package org.apache.flink.yarn;
@Override
public ClusterClientProvider
deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
final YarnDeploymentTarget deploymentTarget =
YarnDeploymentTarget.fromConfig(flinkConfiguration);
if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
'Couldn't deploy Yarn Application Cluster.'
+ ' Expected deployment.target='
+ YarnDeploymentTarget.APPLICATION.getName()
+ ' but actual one was \''
+ deploymentTarget.getName()
+ '\'');
}
applicationConfiguration.applyToConfiguration(flinkConfiguration);
final List pipelineJars =
flinkConfiguration
.getOptional(PipelineOptions.JARS)
.orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, 'Should only have one jar');
try {
return deployInternal(
clusterSpecification,
'Flink Application Cluster',
YarnApplicationClusterEntryPoint.class.getName(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException('Couldn't deploy Yarn Application Cluster', e);
}
}
@Override
public ClusterClientProvider deploySessionCluster(
ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
'Flink session cluster',
getYarnSessionClusterEntrypoint(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException('Couldn't deploy Yarn session cluster', e);
}
}
@Override
public ClusterClientProvider deployJobCluster(
ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
'Flink per-job cluster',
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException('Could not deploy Yarn job cluster.', e);
}
}
赞0
踩0