flink1.15源码笔记(run模式简单带过,主要看run-application)
从源码层面去查看flink的运行机制,时间有限,不定时持续更新中……
·
Flink1.15源码
Flink1.15源码
flink入口类(bin/flink)
"org.apache.flink.client.cli.CliFrontend"
// Body of the CLI entry point: load configuration, build the CliFrontend, run the requested
// action inside the installed security context, and exit the JVM with the resulting code.
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. Locate the configuration directory (looked up via System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR))
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. Load the global configuration from that directory
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. Load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
// Default exit code; it is only overwritten when parseAndRun returns normally.
int retCode = 31;
try {
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// Parse the arguments and execute the requested action
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
// Strip UndeclaredThrowableException wrappers before logging and printing the failure.
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
// Always terminate the JVM; on failure retCode still holds the default 31.
System.exit(retCode);
}
parseAndRun(args)
// 核对是否有参数
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// 提交模式
String action = args[0];
switch (action) {
case ACTION_RUN: // action为'run',其中有两种提交模式:'-t yarn-per-job' 和'-t yarn-session'
run(params);
return 0;
case ACTION_RUN_APPLICATION: // action为'run-application',提交模式为:'-t yarn-application'
runApplication(params);
return 0;
...
}
A、run(per-job&yarn-session)
/**
 * Handles the 'run' action: parses the arguments, resolves the job jar and its dependencies,
 * assembles the effective configuration and executes the packaged program.
 *
 * @param args the command line arguments of the 'run' action
 * @throws Exception if parsing, validation or program execution fails
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Parse the raw arguments against the option set of the 'run' command.
    final Options runOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine parsedLine = getCommandLine(runOptions, args, true);

    // Help flag: print the usage of 'run' and stop without executing anything.
    final boolean helpRequested = parsedLine.hasOption(HELP_OPTION.getOpt());
    if (helpRequested) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // Validate the parsed line and pick the active custom command line.
    final CustomCommandLine activeCli =
            validateAndGetActiveCommandLine(checkNotNull(parsedLine));

    // Wrap the program-related arguments.
    final ProgramOptions options = ProgramOptions.create(parsedLine);

    // Resolve the URLs of the job jar and its dependencies.
    final List<URL> jars = getJobJarAndDependencies(options);

    // Merge everything into one effective Configuration.
    final Configuration effectiveConfig =
            getEffectiveConfiguration(activeCli, parsedLine, options, jars);
    LOG.debug("Effective executor configuration: {}", effectiveConfig);

    // The packaged program is closed automatically once execution returns or fails.
    try (PackagedProgram program = getPackagedProgram(options, effectiveConfig)) {
        executeProgram(effectiveConfig, program);
    }
}
1、ProgramOptions.create(封装参数)
/**
 * Creates the program options for the given command line.
 *
 * @param line the parsed command line
 * @return Python program options when the line has a Python entry point or Python dependency
 *     options, otherwise plain {@link ProgramOptions}
 * @throws CliArgsException if the options cannot be created from the command line
 */
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    final boolean pythonJob = isPythonEntryPoint(line) || containsPythonDependencyOptions(line);
    if (!pythonJob) {
        // Regular job: wrap the program arguments directly.
        return new ProgramOptions(line);
    }
    return createPythonProgramOptions(line);
}
1.1、ProgramOptions
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
...
...
this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
}
2、getJobJarAndDependencies(返回可用jar包)
private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
throws CliArgsException {
// 入口类
String entryPointClass = programOptions.getEntryPointClassName();
// jar包路径
String jarFilePath = programOptions.getJarFilePath();
try {
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
} ...
}
2.1、getJobJarAndDependencies
/**
 * Collects the URLs of the job jar, the libraries extracted from it and, for Python entry
 * points, the Python jar.
 *
 * @param jarFile the job jar file
 * @param entryPointClassName the entry point class name, may be {@code null}
 * @return the job jar URL (when one was loaded) followed by the extracted libraries and, if
 *     applicable, the Python jar
 * @throws ProgramInvocationException declared by this method; raised by the helpers it calls
 */
public static List<URL> getJobJarAndDependencies(
        File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
    final URL jobJarUrl = loadJarFile(jarFile);

    // Libraries are only extracted when a job jar URL was actually loaded.
    final List<File> nestedLibs;
    if (jobJarUrl == null) {
        nestedLibs = Collections.emptyList();
    } else {
        nestedLibs = extractContainedLibraries(jobJarUrl);
    }

    final List<URL> result = new ArrayList<>(nestedLibs.size() + 1);
    if (jobJarUrl != null) {
        result.add(jobJarUrl);
    }

    for (File nested : nestedLibs) {
        try {
            result.add(nested.getAbsoluteFile().toURI().toURL());
        } catch (MalformedURLException e) {
            throw new RuntimeException("URL is invalid. This should not happen.", e);
        }
    }

    if (isPython(entryPointClassName)) {
        result.add(PackagedProgramUtils.getPythonJar());
    }

    return result;
}
3、executeProgram
/**
 * Executes the given packaged program by delegating to {@code ClientUtils.executeProgram}
 * with a fresh {@code DefaultExecutorServiceLoader}.
 *
 * @param configuration the effective configuration for the execution
 * @param program the packaged user program
 * @throws ProgramInvocationException if the program invocation fails
 */
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    final DefaultExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
    // Both trailing flags are passed as false; their meaning is defined by
    // ClientUtils.executeProgram, whose parameter list is not shown here.
    ClientUtils.executeProgram(executorServiceLoader, configuration, program, false, false);
}
3.1、ClientUtils.executeProgram
public static void executeProgram(...)
throws ProgramInvocationException {
...
...
try {
program.invokeInteractiveModeForExecution();
} ...
}
3.2、invokeInteractiveModeForExecution
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
...
try {
callMainMethod(mainClass, args);
} ...
}
3.3、callMainMethod
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
...
try {
mainMethod = entryClass.getMethod("main", String[].class);
}
...
try {
// 通过反射调用用户jar包入口类的main方法
mainMethod.invoke(null, (Object) args);
}
...
}
B、runApplication(run-application)
// package org.apache.flink.client.cli
/**
 * Handles the 'run-application' action: parses the arguments, builds the effective
 * configuration and hands the application over to an {@link ApplicationDeployer}.
 *
 * @param args the command line arguments of the 'run-application' action
 * @throws Exception if parsing, validation or deployment fails
 */
protected void runApplication(String[] args) throws Exception {
    LOG.info("Running 'run-application' command.");

    final Options runOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine parsedLine = getCommandLine(runOptions, args, true);

    // Help flag: print the usage of 'run-application' and stop.
    if (parsedLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRunApplication(customCommandLines);
        return;
    }

    final CustomCommandLine activeCli =
            validateAndGetActiveCommandLine(checkNotNull(parsedLine));

    // Key step: ApplicationClusterDeployer is the ApplicationDeployer implementation that
    // performs the actual deployment.
    final ApplicationDeployer deployer =
            new ApplicationClusterDeployer(clusterClientServiceLoader);

    final ProgramOptions options;
    final List<String> jobJars;
    if (ProgramOptionsUtils.isPythonEntryPoint(parsedLine)) {
        // No need to set a jarFile path for Pyflink job.
        options = ProgramOptionsUtils.createPythonProgramOptions(parsedLine);
        jobJars = Collections.emptyList();
    } else {
        options = new ProgramOptions(parsedLine);
        options.validate();
        final URI jarUri = PackagedProgramUtils.resolveURI(options.getJarFilePath());
        jobJars = Collections.singletonList(jarUri.toString());
    }

    final Configuration effectiveConfig =
            getEffectiveConfiguration(activeCli, parsedLine, options, jobJars);

    // Program arguments plus entry point class name describe the application to run.
    final ApplicationConfiguration appConfig =
            new ApplicationConfiguration(
                    options.getProgramArgs(), options.getEntryPointClassName());

    deployer.run(effectiveConfig, appConfig);
}
1、ApplicationClusterDeployer
// package org.apache.flink.client.deployment.application.cli;
/**
 * {@link ApplicationDeployer} implementation that deploys an application cluster through the
 * {@link ClusterDescriptor} obtained from the configured {@link ClusterClientFactory}.
 */
public class ApplicationClusterDeployer implements ApplicationDeployer {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationClusterDeployer.class);

    /** Loader used to look up the cluster client factory matching a configuration. */
    private final ClusterClientServiceLoader clientServiceLoader;

    /**
     * @param clientServiceLoader the loader for cluster client factories, must not be null
     */
    public ApplicationClusterDeployer(final ClusterClientServiceLoader clientServiceLoader) {
        this.clientServiceLoader = checkNotNull(clientServiceLoader);
    }

    /**
     * Deploys the application cluster described by the given configurations.
     *
     * @param configuration the configuration used to select the factory and build the cluster
     *     descriptor and specification
     * @param applicationConfiguration the configuration of the application to execute
     * @throws Exception if the deployment fails
     */
    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");

        // Select the factory that matches the configuration.
        final ClusterClientFactory<ClusterID> factory =
                clientServiceLoader.getClusterClientFactory(configuration);

        // The descriptor is closed automatically once the deployment call returns or fails.
        try (final ClusterDescriptor<ClusterID> descriptor =
                factory.createClusterDescriptor(configuration)) {
            // The cluster specification is derived by the factory from the configuration.
            final ClusterSpecification spec = factory.getClusterSpecification(configuration);

            // Trigger the actual deployment.
            descriptor.deployApplicationCluster(spec, applicationConfiguration);
        }
    }
}
1.1、ApplicationDeployer(注意看注释信息)
// package org.apache.flink.client.deployment;
/** Entity responsible for submitting an application for execution. */
@Internal
public interface ApplicationDeployer {

    /**
     * Submits a user program for execution; the main method of the user program is run on the
     * cluster.
     *
     * @param configuration the configuration holding all information needed to submit the
     *     user program
     * @param applicationConfiguration the {@link ApplicationConfiguration} specific to the
     *     application to be executed
     * @throws Exception if the submission fails
     */
    <ClusterID> void run(
            Configuration configuration, ApplicationConfiguration applicationConfiguration)
            throws Exception;
}
1.2、ClusterSpecification
// package org.apache.flink.client.deployment;
/**
 * Immutable value object holding the master memory, task manager memory (both in MB) and the
 * number of slots per task manager. Instances are created through
 * {@link ClusterSpecificationBuilder}.
 */
public final class ClusterSpecification {

    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int slotsPerTaskManager;

    /** Only reachable through the builder. */
    private ClusterSpecification(int masterMb, int taskManagerMb, int slots) {
        masterMemoryMB = masterMb;
        taskManagerMemoryMB = taskManagerMb;
        slotsPerTaskManager = slots;
    }

    /** @return the master memory in MB */
    public int getMasterMemoryMB() {
        return masterMemoryMB;
    }

    /** @return the task manager memory in MB */
    public int getTaskManagerMemoryMB() {
        return taskManagerMemoryMB;
    }

    /** @return the number of slots per task manager */
    public int getSlotsPerTaskManager() {
        return slotsPerTaskManager;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("ClusterSpecification{");
        text.append("masterMemoryMB=").append(masterMemoryMB);
        text.append(", taskManagerMemoryMB=").append(taskManagerMemoryMB);
        text.append(", slotsPerTaskManager=").append(slotsPerTaskManager);
        text.append('}');
        return text.toString();
    }

    /**
     * Builder for the {@link ClusterSpecification} instance. Defaults: 768 MB master memory,
     * 1024 MB task manager memory, 1 slot per task manager.
     */
    public static class ClusterSpecificationBuilder {

        private int masterMb = 768;
        private int taskManagerMb = 1024;
        private int slots = 1;

        /** Sets the master memory in MB and returns this builder. */
        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
            masterMb = masterMemoryMB;
            return this;
        }

        /** Sets the task manager memory in MB and returns this builder. */
        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
            taskManagerMb = taskManagerMemoryMB;
            return this;
        }

        /** Sets the number of slots per task manager and returns this builder. */
        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
            slots = slotsPerTaskManager;
            return this;
        }

        /** Builds a specification from the values currently held by this builder. */
        public ClusterSpecification createClusterSpecification() {
            return new ClusterSpecification(masterMb, taskManagerMb, slots);
        }
    }
}
1.3、clusterDescriptor.deployApplicationCluster
// package org.apache.flink.yarn;
/**
 * Deploys a Flink application cluster on YARN with
 * {@code YarnApplicationClusterEntryPoint} as the cluster entry point.
 *
 * @param clusterSpecification the resources requested for the cluster, must not be null
 * @param applicationConfiguration the application to run, must not be null
 * @return the provider returned by {@code deployInternal}
 * @throws ClusterDeploymentException if the configured deployment target is not APPLICATION
 *     or if {@code deployInternal} fails
 */
@Override
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
        final ClusterSpecification clusterSpecification,
        final ApplicationConfiguration applicationConfiguration)
        throws ClusterDeploymentException {
    checkNotNull(clusterSpecification);
    checkNotNull(applicationConfiguration);

    // This entry point only accepts the APPLICATION deployment target.
    final YarnDeploymentTarget target = YarnDeploymentTarget.fromConfig(flinkConfiguration);
    if (target != YarnDeploymentTarget.APPLICATION) {
        throw new ClusterDeploymentException(
                "Couldn't deploy Yarn Application Cluster."
                        + " Expected deployment.target="
                        + YarnDeploymentTarget.APPLICATION.getName()
                        + " but actual one was \""
                        + target.getName()
                        + "\"");
    }

    applicationConfiguration.applyToConfiguration(flinkConfiguration);

    // No need to do pipelineJars validation if it is a PyFlink job.
    final boolean pythonJob =
            PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
                    || PackagedProgramUtils.isPython(
                            applicationConfiguration.getProgramArguments());
    if (!pythonJob) {
        final List<String> pipelineJars =
                flinkConfiguration
                        .getOptional(PipelineOptions.JARS)
                        .orElse(Collections.emptyList());
        Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
    }

    try {
        // No job graph is passed (null) and the detached flag is false.
        return deployInternal(
                clusterSpecification,
                "Flink Application Cluster",
                YarnApplicationClusterEntryPoint.class.getName(),
                null,
                false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
    }
}
1.3.1、return deployInternal(…)
// package org.apache.flink.yarn;
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached)
throws Exception {
...
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources
// 向yarn的resourcemanager(以下直接说成yarn)申请注册application
final YarnClientApplication yarnApplication = yarnClient.createApplication();
// yarn返回注册请求信息(资源等信息)
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
// yarn允许单次申请(单个container)的最大资源上限
Resource maxRes = appResponse.getMaximumResourceCapability();
final ClusterResourceDescription freeClusterMem;
try {
// 查看集群资源信息: 1.3.1.1
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException(
"Could not retrieve information about free cluster resources.", e);
}
...
final ClusterSpecification validClusterSpecification;
// 这里就确定运行任务的资源了 master,taskManager
try {
validClusterSpecification =
validateClusterResources(
clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
...
// 启动AppMaster
ApplicationReport report =
startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
...
}
1.3.1.1 getCurrentFreeClusterResources
// package org.apache.flink.yarn;
/**
 * Computes the free memory of the cluster from the reports of all RUNNING nodes.
 *
 * @param yarnClient the client used to query the node reports
 * @return a description holding the total free memory, the largest free memory found on a
 *     single node and the free memory of every node
 * @throws YarnException if querying the node reports fails
 * @throws IOException if querying the node reports fails
 */
private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
        throws YarnException, IOException {
    // Only nodes in RUNNING state are considered.
    final List<NodeReport> runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);

    final int[] freePerNode = new int[runningNodes.size()];
    int freeTotal = 0;
    int largestFree = 0;

    int idx = 0;
    for (NodeReport node : runningNodes) {
        // Free memory = capability minus used; a missing usage report counts as 0 used.
        final int capacity = node.getCapability().getMemory();
        final int used = node.getUsed() == null ? 0 : node.getUsed().getMemory();
        final int free = capacity - used;

        freePerNode[idx++] = free;
        freeTotal += free;
        largestFree = Math.max(largestFree, free);
    }

    return new ClusterResourceDescription(freeTotal, largestFree, freePerNode);
}
1.3.1.2 startAppMaster(很长。慢慢看)
//package org.apache.flink.yarn;
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
...
final FileSystem fs = FileSystem.get(yarnConfiguration);
...
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
Path stagingDirPath = getStagingDir(fs);
FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
stagingDirFs,
stagingDirPath,
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
// The files need to be shipped and added to classpath.
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
final String logConfigFilePath =
configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
systemShipFiles.add(new File(logConfigFilePath));
}
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
setHAClusterIdIfNotSet(configuration, appId);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
}
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
// only for per job mode
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// Register all files in provided lib dirs as local resources with public visibility
// and upload the remaining dependencies as local resources with APPLICATION visibility.
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<File> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.ARCHIVE);
}
// only for application mode
// Python jar file only needs to be shipped and should not be added to classpath.
if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
&& PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
fileUploader.registerMultipleLocalResources(
Collections.singletonList(
new Path(PackagedProgramUtils.getPythonJar().toURI())),
ConfigConstants.DEFAULT_FLINK_OPT_DIR,
LocalResourceType.FILE);
}
// Upload and register user jars
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
// usrlib will be automatically shipped if it exists.
if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
final Set<File> usrLibShipFiles = new HashSet<>();
addUsrLibFolderToShipFiles(usrLibShipFiles);
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
usrLibShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
// remove localhost bind hosts as they render production clusters unusable
removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
// this setting is unconditionally overridden anyway, so we remove it for clarity
configuration.removeConfig(TaskManagerOptions.HOST);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
new Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// To support Yarn Secure Integration Test Scenario
// In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
// the Yarn site XML
// and KRB5 configuration files. We are adding these files as container local resources for
// the container
// applications (JM/TMs) to have proper secure cluster setup
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab =
flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
// // Assume Keytab is pre-installed in the container.
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
// setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
final List<Path> pathsToObtainToken = new ArrayList<>();
boolean fetchToken =
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
if (fetchToken) {
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
pathsToObtainToken.addAll(yarnAccessList);
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
}
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
}
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// Set FLINK_OPT_DIR to `opt` folder under working dir in container
appMasterEnv.put(
ENV_FLINK_OPT_DIR, Path.CUR_DIR + "/" + ConfigConstants.DEFAULT_FLINK_OPT_DIR);
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
appMasterEnv.put(
YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
encodeYarnLocalResourceDescriptorListToString(
fileUploader.getEnvShipResourceList()));
appMasterEnv.put(
YarnConfigKeys.FLINK_YARN_FILES,
fileUploader.getApplicationDir().toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(
YarnConfigKeys.ENV_HADOOP_USER_NAME,
UserGroupInformation.getCurrentUser().getUserName());
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
// To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// Set priority for application
int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("YARN application has been finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
更多推荐
已为社区贡献1条内容
所有评论(0)