Flink 1.15 Source Code

Flink entry class (launched by bin/flink)

"org.apache.flink.client.cli.CliFrontend"
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. Locate the configuration directory via System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR)
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. Load flink-conf.yaml into the global Configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. Load the custom command lines (generic CLI, YARN session CLI, default CLI)
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    int retCode = 31;
    try {
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // Parse the arguments and run the requested action
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args)); 
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
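
Before anything else, the client must find flink-conf.yaml. The lookup in step 1 prefers the FLINK_CONF_DIR environment variable and then falls back to directories relative to the distribution. A simplified standalone sketch of that logic; the fallback paths '../conf' and 'conf' mirror the fallback constants in CliFrontend:

    import java.io.File;

    public class ConfDirLookup {
        // Simplified sketch of CliFrontend#getConfigurationDirectoryFromEnv
        static String configurationDirectory() {
            String dir = System.getenv("FLINK_CONF_DIR"); // ConfigConstants.ENV_FLINK_CONF_DIR
            if (dir != null) {
                if (!new File(dir).exists()) {
                    throw new RuntimeException(
                            "FLINK_CONF_DIR points to a non-existing directory: " + dir);
                }
                return dir;
            }
            for (String fallback : new String[] {"../conf", "conf"}) {
                if (new File(fallback).exists()) {
                    return fallback;
                }
            }
            throw new RuntimeException("Could not find a Flink configuration directory.");
        }

        public static void main(String[] args) {
            System.out.println(configurationDirectory());
        }
    }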

parseAndRun(args)

    // Check that an action argument was provided
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // The requested action
    String action = args[0];
    switch (action) {
        case ACTION_RUN: // action 'run': covers the '-t yarn-per-job' and '-t yarn-session' deployment targets
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION: // action 'run-application': the '-t yarn-application' deployment target
            runApplication(params);
            return 0;
        ...
    }
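
For reference, the two branches correspond to invocations such as 'flink run -t yarn-per-job -c com.example.MyJob my-job.jar' (or '-t yarn-session' against a running session cluster) and 'flink run-application -t yarn-application my-job.jar'; the jar path and entry class are placeholders.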

A、run (per-job & yarn-session)

    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");
        // Options for the 'run' action: SAVEPOINT_PATH_OPTION, SAVEPOINT_ALLOW_NON_RESTORED_OPTION, SAVEPOINT_RESTORE_MODE, etc.
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        // Parse the command-line 'args' against 'commandOptions' (stopAtNonOption = true)
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag ,"Show the help message for the CLI Frontend or the action."
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }
        // Select the active CustomCommandLine (the first one whose isActive() returns true)
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));
        // Wrap the parsed command line into ProgramOptions
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);
        // Resolve the job jar and its dependencies
        final List<URL> jobJars = getJobJarAndDependencies(programOptions);
        // Merge everything into the effective Configuration
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }

1、ProgramOptions.create (wrap the parsed options)

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            // Program settings such as the entry class, jar path, and parallelism
            return new ProgramOptions(line);
        }
    }
1.1、ProgramOptions
    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);
        ...
            ...
        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }
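
CliFrontendParser and ProgramOptions are thin layers over Apache Commons CLI. A minimal standalone sketch of the same parsing pattern (option names abbreviated, and stopAtNonOption = true matching the getCommandLine(commandOptions, args, true) call above, so arguments after the jar path are left for the user program):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class MiniCliParse {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            options.addOption(Option.builder("c").longOpt("class").hasArg()
                    .desc("entry-point class of the job jar").build());
            options.addOption(Option.builder("p").longOpt("parallelism").hasArg()
                    .desc("default parallelism").build());
            options.addOption(Option.builder("s").longOpt("fromSavepoint").hasArg()
                    .desc("savepoint path to restore from").build());

            // stopAtNonOption = true: parsing stops at the first unrecognized token
            CommandLine line = new DefaultParser().parse(options, args, true);
            System.out.println("class       = " + line.getOptionValue("c"));
            System.out.println("parallelism = " + line.getOptionValue("p", "1"));
            System.out.println("savepoint   = " + line.getOptionValue("s"));
            System.out.println("remaining   = " + String.join(" ", line.getArgs()));
        }
    }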

2、getJobJarAndDependencies (resolve the job jars)

    private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
            throws CliArgsException {
        // Entry-point class
        String entryPointClass = programOptions.getEntryPointClassName();
        // Path to the job jar
        String jarFilePath = programOptions.getJarFilePath();

        try {
            File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
            return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
        } ...
    }
2.1、getJobJarAndDependencies
    public static List<URL> getJobJarAndDependencies(
            File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
        URL jarFileUrl = loadJarFile(jarFile);

        List<File> extractedTempLibraries =
                jarFileUrl == null
                        ? Collections.emptyList()
                        : extractContainedLibraries(jarFileUrl);

        List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);

        if (jarFileUrl != null) {
            libs.add(jarFileUrl);
        }
        for (File tmpLib : extractedTempLibraries) {
            try {
                libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
            } catch (MalformedURLException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }

        if (isPython(entryPointClassName)) {
            libs.add(PackagedProgramUtils.getPythonJar());
        }

        return libs;
    }
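
Note extractContainedLibraries: nested .jar entries shipped inside the user jar are unpacked to temp files and added as dependencies. The other half of the job is finding the entry class; when '-c' is not given, PackagedProgram reads it from the jar manifest. A hedged sketch of that lookup with plain JDK APIs (PackagedProgram itself also checks a Flink-specific 'program-class' attribute before falling back to 'Main-Class'):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.jar.JarFile;

    public class EntryClassLookup {
        public static void main(String[] args) throws Exception {
            File jar = new File(args[0]); // path to a job jar, passed on the command line
            try (JarFile jarFile = new JarFile(jar)) {
                // Simplification: only Main-Class is consulted here
                String entryClass =
                        jarFile.getManifest().getMainAttributes().getValue("Main-Class");
                URL[] urls = {jar.getAbsoluteFile().toURI().toURL()};
                try (URLClassLoader loader =
                        new URLClassLoader(urls, EntryClassLookup.class.getClassLoader())) {
                    System.out.println("entry point: " + loader.loadClass(entryClass).getName());
                }
            }
        }
    }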

3、executeProgram

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }
3.1、ClientUtils.executeProgram
    public static void executeProgram(...)
            throws ProgramInvocationException {
            ...
            ...
            try {
                program.invokeInteractiveModeForExecution();
            } ...
    }
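
The elided body of ClientUtils.executeProgram (the '...' above) swaps the user-code classloader in as the thread context classloader and installs the context execution environments (ContextEnvironment / StreamContextEnvironment), so that execute() inside the user's main() is routed to the configured executor. A self-contained sketch of just the classloader-swap pattern, not the full method:

    import java.util.concurrent.Callable;

    public class ContextClassLoaderSwap {
        // Run an action under a temporary thread context classloader and always
        // restore the previous one, the same pattern executeProgram uses around
        // the user's main()
        static <T> T runWithClassLoader(ClassLoader cl, Callable<T> action) throws Exception {
            ClassLoader previous = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(cl);
            try {
                return action.call();
            } finally {
                Thread.currentThread().setContextClassLoader(previous);
            }
        }

        public static void main(String[] args) throws Exception {
            String seen = runWithClassLoader(
                    ClassLoader.getSystemClassLoader(),
                    () -> String.valueOf(Thread.currentThread().getContextClassLoader()));
            System.out.println(seen);
        }
    }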
3.2、invokeInteractiveModeForExecution
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        ...
        try {
            callMainMethod(mainClass, args);
        } ...
    }
3.3、callMainMethod
    private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
        Method mainMethod;
        ...
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        }
        ...
        try {
            // Invoke the main() method of the user's entry class via reflection
            mainMethod.invoke(null, (Object) args);
        } 
        ...
    }
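
One subtlety: the (Object) cast in mainMethod.invoke(null, (Object) args) keeps reflection from spreading the String[] into varargs (which would look like one parameter per array element). A tiny self-contained demo of the same call:

    import java.lang.reflect.Method;

    public class ReflectiveMain {
        public static class Target {
            public static void main(String[] args) {
                System.out.println("invoked with " + args.length + " args");
            }
        }

        public static void main(String[] args) throws Exception {
            Method main = Target.class.getMethod("main", String[].class);
            // null receiver because main() is static; the cast passes the array
            // as the single String[] parameter instead of expanding it
            main.invoke(null, (Object) new String[] {"--input", "/tmp/in"});
        }
    }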

B、runApplication (run-application)

    // package org.apache.flink.client.cli
    protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));
        // ApplicationClusterDeployer implements the ApplicationDeployer interface;
        // this is the key step
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;

        // No need to set a jarFile path for Pyflink job.
        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }
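
For the non-Python branch, the two settings that matter most in the resulting effectiveConfiguration are the deployment target and the single pipeline jar. A minimal sketch of setting those two pieces directly (the values are hypothetical; CliFrontend derives them from '-t' and the jar argument):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.configuration.PipelineOptions;

    import java.util.Collections;

    public class EffectiveConfSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // execution.target, normally filled in from '-t yarn-application'
            conf.set(DeploymentOptions.TARGET, "yarn-application");
            // pipeline.jars, normally the resolved URI of the job jar
            conf.set(PipelineOptions.JARS,
                    Collections.singletonList("local:///opt/flink/usrlib/my-job.jar"));
            System.out.println(conf);
        }
    }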

1、ApplicationClusterDeployer

// package org.apache.flink.client.deployment.application.cli;
public class ApplicationClusterDeployer implements ApplicationDeployer {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationClusterDeployer.class);

    private final ClusterClientServiceLoader clientServiceLoader;

    public ApplicationClusterDeployer(final ClusterClientServiceLoader clientServiceLoader) {
        this.clientServiceLoader = checkNotNull(clientServiceLoader);
    }

    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");
        // Obtain the ClusterClientFactory, which carries the information needed to create clients against the target cluster
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        // The ClusterDescriptor deploys clusters and creates the clients used to communicate with them
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // Derive the ClusterSpecification (JobManager/TaskManager memory, slots per TaskManager) via the clientFactory
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // Deploy the application cluster
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
}
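
Because ApplicationClusterDeployer is ordinary client-side code, the same submission can be driven programmatically. A hedged sketch, assuming flink-clients and flink-yarn are on the classpath and HADOOP_CONF_DIR points at a reachable YARN cluster; the jar path, entry class, and program arguments are placeholders:

    import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
    import org.apache.flink.client.deployment.application.ApplicationConfiguration;
    import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.configuration.PipelineOptions;

    import java.util.Collections;

    public class ProgrammaticApplicationSubmit {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(DeploymentOptions.TARGET, "yarn-application");
            conf.set(PipelineOptions.JARS,
                    Collections.singletonList("local:///opt/flink/usrlib/my-job.jar"));
            // In practice the JobManager/TaskManager memory options would also be
            // set here (e.g. jobmanager.memory.process.size), or deployment fails

            ApplicationClusterDeployer deployer =
                    new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader());
            // Program arguments + entry class, exactly like runApplication() builds them
            deployer.run(conf, new ApplicationConfiguration(
                    new String[] {"--input", "hdfs:///data/in"}, "com.example.MyJob"));
        }
    }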
1.1、ApplicationDeployer (note the Javadoc)
// package org.apache.flink.client.deployment;
@Internal
public interface ApplicationDeployer {
    /**
     * Submits a user program for execution and runs the main user method on the cluster.
     *
     * @param configuration the configuration containing all the necessary information about
     *     submitting the user program.
     * @param applicationConfiguration an {@link ApplicationConfiguration} specific to the
     *     application to be executed.
     */
    <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception;
}

1.2、ClusterSpecification
// package org.apache.flink.client.deployment;
public final class ClusterSpecification {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int slotsPerTaskManager;

    private ClusterSpecification(
            int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    public int getMasterMemoryMB() {
        return masterMemoryMB;
    }

    public int getTaskManagerMemoryMB() {
        return taskManagerMemoryMB;
    }

    public int getSlotsPerTaskManager() {
        return slotsPerTaskManager;
    }

    @Override
    public String toString() {
        return "ClusterSpecification{"
                + "masterMemoryMB="
                + masterMemoryMB
                + ", taskManagerMemoryMB="
                + taskManagerMemoryMB
                + ", slotsPerTaskManager="
                + slotsPerTaskManager
                + '}';
    }

    /** Builder for the {@link ClusterSpecification} instance. */
    public static class ClusterSpecificationBuilder {
        private int masterMemoryMB = 768;
        private int taskManagerMemoryMB = 1024;
        private int slotsPerTaskManager = 1;

        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
            this.masterMemoryMB = masterMemoryMB;
            return this;
        }

        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
            this.taskManagerMemoryMB = taskManagerMemoryMB;
            return this;
        }

        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
            this.slotsPerTaskManager = slotsPerTaskManager;
            return this;
        }

        public ClusterSpecification createClusterSpecification() {
            return new ClusterSpecification(
                    masterMemoryMB, taskManagerMemoryMB, slotsPerTaskManager);
        }
    }
}
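Usage is a plain builder chain; unset fields keep the defaults shown above (768 MB master, 1024 MB TaskManager, 1 slot). A small sketch:

    import org.apache.flink.client.deployment.ClusterSpecification;

    public class SpecExample {
        public static void main(String[] args) {
            ClusterSpecification spec =
                    new ClusterSpecification.ClusterSpecificationBuilder()
                            .setMasterMemoryMB(1600)
                            .setTaskManagerMemoryMB(2048)
                            .setSlotsPerTaskManager(2)
                            .createClusterSpecification();
            // Prints: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=2048, slotsPerTaskManager=2}
            System.out.println(spec);
        }
    }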
1.3、clusterDescriptor.deployApplicationCluster
    // package org.apache.flink.yarn;
    @Override
    public ClusterClientProvider<ApplicationId> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);

        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }

        applicationConfiguration.applyToConfiguration(flinkConfiguration);

        // No need to do pipelineJars validation if it is a PyFlink job.
        if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
                || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
            final List<String> pipelineJars =
                    flinkConfiguration
                            .getOptional(PipelineOptions.JARS)
                            .orElse(Collections.emptyList());
            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
        }

        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }
1.3.1、return deployInternal(…)
    // package org.apache.flink.yarn;
    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {
        ...
        isReadyForDeployment(clusterSpecification);
        // ------------------ Check if the specified queue exists --------------------
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources

        // Ask YARN's ResourceManager (hereafter just 'YARN') to create and register a new application
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        // YARN's response to the new-application request (resource limits, etc.)
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
        // The maximum resources a single container may be allocated
        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            // Query the cluster's free resources: see 1.3.1.1
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }
        ...
        final ClusterSpecification validClusterSpecification;
        // Here the resources for the JobManager and TaskManagers are finalized
        try {
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }
        ...
        // Start the ApplicationMaster
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);
        ...
    }
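
One detail behind validateClusterResources is worth making concrete: with the default schedulers, YARN hands out container memory in multiples of yarn.scheduler.minimum-allocation-mb, so a request is effectively rounded up. A small worked example of that arithmetic (the sizes are hypothetical):

    public class YarnAllocationRounding {
        public static void main(String[] args) {
            int requestedMB = 1600;         // e.g. JobManager memory from the ClusterSpecification
            int yarnMinAllocationMB = 1024; // yarn.scheduler.minimum-allocation-mb
            // Round up to the next multiple of the minimum allocation
            int allocatedMB =
                    ((requestedMB + yarnMinAllocationMB - 1) / yarnMinAllocationMB)
                            * yarnMinAllocationMB;
            System.out.println(allocatedMB); // 2048: what the container actually reserves
        }
    }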
1.3.1.1 getCurrentFreeClusterResources
    // package org.apache.flink.yarn;
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
            throws YarnException, IOException {
        // Nodes currently in RUNNING state
        List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);

        int totalFreeMemory = 0;
        int containerLimit = 0;
        int[] nodeManagersFree = new int[nodes.size()];
        // Collect free memory from each NodeManager
        for (int i = 0; i < nodes.size(); i++) {
            NodeReport rep = nodes.get(i);
            int free =
                    rep.getCapability().getMemory()
                            - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
            nodeManagersFree[i] = free;
            totalFreeMemory += free;
            if (free > containerLimit) {
                containerLimit = free;
            }
        }
        // Return total free memory, the largest free memory on any single node, and per-node free memory
        return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
    }
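
The same aggregation over a toy node list, without a YARN client, to make the three returned values concrete (the capacities and usage are made up):

    public class FreeClusterResources {
        public static void main(String[] args) {
            // Hypothetical NodeManager capacity/usage in MB, standing in for
            // yarnClient.getNodeReports(NodeState.RUNNING)
            int[] capacityMB = {8192, 8192, 4096};
            int[] usedMB = {2048, 6144, 0};

            int totalFreeMemory = 0;
            int containerLimit = 0; // largest free memory on any single node
            int[] nodeManagersFree = new int[capacityMB.length];

            for (int i = 0; i < capacityMB.length; i++) {
                int free = capacityMB[i] - usedMB[i];
                nodeManagersFree[i] = free;
                totalFreeMemory += free;
                if (free > containerLimit) {
                    containerLimit = free;
                }
            }
            // 6144 + 2048 + 4096 = 12288 MB total; largest single node: 6144 MB
            System.out.println(totalFreeMemory + " MB free, max " + containerLimit + " MB on one node");
        }
    }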
1.3.1.2 startAppMaster (long; take it one piece at a time)
// package org.apache.flink.yarn;
private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {
        ...
        final FileSystem fs = FileSystem.get(yarnConfiguration);
    	...
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

        Path stagingDirPath = getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        stagingDirFs,
                        stagingDirPath,
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }

        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application

        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
        setHAClusterIdIfNotSet(configuration, appId);

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }

            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }

        // only for application mode
        // Python jar file only needs to be shipped and should not be added to classpath.
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
            fileUploader.registerMultipleLocalResources(
                    Collections.singletonList(
                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                    LocalResourceType.FILE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        // usrlib will be automatically shipped if it exists.
        if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            final Set<File> usrLibShipFiles = new HashSet<>();
            addUsrLibFolderToShipFiles(usrLibShipFiles);
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            usrLibShipFiles.stream()
                                    .map(e -> new Path(e.toURI()))
                                    .collect(Collectors.toSet()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);

            // remove localhost bind hosts as they render production clusters unusable
            removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            // this setting is unconditionally overridden anyway, so we remove it for clarity
            configuration.removeConfig(TaskManagerOptions.HOST);

            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML
        // and KRB5 configuration files. We are adding these files as container local resources for
        // the container
        // applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // Assume the keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        // setup security tokens
        if (UserGroupInformation.isSecurityEnabled()) {
            // set HDFS delegation tokens when security is enabled
            LOG.info("Adding delegation token to the AM container.");
            final List<Path> pathsToObtainToken = new ArrayList<>();
            boolean fetchToken =
                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            if (fetchToken) {
                List<Path> yarnAccessList =
                        ConfigUtils.decodeListFromConfig(
                                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
                pathsToObtainToken.addAll(yarnAccessList);
                pathsToObtainToken.addAll(fileUploader.getRemotePaths());
            }
            Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv = new HashMap<>();
        // set user specified app master environment variables
        appMasterEnv.putAll(
                ConfigurationUtils.getPrefixedKeyValuePairs(
                        ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
        // set Flink app class path
        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

        // Set FLINK_OPT_DIR to `opt` folder under working dir in container
        appMasterEnv.put(
                ENV_FLINK_OPT_DIR, Path.CUR_DIR + "/" + ConfigConstants.DEFAULT_FLINK_OPT_DIR);

        // set Flink on YARN internal configuration values
        appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
        appMasterEnv.put(
                YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                encodeYarnLocalResourceDescriptorListToString(
                        fileUploader.getEnvShipResourceList()));
        appMasterEnv.put(
                YarnConfigKeys.FLINK_YARN_FILES,
                fileUploader.getApplicationDir().toUri().toString());

        // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
        appMasterEnv.put(
                YarnConfigKeys.ENV_HADOOP_USER_NAME,
                UserGroupInformation.getCurrentUser().getUserName());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
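
The wait loop at the end is a reusable shape: poll a report supplier every 250 ms, exit on a terminal state, and log slow deployments. A self-contained sketch of that pattern, with a simulated state sequence standing in for yarnClient.getApplicationReport(appId):

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Supplier;

    public class DeployPoller {
        enum AppState { NEW, ACCEPTED, RUNNING, FINISHED, FAILED }

        static AppState waitForTerminalState(Supplier<AppState> report) throws InterruptedException {
            long start = System.currentTimeMillis();
            AppState last = AppState.NEW;
            while (true) {
                AppState state = report.get();
                switch (state) {
                    case FAILED:
                        throw new IllegalStateException("application switched to " + state);
                    case RUNNING:
                    case FINISHED:
                        return state; // terminal for deployment purposes
                    default:
                        if (state != last) {
                            System.out.println("Deploying cluster, current state " + state);
                        }
                        if (System.currentTimeMillis() - start > 60_000) {
                            System.out.println("Deployment took more than 60 seconds...");
                        }
                }
                last = state;
                Thread.sleep(250);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Iterator<AppState> states = List.of(
                    AppState.NEW, AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING).iterator();
            System.out.println("final state: " + waitForTerminalState(states::next));
        }
    }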