MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)

This commit is contained in:
Daniel Templeton 2017-02-22 15:38:11 -08:00
parent d150f061f4
commit 732ee6f0b5

View File

@ -292,7 +292,6 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
addHistoryToken(ts); addHistoryToken(ts);
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts); createApplicationSubmissionContext(conf, jobSubmitDir, ts);
@ -331,34 +330,15 @@ private LocalResource createApplicationResource(FileContext fs, Path p, LocalRes
return rsrc; return rsrc;
} }
public ApplicationSubmissionContext createApplicationSubmissionContext( private Map<String, LocalResource> setupLocalResources(Configuration jobConf,
Configuration jobConf, String jobSubmitDir) throws IOException {
String jobSubmitDir, Credentials ts) throws IOException { Map<String, LocalResource> localResources = new HashMap<>();
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
LOG.debug("AppMaster capability = " + capability);
// Setup LocalResources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE); Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext.getDefaultFileSystem() URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext
.resolvePath( .getDefaultFileSystem().resolvePath(
defaultFileContext.makeQualified(new Path(jobSubmitDir)))); defaultFileContext.makeQualified(new Path(jobSubmitDir))));
LOG.debug("Creating setup context, jobSubmitDir url is " LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir); + yarnUrlForJobSubmitDir);
@ -392,13 +372,11 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
new Path(jobSubmitDir, s), LocalResourceType.FILE)); new Path(jobSubmitDir, s), LocalResourceType.FILE));
} }
// Setup security tokens return localResources;
DataOutputBuffer dob = new DataOutputBuffer(); }
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Setup the command to run the AM private List<String> setupAMCommand(Configuration jobConf) {
List<String> vargs = new ArrayList<String>(8); List<String> vargs = new ArrayList<>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
+ "/bin/java"); + "/bin/java");
@ -409,14 +387,22 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
MRApps.addLog4jSystemProperties(null, vargs, conf); MRApps.addLog4jSystemProperties(null, vargs, conf);
// Check for Java Lib Path usage in MAP and REDUCE configs // Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV); "map",
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", MRJobConfig.MAP_JAVA_OPTS,
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV); MRJobConfig.MAP_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV); "map",
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV); MRJobConfig.MAPRED_ADMIN_USER_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
"reduce",
MRJobConfig.REDUCE_JAVA_OPTS,
MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
"reduce",
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_ADMIN_USER_ENV);
// Add AM admin command opts before user command opts // Add AM admin command opts before user command opts
// so that it can be overridden by user // so that it can be overridden by user
@ -449,9 +435,14 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
Path.SEPARATOR + ApplicationConstants.STDOUT); Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR); Path.SEPARATOR + ApplicationConstants.STDERR);
return vargs;
}
private ContainerLaunchContext setupContainerLaunchContextForAM(
Configuration jobConf, Map<String, LocalResource> localResources,
ByteBuffer securityTokens, List<String> vargs) throws IOException {
Vector<String> vargsFinal = new Vector<String>(8); Vector<String> vargsFinal = new Vector<>(8);
// Final command // Final command
StringBuilder mergedCommand = new StringBuilder(); StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) { for (CharSequence str : vargs) {
@ -464,7 +455,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// Setup the CLASSPATH in environment // Setup the CLASSPATH in environment
// i.e. add { Hadoop jars, job jar, CWD } to classpath. // i.e. add { Hadoop jars, job jar, CWD } to classpath.
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment = new HashMap<>();
MRApps.setClasspath(environment, conf); MRApps.setClasspath(environment, conf);
// Shell // Shell
@ -487,18 +478,58 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// Parse distributed cache // Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources); MRApps.setupDistributedCache(jobConf, localResources);
Map<ApplicationAccessType, String> acls Map<ApplicationAccessType, String> acls = new HashMap<>(2);
= new HashMap<ApplicationAccessType, String>(2);
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get( acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get( acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.JOB_ACL_MODIFY_JOB,
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
return ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);
}
/**
* Constructs all the necessary information to start the MR AM.
* @param jobConf the configuration for the MR job
* @param jobSubmitDir the directory path for the job
* @param ts the security credentials for the job
* @return ApplicationSubmissionContext
* @throws IOException on IO error (e.g. path resolution)
*/
public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf, String jobSubmitDir, Credentials ts)
throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
LOG.debug("AppMaster capability = " + capability);
// Setup LocalResources
Map<String, LocalResource> localResources =
setupLocalResources(jobConf, jobSubmitDir);
// Setup security tokens
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Setup ContainerLaunchContext for AM container // Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer = List<String> vargs = setupAMCommand(jobConf);
ContainerLaunchContext.newInstance(localResources, environment, ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
vargsFinal, null, securityTokens, acls); jobConf, localResources, securityTokens, vargs);
String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF); String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
if (regex != null && !regex.isEmpty()) { if (regex != null && !regex.isEmpty()) {
@ -566,7 +597,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE); appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) { if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf)); appContext.setApplicationTags(new HashSet<>(tagsFromConf));
} }
String jobPriority = jobConf.get(MRJobConfig.PRIORITY); String jobPriority = jobConf.get(MRJobConfig.PRIORITY);