MAPREDUCE-3988. mapreduce.job.local.dir doesn't point to a single directory on a node. (Eric Payne via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1309086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
20184e7ddc
commit
b45c450026
@ -231,6 +231,9 @@ Release 0.23.3 - UNRELEASED
|
|||||||
|
|
||||||
MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)
|
MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-3988. mapreduce.job.local.dir doesn't point to a single directory
|
||||||
|
on a node. (Eric Payne via bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
@ -236,12 +237,34 @@ private static void configureLocalDirs(Task task, JobConf job) throws IOExceptio
|
|||||||
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
|
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
|
||||||
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
|
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
|
||||||
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
||||||
Path workDir = lDirAlloc.getLocalPathForWrite("work", job);
|
Path workDir = null;
|
||||||
|
// First, try to find the JOB_LOCAL_DIR on this host.
|
||||||
|
try {
|
||||||
|
workDir = lDirAlloc.getLocalPathToRead("work", job);
|
||||||
|
} catch (DiskErrorException e) {
|
||||||
|
// DiskErrorException means dir not found. If not found, it will
|
||||||
|
// be created below.
|
||||||
|
}
|
||||||
|
if (workDir == null) {
|
||||||
|
// JOB_LOCAL_DIR doesn't exist on this host -- Create it.
|
||||||
|
workDir = lDirAlloc.getLocalPathForWrite("work", job);
|
||||||
FileSystem lfs = FileSystem.getLocal(job).getRaw();
|
FileSystem lfs = FileSystem.getLocal(job).getRaw();
|
||||||
if (!lfs.mkdirs(workDir)) {
|
boolean madeDir = false;
|
||||||
|
try {
|
||||||
|
madeDir = lfs.mkdirs(workDir);
|
||||||
|
} catch (FileAlreadyExistsException e) {
|
||||||
|
// Since all tasks will be running in their own JVM, the race condition
|
||||||
|
// exists where multiple tasks could be trying to create this directory
|
||||||
|
// at the same time. If this task loses the race, it's okay because
|
||||||
|
// the directory already exists.
|
||||||
|
madeDir = true;
|
||||||
|
workDir = lDirAlloc.getLocalPathToRead("work", job);
|
||||||
|
}
|
||||||
|
if (!madeDir) {
|
||||||
throw new IOException("Mkdirs failed to create "
|
throw new IOException("Mkdirs failed to create "
|
||||||
+ workDir.toString());
|
+ workDir.toString());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
|
job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||||
@ -228,6 +229,10 @@ public void configure(JobConf job) {
|
|||||||
// check if X=$(tt's X var):/tmp for an old env variable inherited from
|
// check if X=$(tt's X var):/tmp for an old env variable inherited from
|
||||||
// the tt
|
// the tt
|
||||||
checkEnv("PATH", path + ":/tmp", "noappend");
|
checkEnv("PATH", path + ":/tmp", "noappend");
|
||||||
|
|
||||||
|
String jobLocalDir = job.get(MRJobConfig.JOB_LOCAL_DIR);
|
||||||
|
assertNotNull(MRJobConfig.JOB_LOCAL_DIR + " is null",
|
||||||
|
jobLocalDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void map(WritableComparable key, Writable value,
|
public void map(WritableComparable key, Writable value,
|
||||||
|
Loading…
Reference in New Issue
Block a user