MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect with no authority in job jar path. Contributed by Gera Shegalov

This commit is contained in:
Jason Lowe 2014-11-06 15:10:40 +00:00
parent 80d7d183cd
commit 10f9f5101c
7 changed files with 56 additions and 53 deletions

View File

@ -430,6 +430,9 @@ public static FileContext getFileContext(final URI defaultFsUri,
final Configuration aConf) throws UnsupportedFileSystemException { final Configuration aConf) throws UnsupportedFileSystemException {
UserGroupInformation currentUser = null; UserGroupInformation currentUser = null;
AbstractFileSystem defaultAfs = null; AbstractFileSystem defaultAfs = null;
if (defaultFsUri.getScheme() == null) {
return getFileContext(aConf);
}
try { try {
currentUser = UserGroupInformation.getCurrentUser(); currentUser = UserGroupInformation.getCurrentUser();
defaultAfs = getAbstractFileSystem(currentUser, defaultFsUri, aConf); defaultAfs = getAbstractFileSystem(currentUser, defaultFsUri, aConf);

View File

@ -459,6 +459,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6048. Fixed TestJavaSerialization failure. (Varun Vasudev via MAPREDUCE-6048. Fixed TestJavaSerialization failure. (Varun Vasudev via
jianhe) jianhe)
MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect
with no authority in job jar path. (Gera Shegalov via jlowe)
Release 2.5.2 - UNRELEASED Release 2.5.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -657,9 +657,11 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
// //////////// Set up JobJar to be localized properly on the remote NM. // //////////// Set up JobJar to be localized properly on the remote NM.
String jobJar = conf.get(MRJobConfig.JAR); String jobJar = conf.get(MRJobConfig.JAR);
if (jobJar != null) { if (jobJar != null) {
Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS final Path jobJarPath = new Path(jobJar);
.getUri(), remoteFS.getWorkingDirectory()); final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
LocalResource rc = createLocalResource(remoteFS, remoteJobJar, Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
jobJarFs.getWorkingDirectory());
LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();

View File

@ -250,11 +250,10 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
} }
Path jobJarPath = new Path(jobJar); Path jobJarPath = new Path(jobJar);
URI jobJarURI = jobJarPath.toUri(); URI jobJarURI = jobJarPath.toUri();
// If the job jar is already in fs, we don't need to copy it from local fs // If the job jar is already in a global fs,
if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null // we don't need to copy it from local fs
|| !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme()) if ( jobJarURI.getScheme() == null
&& jobJarURI.getAuthority().equals( || jobJarURI.getScheme().equals("file")) {
jtFs.getUri().getAuthority()))) {
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication); replication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString()); job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());

View File

@ -357,8 +357,9 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
jobConfPath, LocalResourceType.FILE)); jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) { if (jobConf.get(MRJobConfig.JAR) != null) {
Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
LocalResource rc = createApplicationResource(defaultFileContext, LocalResource rc = createApplicationResource(
jobJarPath, FileContext.getFileContext(jobJarPath.toUri(), jobConf),
jobJarPath,
LocalResourceType.PATTERN); LocalResourceType.PATTERN);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.FailingMapper; import org.apache.hadoop.FailingMapper;
import org.apache.hadoop.RandomTextWriterJob; import org.apache.hadoop.RandomTextWriterJob;
import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat; import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob.SleepMapper; import org.apache.hadoop.mapreduce.SleepJob.SleepMapper;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -89,6 +90,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ApplicationClassLoader; import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -97,6 +99,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -111,6 +114,9 @@ public class TestMRJobs {
private static final String TEST_IO_SORT_MB = "11"; private static final String TEST_IO_SORT_MB = "11";
private static final String TEST_GROUP_MAX = "200"; private static final String TEST_GROUP_MAX = "200";
private static final int DEFAULT_REDUCES = 2;
protected int numSleepReducers = DEFAULT_REDUCES;
protected static MiniMRYarnCluster mrCluster; protected static MiniMRYarnCluster mrCluster;
protected static MiniDFSCluster dfsCluster; protected static MiniDFSCluster dfsCluster;
@ -175,10 +181,23 @@ public static void tearDown() {
} }
} }
@After
public void resetInit() {
numSleepReducers = DEFAULT_REDUCES;
}
@Test (timeout = 300000) @Test (timeout = 300000)
public void testSleepJob() throws IOException, InterruptedException, public void testSleepJob() throws Exception {
ClassNotFoundException { testSleepJobInternal(false);
LOG.info("\n\n\nStarting testSleepJob()."); }
@Test (timeout = 300000)
public void testSleepJobWithRemoteJar() throws Exception {
testSleepJobInternal(true);
}
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@ -192,14 +211,20 @@ public void testSleepJob() throws IOException, InterruptedException,
SleepJob sleepJob = new SleepJob(); SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf); sleepJob.setConf(sleepConf);
int numReduces = sleepConf.getInt("TestMRJobs.testSleepJob.reduces", 2); // or sleepConf.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2);
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each: // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1); Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class); if (useRemoteJar) {
final Path localJar = new Path(
ClassUtil.findContainingJar(SleepJob.class));
ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
localFs.makeQualified(localJar.getParent()).toUri());
job.setJar("viewfs:///jobjars/" + localJar.getName());
} else {
job.setJarByClass(SleepJob.class);
}
job.setMaxMapAttempts(1); // speed up failures job.setMaxMapAttempts(1); // speed up failures
job.submit(); job.submit();
String trackingUrl = job.getTrackingURL(); String trackingUrl = job.getTrackingURL();
@ -381,7 +406,7 @@ protected void verifySleepJobCounters(Job job) throws InterruptedException,
.getValue()); .getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue()); .getValue());
Assert.assertEquals(2, Assert.assertEquals(numSleepReducers,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
Assert Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null

View File

@ -20,7 +20,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,8 +39,7 @@
public class TestUberAM extends TestMRJobs { public class TestUberAM extends TestMRJobs {
private static final Log LOG = LogFactory.getLog(TestUberAM.class); private static final Log LOG = LogFactory.getLog(TestUberAM.class);
private int numSleepReducers;
@BeforeClass @BeforeClass
public static void setup() throws IOException { public static void setup() throws IOException {
TestMRJobs.setup(); TestMRJobs.setup();
@ -54,21 +52,15 @@ public static void setup() throws IOException {
@Override @Override
@Test @Test
public void testSleepJob() public void testSleepJob()
throws IOException, InterruptedException, ClassNotFoundException { throws Exception {
numSleepReducers = 1; numSleepReducers = 1;
if (mrCluster != null) {
mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
}
super.testSleepJob(); super.testSleepJob();
} }
@Test @Test
public void testSleepJobWithMultipleReducers() public void testSleepJobWithMultipleReducers()
throws IOException, InterruptedException, ClassNotFoundException { throws Exception {
numSleepReducers = 3; numSleepReducers = 3;
if (mrCluster != null) {
mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
}
super.testSleepJob(); super.testSleepJob();
} }
@ -76,20 +68,7 @@ public void testSleepJobWithMultipleReducers()
protected void verifySleepJobCounters(Job job) throws InterruptedException, protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException { IOException {
Counters counters = job.getCounters(); Counters counters = job.getCounters();
super.verifySleepJobCounters(job);
Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
.getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(numSleepReducers,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
Assert.assertEquals(3, Assert.assertEquals(3,
counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue()); counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
Assert.assertEquals(numSleepReducers, Assert.assertEquals(numSleepReducers,
@ -168,16 +147,7 @@ public void testFailingMapper()
protected void verifyFailingMapperCounters(Job job) protected void verifyFailingMapperCounters(Job job)
throws InterruptedException, IOException { throws InterruptedException, IOException {
Counters counters = job.getCounters(); Counters counters = job.getCounters();
Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) super.verifyFailingMapperCounters(job);
.getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
.getValue());
Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
Assert.assertEquals(2, Assert.assertEquals(2,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue()); counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS) Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)