Flink 1.13: CliFrontend runApplication Source Code Walkthrough
Note: the source code in this article is based on Flink 1.13.
Preface
Flink supports several deployment modes: the common local mode, standalone mode, YARN, Kubernetes, and so on. YARN itself offers three variants: per-job, session, and application. The first two have existed for a long time, while application mode is newer, introduced in Flink 1.11. Its main difference from per-job mode is where the JobGraph is built: in per-job mode the client builds the JobGraph and ships it, together with all dependencies, to the cluster. Downloading those dependencies and uploading the binaries consumes a lot of network bandwidth and client-side resources, which becomes a real bottleneck when many users share the same client. Application mode solves this by moving that work onto the cluster.
I. Application-mode submit command
./bin/flink run-application \
    -t yarn-application \
    -c com.bigdata.Test \
    -yqu root.users.flink \
    /home/bigdata/test.jar
II. Source code walkthrough
1. Overall execution flow
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
As the flink startup shell script above shows, the entry class is org.apache.flink.client.cli.CliFrontend. Before diving into runApplication, let's walk through the overall flow after the script launches this class.
public static void main(final String[] args) {
// log the startup command and environment information
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// locate the Flink configuration directory; fail if it does not exist
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// load flink-conf.yaml from the configuration directory into the Configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// load the available custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
// initialize the CliFrontend from the configuration and the custom command lines
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
// install the security context
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// call CliFrontend's parseAndRun to parse the arguments, run the requested action, and capture the return code
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
// exit with the captured return code
System.exit(retCode);
}
}
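How does run-application reach runApplication? parseAndRun treats the first argument as the action name and dispatches on it. The excerpt below is abridged from CliFrontend in Flink 1.13 (error handling and unrelated actions omitted):

public int parseAndRun(String[] args) {
    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // the first argument selects the action; the rest are its parameters
    String action = args[0];
    final String[] params = Arrays.copyOfRange(args, 1, args.length);
    switch (action) {
        case ACTION_RUN: // "run"
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION: // "run-application", covered in section 2 below
            runApplication(params);
            return 0;
        // ... list, info, cancel, savepoint, etc. omitted
        default:
            System.out.printf("\"%s\" is not a valid action.\n", action);
            return 1;
    }
}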
2. The runApplication method
After the arguments are parsed, if the action matches application mode ("run-application"), this method performs the actual submission.
protected void runApplication(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
// collect the options supported by the run/run-application commands
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
// parse the command line against those options
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// if --help was requested, print usage information and return
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
// choose the active custom command line based on the parsed arguments
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
// create the application deployer
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// check whether this is a PyFlink program
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
// build the program options reflectively for the Python entry point
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
// compute the effective configuration
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
// otherwise this is a regular (JVM) program
} else {
// build the program options
programOptions = new ProgramOptions(commandLine);
// for Java programs this mainly validates that the jar file path is set
programOptions.validate();
// resolve the jar path to a URI
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
// compute the effective configuration, recording the jar under pipeline.jars
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
// build the application configuration (program args + entry-point class)
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
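// applyToConfiguration() later copies these values into the configuration shipped to the cluster (see 2.2)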
// deploy the application to the cluster
deployer.run(effectiveConfiguration, applicationConfiguration);
}
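One detail worth noting: PackagedProgramUtils.resolveURI turns a plain local path such as /home/bigdata/test.jar into an absolute file:// URI, while a path that already carries a scheme (hdfs://, file://) is returned unchanged. A minimal sketch of that behavior, mirroring the 1.13 implementation:

public static URI resolveURI(String path) throws URISyntaxException {
    final URI uri = new URI(path);
    if (uri.getScheme() != null) {
        // already a full URI, e.g. hdfs://... or file://...
        return uri;
    }
    // a bare local path: make it absolute and give it the file:// scheme
    return new File(path).getAbsoluteFile().toURI();
}

The resulting URI string is what getEffectiveConfiguration records under pipeline.jars.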
2.1 The deployer.run method
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
checkNotNull(configuration);
checkNotNull(applicationConfiguration);
LOG.info("Submitting application in 'Application Mode'.");
// look up the cluster client factory matching the configured deployment target
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
// create the cluster descriptor (YarnClusterDescriptor here)
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration)) {
// read the cluster specification: JobManager/TaskManager memory, numberOfTaskSlots
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
// deploy the application cluster
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
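The factory lookup is SPI-based: the service loader iterates every ClusterClientFactory on the classpath and picks the one whose isCompatibleWith accepts the configured execution.target (yarn-application selects the YARN factory, which in turn creates the YarnClusterDescriptor seen in 2.2). A simplified sketch of the lookup, assuming the plain java.util.ServiceLoader mechanism:

// Simplified sketch of what DefaultClusterClientServiceLoader#getClusterClientFactory does.
final ServiceLoader<ClusterClientFactory> loader =
        ServiceLoader.load(ClusterClientFactory.class);
for (ClusterClientFactory<?> factory : loader) {
    // isCompatibleWith() matches on the execution.target value, e.g. "yarn-application"
    if (factory.isCompatibleWith(configuration)) {
        return factory;
    }
}
throw new IllegalStateException("No ClusterClientFactory found for the given configuration.");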
2.2 The clusterDescriptor.deployApplicationCluster method
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
// read the deployment target: session, application, or per-job
final YarnDeploymentTarget deploymentTarget =
YarnDeploymentTarget.fromConfig(flinkConfiguration);
// fail fast if the target is not application mode
if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Yarn Application Cluster."
+ " Expected deployment.target="
+ YarnDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget.getName()
+ "\"");
}
// write the application's entry-point class and program arguments into the configuration
applicationConfiguration.applyToConfiguration(flinkConfiguration);
// read the user jar(s) from pipeline.jars
final List<String> pipelineJars =
flinkConfiguration
.getOptional(PipelineOptions.JARS)
.orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
try {
// jobGraph is null here, which is the essence of application mode: the client does not build the JobGraph
return deployInternal(
clusterSpecification,
"Flink Application Cluster",
YarnApplicationClusterEntryPoint.class.getName(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
}
}
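applyToConfiguration is small but central: it is how the entry-point class and program arguments reach the cluster side, where YarnApplicationClusterEntryPoint reads them back to build the JobGraph. Abridged from ApplicationConfiguration (exact option keys may differ slightly between versions):

public void applyToConfiguration(final Configuration configuration) {
    // program arguments -> $internal.application.program-args
    ConfigUtils.encodeArrayToConfig(
            configuration, APPLICATION_ARGS, programArguments, Objects::toString);
    // entry-point class -> $internal.application.main
    if (applicationClassName != null) {
        configuration.set(APPLICATION_MAIN_CLASS, applicationClassName);
    }
}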
2.3 The deployInternal method
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached)
throws Exception {
// get the current Hadoop user
final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
// check whether Kerberos security is enabled
if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
boolean useTicketCache =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
// verify the user has valid Kerberos credentials
if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}
}
// verify the required settings are present and that e.g. the requested vcores do not exceed the cluster maximum
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
// check that the configured YARN queue exists
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources
// --------------
// Create application via yarnClient
// create a new YARN application via the yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
// get the new-application response
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
// maximum resource capability of a single container
Resource maxRes = appResponse.getMaximumResourceCapability();
// currently free cluster resources
final ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException(
"Could not retrieve information about free cluster resources.", e);
}
// YARN's minimum allocation memory (MB)
final int yarnMinAllocationMB =
yarnConfiguration.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
if (yarnMinAllocationMB <= 0) {
throw new YarnDeploymentException(
"The minimum allocation memory "
+ "("
+ yarnMinAllocationMB
+ " MB) configured via '"
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "' should be greater than 0.");
}
final ClusterSpecification validClusterSpecification;
try {
// validate (and normalize) the requested cluster resources
validClusterSpecification =
validateClusterResources(
clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
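// validateClusterResources (not shown) verifies the requested JM/TM memory is at least
// yarnMinAllocationMB and at most the maximum container size; note that YARN rounds
// allocations up to a multiple of the minimum allocation, so the granted memory may
// exceed what was requested.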
LOG.info("Cluster specification: {}", validClusterSpecification);
// execution mode: normal or detached
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfiguration.setString(
ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
// upload dependencies, build the AM container, and submit the application (see 2.4)
ApplicationReport report =
startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
// in detached mode, log how to interact with and stop the application later
if (detached) {
final ApplicationId yarnApplicationId = report.getApplicationId();
logDetachedClusterInformation(yarnApplicationId, LOG);
}
// write the deployed cluster's address, port, and application id back into the configuration
setClusterEntrypointInfoToConfig(report);
return () -> {
try {
// create a REST cluster client for talking to the deployed cluster
return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
} catch (Exception e) {
throw new RuntimeException("Error while creating RestClusterClient.", e);
}
};
}
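Note that the returned ClusterClientProvider is lazy: nothing connects to the REST endpoint until a caller actually asks for a client. A hypothetical usage sketch (not part of the Flink sources):

final ClusterClientProvider<ApplicationId> provider =
        clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
    System.out.println("Application id: " + client.getClusterId());
    System.out.println("Web UI: " + client.getWebInterfaceURL());
}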
2.4 The startAppMaster method
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
// initialize Flink's file system abstraction from the configuration
org.apache.flink.core.fs.FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
final FileSystem fs = FileSystem.get(yarnConfiguration);
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
&& fs.getScheme().startsWith("file")) {
LOG.warn(
"The file system scheme is '"
+ fs.getScheme()
+ "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
// the application submission context
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
// remote shared lib paths (yarn.provided.lib.dirs), if configured
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
// create the uploader that stages files for the YARN application
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
fs,
getStagingDir(fs),
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
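// The staging directory is yarn.staging-directory if configured, otherwise the user's home
// directory on the cluster file system; the files land under .flink/<applicationId> there.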
// collect the ship files and the log configuration file
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
final String logConfigFilePath =
configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
systemShipFiles.add(new File(logConfigFilePath));
}
final ApplicationId appId = appContext.getApplicationId();
// set the HA cluster id if not already configured
setHAClusterIdIfNotSet(configuration, appId);
// max application attempts: with HA enabled, use the configured value (defaulting to YARN's DEFAULT_RM_AM_MAX_ATTEMPTS, i.e. 2); otherwise default to 1
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
// also sets the attempt-failures validity interval (via reflection, for compatibility with older YARN versions)
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
}
// in per-job mode, collect the user jars from the JobGraph
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
// in application mode, collect the user jar(s) recorded under pipeline.jars
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
// in per-job mode, upload local user artifacts (distributed-cache files) to remote storage
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}
// record the distributed-cache entries in the configuration
jobGraph.writeUserArtifactEntriesToConfiguration();
}
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// upload the various dependencies and register them as local resources
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<File> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.ARCHIVE);
}
// Upload and register user jars
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
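// userJarInclusion comes from yarn.per-job-cluster.include-user-jar: ORDER (the default) sorts
// the user jars in with the system classpath below, FIRST/LAST put them before/after it, and
// DISABLED ships them to usrlib/ instead of placing them on the classpath.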
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// per-job only: serialize the JobGraph to a local file and ship it
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// upload the (possibly modified) flink-conf.yaml
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
new Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab =
flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
// assume the keytab is pre-installed in the container
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
// build the ApplicationMaster container launch context
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
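// setupApplicationMasterContainer (not shown) assembles the AM launch command, which boils
// down to: $JAVA_HOME/bin/java <heap opts from processSpec> <jvm opts> <logging opts>
// <yarnClusterEntrypoint> 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err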
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
Utils.setTokensFor(
amContainer,
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
yarnConfiguration);
}
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
final Map<String, String> appMasterEnv = new HashMap<>();
appMasterEnv.putAll(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
appMasterEnv.put(
YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
encodeYarnLocalResourceDescriptorListToString(
fileUploader.getEnvShipResourceList()));
appMasterEnv.put(
YarnConfigKeys.FLINK_YARN_FILES,
fileUploader.getApplicationDir().toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(
YarnConfigKeys.ENV_HADOOP_USER_NAME,
UserGroupInformation.getCurrentUser().getUserName());
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// set up the YARN classpath
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// resource requirements for the ApplicationMaster container
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// application priority
int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
// target YARN queue
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// register a shutdown hook that cleans up if deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
// submit the application to YARN
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
// poll the application state: FAILED/KILLED throws, RUNNING or FINISHED exits the loop, other states just log progress
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("YARN application has been finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// deployment succeeded, remove the shutdown hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}