diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4c78a909a2..0d2e88773c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -231,6 +231,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby) + MAPREDUCE-3988. mapreduce.job.local.dir doesn't point to a single directory + on a node. (Eric Payne via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 0687ab6cbd..01b29eaf17 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -54,6 +54,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.log4j.LogManager; @@ -236,11 +237,33 @@ private static void configureLocalDirs(Task task, JobConf job) throws IOExceptio job.setStrings(MRConfig.LOCAL_DIR, localSysDirs); LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR)); LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); - Path workDir = lDirAlloc.getLocalPathForWrite("work", job); - FileSystem lfs = FileSystem.getLocal(job).getRaw(); - if (!lfs.mkdirs(workDir)) { - throw new IOException("Mkdirs failed to create " - + workDir.toString()); + Path workDir = null; + // First, try to find the JOB_LOCAL_DIR on this host. + try { + workDir = lDirAlloc.getLocalPathToRead("work", job); + } catch (DiskErrorException e) { + // DiskErrorException means dir not found. If not found, it will + // be created below. + } + if (workDir == null) { + // JOB_LOCAL_DIR doesn't exist on this host -- Create it. + workDir = lDirAlloc.getLocalPathForWrite("work", job); + FileSystem lfs = FileSystem.getLocal(job).getRaw(); + boolean madeDir = false; + try { + madeDir = lfs.mkdirs(workDir); + } catch (FileAlreadyExistsException e) { + // Since all tasks will be running in their own JVM, the race condition + // exists where multiple tasks could be trying to create this directory + // at the same time. If this task loses the race, it's okay because + // the directory already exists. + madeDir = true; + workDir = lDirAlloc.getLocalPathToRead("work", job); + } + if (!madeDir) { + throw new IOException("Mkdirs failed to create " + + workDir.toString()); + } } job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java index 2b5ffd4f0d..429bde5f8c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; @@ -228,6 +229,10 @@ public void configure(JobConf job) { // check if X=$(tt's X var):/tmp for an old env variable inherited from // the tt checkEnv("PATH", path + ":/tmp", "noappend"); + + String jobLocalDir = job.get(MRJobConfig.JOB_LOCAL_DIR); + assertNotNull(MRJobConfig.JOB_LOCAL_DIR + " is null", + jobLocalDir); } public void map(WritableComparable key, Writable value,