Note: the source code in this article is based on Flink 1.13.


Preface

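Flink supports several deployment modes, such as the commonly used local, standalone, YARN, and Kubernetes modes. On YARN there are three variants: per-job, session, and application. The first two have existed for a long time, while application mode was introduced in Flink 1.11. The main difference from per-job mode is where the JobGraph is built. In per-job mode the client builds the JobGraph and then sends it, together with the job's dependencies, to the cluster. Downloading the dependencies and shipping the binaries to the cluster takes a lot of network bandwidth, so the client consumes a large amount of resources, and the problem gets worse when many users share one client. Application mode solves this by moving that work off the client: the application's main() method runs on the cluster.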


I. Submitting a job in application mode

./bin/flink run-application \
-t yarn-application \
-c com.bigdata.Test \
-yqu root.users.flink /home/bigdata/test.jar

II. Source code walkthrough

1. Overall execution flow

exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

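As the line above from the flink shell script shows, the entry class is org.apache.flink.client.cli.CliFrontend. The rest of this section gives an overview of what happens once the script starts it.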

public static void main(final String[] args) {
        // Log the startup command, environment, and related information
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
        // Locate the Flink configuration directory; fail if it does not exist
        final String configurationDirectory = getConfigurationDirectoryFromEnv();
        // Load flink-conf.yaml from the configuration directory into the configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);
        // Load the custom command lines (the supported CLI front ends)
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);
        int retCode = 31;
        try {
            // Create the CliFrontend from the configuration and the custom command lines
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            // Install the security context
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            // Call CliFrontend.parseAndRun to parse the arguments, run the requested action, and obtain the return code
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            // Exit with the return code
            System.exit(retCode);
        }
    }
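
The return-code handling in main can be isolated into a small sketch. The class and method names below (ExitCodeSketch, runForExitCode) are hypothetical and not part of Flink; only the default value 31 and the try/catch shape come from the listing above. The sketch returns the code instead of calling System.exit so that it can be tried out safely.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper that mirrors the return-code pattern of CliFrontend.main. */
final class ExitCodeSketch {

    // Same default as in the listing: used when the command never produces a code.
    static final int DEFAULT_FAILURE_CODE = 31;

    /** Runs the command and returns its code, or the default code if it throws. */
    static int runForExitCode(Callable<Integer> command) {
        int retCode = DEFAULT_FAILURE_CODE;
        try {
            retCode = command.call();
        } catch (Throwable t) {
            // CliFrontend logs the error and prints the stack trace at this point.
            System.err.println("Fatal error while running command: " + t);
        }
        // A real entry point would call System.exit(retCode) in a finally block.
        return retCode;
    }

    public static void main(String[] args) {
        int ok = runForExitCode(() -> 0);
        int failed = runForExitCode(() -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println("ok=" + ok + ", failed=" + failed);
    }
}
```

Because retCode is initialized before the try block, any exception thrown while the command runs leaves the default failure code in place.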

2. The runApplication method

After the command is parsed, if it matches application mode (run-application), this method is called to submit the job.

protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");
        // Get the options of the run command
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        // Parse the arguments passed on the command line
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);
        // If the help option is present, print the help text and return
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }
        // Select the active custom command line based on the arguments
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));
        // Create the application deployer
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;
        // Check whether this is a PyFlink job
        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            // Create the program options via reflection
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            // Build the effective configuration
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        // Otherwise (not a PyFlink job)
        } else {
            // Create the program options
            programOptions = new ProgramOptions(commandLine);
            // For a Java program, this mainly checks that a JAR file path was given
            programOptions.validate();
            // Resolve the JAR path to a URI
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            // Build the effective configuration
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }
        // Build the application configuration (program arguments and entry-point class)
        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        // Deploy to the cluster
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }
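
To make the JAR path step more concrete, here is a minimal sketch of how a path given on the command line can be turned into a URI. It assumes that a path with a scheme (for example hdfs://) is kept as is and that a path without a scheme is treated as a local file. The class JarUriSketch is hypothetical, and the real PackagedProgramUtils.resolveURI may differ in detail.

```java
import java.io.File;
import java.net.URI;

/** Hypothetical sketch of resolving a JAR path to a URI; not Flink's implementation. */
final class JarUriSketch {

    static URI resolve(String jarPath) {
        URI uri = URI.create(jarPath);
        if (uri.getScheme() != null) {
            // Already a full URI such as hdfs:///flink/test.jar; keep it unchanged.
            return uri;
        }
        // No scheme: interpret the path as a local file and make it absolute.
        return new File(jarPath).getAbsoluteFile().toURI();
    }

    public static void main(String[] args) {
        System.out.println(resolve("/home/bigdata/test.jar"));
        System.out.println(resolve("hdfs:///flink/test.jar"));
    }
}
```

In runApplication the resulting URI string is passed to getEffectiveConfiguration as the single JAR of the job.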

2.1 The deployer.run method

public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");
        // Look up the cluster client factory that matches the configuration
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        // Create the cluster descriptor
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // Get the cluster specification: JobManager memory, TaskManager memory, and slots per TaskManager
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // Deploy the application to the cluster
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
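
The factory lookup in deployer.run can be pictured as "ask every known factory whether it is compatible with the configuration and require exactly one match". The sketch below models that idea with plain Java types. The Factory interface, the forTarget helper, and the use of a Map as the configuration are simplifications introduced here, not Flink classes; the key execution.target is the configuration key that the -t option sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model of choosing a cluster client factory by configuration. */
final class FactorySelectionSketch {

    /** Simplified stand-in for ClusterClientFactory. */
    interface Factory {
        String name();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Creates a factory that accepts exactly one value of execution.target. */
    static Factory forTarget(String name, String target) {
        return new Factory() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equals(configuration.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory, or fails if there is none or more than one. */
    static Factory select(List<Factory> candidates, Map<String, String> configuration) {
        List<Factory> compatible = new ArrayList<>();
        for (Factory candidate : candidates) {
            if (candidate.isCompatibleWith(configuration)) {
                compatible.add(candidate);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory but found " + compatible.size());
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<Factory> factories =
                Arrays.asList(
                        forTarget("yarn", "yarn-application"),
                        forTarget("kubernetes", "kubernetes-application"));
        Map<String, String> configuration =
                Collections.singletonMap("execution.target", "yarn-application");
        System.out.println(select(factories, configuration).name());
    }
}
```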

2.2 The clusterDescriptor.deployApplicationCluster method

public ClusterClientProvider<ApplicationId> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);
        // Get the deployment target: session, application, or per-job
        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        // Throw an exception if the target is not application mode
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }
        // Write the application's main class and program arguments into the configuration
        applicationConfiguration.applyToConfiguration(flinkConfiguration);
        // Get the user JARs (pipeline.jars); exactly one is expected
        final List<String> pipelineJars =
                flinkConfiguration
                        .getOptional(PipelineOptions.JARS)
                        .orElse(Collections.emptyList());
        Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

        try {
            // jobGraph is null here, which is what characterizes application mode: the client does not build the JobGraph
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }
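
The two preconditions checked before deployInternal is called can be summarized in a few lines. This is a sketch with hypothetical names (ApplicationTargetCheckSketch, firstProblem) that reports a problem as a value instead of throwing, so the outcome is easy to inspect; the target name yarn-application and the "Should only have one jar" message are taken from the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the checks made before a YARN application cluster is deployed. */
final class ApplicationTargetCheckSketch {

    static final String APPLICATION_TARGET = "yarn-application";

    /** Returns a description of the first violated precondition, or empty if all hold. */
    static Optional<String> firstProblem(String deploymentTarget, List<String> pipelineJars) {
        if (!APPLICATION_TARGET.equals(deploymentTarget)) {
            return Optional.of(
                    "Expected deployment.target="
                            + APPLICATION_TARGET
                            + " but actual one was \""
                            + deploymentTarget
                            + "\"");
        }
        if (pipelineJars.size() != 1) {
            return Optional.of("Should only have one jar");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(
                firstProblem(
                        "yarn-application",
                        Collections.singletonList("file:/home/bigdata/test.jar")));
        System.out.println(firstProblem("yarn-per-job", Arrays.asList("a.jar")));
        System.out.println(firstProblem("yarn-application", Collections.<String>emptyList()));
    }
}
```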

2.3 The deployInternal method

private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {
        // Get the current user
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        // Check whether Kerberos security is enabled
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
            // Check that the current user has valid Kerberos credentials
            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }
        }
        // Check that the required settings are present and that values such as the requested vcores do not exceed the cluster maximum
        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        // Check that the configured YARN queue exists
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------

        // Create the application through the YARN client
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        // Get the new-application response from YARN
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
        // Get the maximum resource capability
        Resource maxRes = appResponse.getMaximumResourceCapability();
        // Get the currently free cluster resources
        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }
        // Get YARN's minimum allocation memory
        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            // Get the validated cluster specification
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);
        // Set the execution mode: normal or detached
        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(
                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
        // Start the ApplicationMaster
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);
        // In detached mode, log the command that can be used to stop the application
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }
        // Write the deployed cluster's address, port, and application id into the configuration
        setClusterEntrypointInfoToConfig(report);
        return () -> {
            try {
                // Create the REST cluster client used to send REST requests to the cluster
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
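
The body of validateClusterResources is not shown in this article, so the following is only a sketch of the kind of check it performs, under two assumptions: a memory request below YARN's minimum allocation is raised to that minimum, and a request above YARN's maximum allocation is rejected. All names in the sketch are hypothetical.

```java
/** Hypothetical sketch of validating a memory request against YARN's allocation limits. */
final class MemoryValidationSketch {

    /** Returns the memory that would effectively be requested, in megabytes. */
    static int validateMemoryMb(
            String component, int requestedMb, int yarnMinAllocationMb, int yarnMaxAllocationMb) {
        if (yarnMinAllocationMb <= 0) {
            // Same condition as the check on yarnMinAllocationMB in deployInternal.
            throw new IllegalArgumentException("The minimum allocation should be greater than 0.");
        }
        // Assumption: requests smaller than the minimum are raised to the minimum.
        int effectiveMb = Math.max(requestedMb, yarnMinAllocationMb);
        // Assumption: requests larger than the maximum are rejected outright.
        if (effectiveMb > yarnMaxAllocationMb) {
            throw new IllegalStateException(
                    component
                            + " memory "
                            + effectiveMb
                            + " MB exceeds the maximum of "
                            + yarnMaxAllocationMb
                            + " MB");
        }
        return effectiveMb;
    }

    /** True if the request is a whole multiple of the minimum allocation. */
    static boolean isMultipleOfMinAllocation(int requestedMb, int yarnMinAllocationMb) {
        return requestedMb % yarnMinAllocationMb == 0;
    }

    public static void main(String[] args) {
        System.out.println(validateMemoryMb("JobManager", 512, 1024, 8192));
        System.out.println(isMultipleOfMinAllocation(1600, 1024));
    }
}
```

A request that is not a multiple of the minimum allocation is worth noticing, because the scheduler may hand out more memory than was asked for.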

2.4 The startAppMaster method

private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // Initialize the file system from the configuration
        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        final FileSystem fs = FileSystem.get(yarnConfiguration);

        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }

        // Get the application submission context
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
        // Get the remote shared lib directories
        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
        // Create the YARN application file uploader
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        fs,
                        getStagingDir(fs),
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // Collect the files to ship and the logging configuration file
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }

        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }


        final ApplicationId appId = appContext.getApplicationId();

        // Set the high-availability cluster id if it is not set yet
        setHAClusterIdIfNotSet(configuration, appId);
        // Set the maximum number of application attempts: with HA, use the configured value or YARN's default of 2; without HA, default to 1
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
            // Set the attempt-failure validity interval via reflection
            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }
        // In per-job mode, add the user JARs from the JobGraph
        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }
        // In application mode, add the user JARs listed in pipeline.jars
        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }
        // In per-job mode, upload local user artifacts to the remote file system
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }
            // Write the user artifact (distributed cache) entries into the configuration
            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Upload the dependency files to the remote file system
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

      
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);
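        // Serialize the JobGraph to a local temporary file and register it (skipped in application mode, where jobGraph is null)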
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }
        // Upload the Flink configuration file to the remote file system
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        // Set up the launch context of the ApplicationMaster container
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        if (UserGroupInformation.isSecurityEnabled()) {
            // set HDFS delegation tokens when security is enabled
            LOG.info("Adding delegation token to the AM container.");
            List<Path> yarnAccessList =
                    ConfigUtils.decodeListFromConfig(
                            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
            Utils.setTokensFor(
                    amContainer,
                    ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
                    yarnConfiguration);
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();
        final Map<String, String> appMasterEnv = new HashMap<>();
        appMasterEnv.putAll(
                ConfigurationUtils.getPrefixedKeyValuePairs(
                        ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

        // set Flink on YARN internal configuration values
        appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
        appMasterEnv.put(
                YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                encodeYarnLocalResourceDescriptorListToString(
                        fileUploader.getEnvShipResourceList()));
        appMasterEnv.put(
                YarnConfigKeys.FLINK_YARN_FILES,
                fileUploader.getApplicationDir().toUri().toString());

        // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
        appMasterEnv.put(
                YarnConfigKeys.ENV_HADOOP_USER_NAME,
                UserGroupInformation.getCurrentUser().getUserName());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }
        // Set up the YARN classpath
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);
        // Set the resources required by the ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set the application priority
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }
        // Set the YARN queue
        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        // Add a shutdown hook that cleans up if the deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        // Submit the application to the cluster
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        // Poll the application state: throw on KILLED or FAILED, leave the loop on RUNNING or FINISHED, and log any other state
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // Deployment succeeded, so remove the shutdown hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
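
One detail of startAppMaster that is easy to miss is how userJarInclusion decides where the user JARs end up on the ApplicationMaster classpath. The sketch below reproduces only that ordering logic with plain lists. The class name and the sample paths are made up for illustration, and the job.graph entry is left out because jobGraph is null in application mode.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the classpath ordering performed in startAppMaster. */
final class ClassPathOrderSketch {

    /** Mirrors the four values of YarnConfigOptions.UserJarInclusion. */
    enum UserJarInclusion {
        FIRST,
        LAST,
        ORDER,
        DISABLED
    }

    static String build(
            List<String> systemPaths,
            List<String> userPaths,
            String flinkDist,
            UserJarInclusion inclusion) {
        List<String> system = new ArrayList<>(systemPaths);
        List<String> user = new ArrayList<>(userPaths);
        if (inclusion == UserJarInclusion.ORDER) {
            // ORDER: user JARs are sorted together with the system entries.
            system.addAll(user);
        }
        Collections.sort(system);
        Collections.sort(user);

        List<String> entries = new ArrayList<>();
        if (inclusion == UserJarInclusion.FIRST) {
            entries.addAll(user);
        }
        entries.addAll(system);
        entries.add(flinkDist);
        entries.add("flink-conf.yaml");
        if (inclusion == UserJarInclusion.LAST) {
            entries.addAll(user);
        }
        // DISABLED: user JARs are not added to this classpath at all.

        StringBuilder classPath = new StringBuilder();
        for (String entry : entries) {
            // As in the listing, every entry is followed by the path separator.
            classPath.append(entry).append(File.pathSeparator);
        }
        return classPath.toString();
    }

    public static void main(String[] args) {
        List<String> system = Arrays.asList("lib/b.jar", "lib/a.jar");
        List<String> user = Collections.singletonList("test.jar");
        for (UserJarInclusion inclusion : UserJarInclusion.values()) {
            System.out.println(inclusion + ": " + build(system, user, "flink-dist.jar", inclusion));
        }
    }
}
```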
