diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 98fe5535cf..228c6af3b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -291,8 +291,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); - - // Construct necessary information to start the MR AM + ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); @@ -331,34 +330,15 @@ private LocalResource createApplicationResource(FileContext fs, Path p, LocalRes return rsrc; } - public ApplicationSubmissionContext createApplicationSubmissionContext( - Configuration jobConf, - String jobSubmitDir, Credentials ts) throws IOException { - ApplicationId applicationId = resMgrDelegate.getApplicationId(); - - // Setup resource requirements - Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemorySize( - conf.getInt( - MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB - ) - ); - capability.setVirtualCores( - conf.getInt( - MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES - ) - ); - LOG.debug("AppMaster capability = " + capability); - - // Setup LocalResources - Map localResources = - new HashMap(); + private Map setupLocalResources(Configuration jobConf, + String jobSubmitDir) throws IOException { + Map localResources = new HashMap<>(); Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE); - URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext.getDefaultFileSystem() - .resolvePath( - defaultFileContext.makeQualified(new Path(jobSubmitDir)))); + URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext + .getDefaultFileSystem().resolvePath( + defaultFileContext.makeQualified(new Path(jobSubmitDir)))); LOG.debug("Creating setup context, jobSubmitDir url is " + yarnUrlForJobSubmitDir); @@ -371,7 +351,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath, LocalResourceType.PATTERN); - String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, + String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); rc.setPattern(pattern); localResources.put(MRJobConfig.JOB_JAR, rc); @@ -392,13 +372,11 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( new Path(jobSubmitDir, s), LocalResourceType.FILE)); } - // Setup security tokens - DataOutputBuffer dob = new DataOutputBuffer(); - ts.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + return localResources; + } - // Setup the command to run the AM - List vargs = new ArrayList(8); + private List setupAMCommand(Configuration jobConf) { + List vargs = new ArrayList<>(8); vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) + "/bin/java"); @@ -409,27 +387,35 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( MRApps.addLog4jSystemProperties(null, vargs, conf); // Check for Java Lib Path usage in MAP and REDUCE configs - warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", - MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV); - warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", - MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV); - warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", - MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV); - warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", - MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV); + warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""), + "map", + MRJobConfig.MAP_JAVA_OPTS, + MRJobConfig.MAP_ENV); + warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""), + "map", + MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, + MRJobConfig.MAPRED_ADMIN_USER_ENV); + warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""), + "reduce", + MRJobConfig.REDUCE_JAVA_OPTS, + MRJobConfig.REDUCE_ENV); + warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""), + "reduce", + MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, + MRJobConfig.MAPRED_ADMIN_USER_ENV); // Add AM admin command opts before user command opts // so that it can be overridden by user String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS); - warnForJavaLibPath(mrAppMasterAdminOptions, "app master", + warnForJavaLibPath(mrAppMasterAdminOptions, "app master", MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV); vargs.add(mrAppMasterAdminOptions); - + // Add AM user command opts String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS); - warnForJavaLibPath(mrAppMasterUserOptions, "app master", + warnForJavaLibPath(mrAppMasterUserOptions, "app master", MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV); vargs.add(mrAppMasterUserOptions); @@ -449,9 +435,14 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); + return vargs; + } + private ContainerLaunchContext setupContainerLaunchContextForAM( + Configuration jobConf, Map localResources, + ByteBuffer securityTokens, List vargs) throws IOException { - Vector vargsFinal = new Vector(8); + Vector vargsFinal = new Vector<>(8); // Final command StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { @@ -464,7 +455,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( // Setup the CLASSPATH in environment // i.e. add { Hadoop jars, job jar, CWD } to classpath. - Map environment = new HashMap(); + Map environment = new HashMap<>(); MRApps.setClasspath(environment, conf); // Shell @@ -477,28 +468,68 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf); // Setup the environment variables for Admin first - MRApps.setEnvFromInputString(environment, + MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV, MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf); // Setup the environment variables (LD_LIBRARY_PATH, etc) - MRApps.setEnvFromInputString(environment, + MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ENV), conf); // Parse distributed cache MRApps.setupDistributedCache(jobConf, localResources); - Map acls - = new HashMap(2); + Map acls = new HashMap<>(2); acls.put(ApplicationAccessType.VIEW_APP, jobConf.get( MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get( MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); + return ContainerLaunchContext.newInstance(localResources, environment, + vargsFinal, null, securityTokens, acls); + } + + /** + * Constructs all the necessary information to start the MR AM. + * @param jobConf the configuration for the MR job + * @param jobSubmitDir the directory path for the job + * @param ts the security credentials for the job + * @return ApplicationSubmissionContext + * @throws IOException on IO error (e.g. path resolution) + */ + public ApplicationSubmissionContext createApplicationSubmissionContext( + Configuration jobConf, String jobSubmitDir, Credentials ts) + throws IOException { + ApplicationId applicationId = resMgrDelegate.getApplicationId(); + + // Setup resource requirements + Resource capability = recordFactory.newRecordInstance(Resource.class); + capability.setMemorySize( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); + LOG.debug("AppMaster capability = " + capability); + + // Setup LocalResources + Map localResources = + setupLocalResources(jobConf, jobSubmitDir); + + // Setup security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + // Setup ContainerLaunchContext for AM container - ContainerLaunchContext amContainer = - ContainerLaunchContext.newInstance(localResources, environment, - vargsFinal, null, securityTokens, acls); + List vargs = setupAMCommand(jobConf); + ContainerLaunchContext amContainer = setupContainerLaunchContextForAM( + jobConf, localResources, securityTokens, vargs); String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF); if (regex != null && !regex.isEmpty()) { @@ -566,7 +597,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE); if (tagsFromConf != null && !tagsFromConf.isEmpty()) { - appContext.setApplicationTags(new HashSet(tagsFromConf)); + appContext.setApplicationTags(new HashSet<>(tagsFromConf)); } String jobPriority = jobConf.get(MRJobConfig.PRIORITY);