MAPREDUCE-5813. Fix YarnChild to explicitly load job.xml from the local-filesystem, rather than rely on the classpath. Contributed by Gera Shegalov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1583050 13f79535-47bb-0310-9956-ffa450edef68
Author: Arun Murthy, 2014-03-29 19:14:53 +00:00
Parent: b4c41f341b
Commit: b7b3a7e011
3 changed files with 53 additions and 2 deletions
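Context for the diffs below: in Hadoop's Configuration API, a resource added by String name is resolved through the classloader, while a resource added as a Path (which is what constructing a JobConf from a file name does) is read from the local filesystem. A minimal sketch of the two loading styles, assuming a job.xml file is present in the current working directory as it is in a localized task container; the class name and the printed property are illustrative, not the actual YarnChild code:

    // Sketch only: assumes ./job.xml exists; not the actual YarnChild code.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class JobXmlLoadingSketch {
      public static void main(String[] args) {
        // Classpath-dependent style: the String name "job.xml" is looked up via
        // the classloader, so it is only found if it happens to be on the classpath.
        Configuration viaClasspath = new Configuration();
        viaClasspath.addResource("job.xml");

        // Explicit style (what this commit switches YarnChild to): a Path is read
        // straight from the local filesystem, independent of classpath contents.
        Configuration viaLocalFile = new Configuration();
        viaLocalFile.addResource(new Path("job.xml"));

        System.out.println(viaLocalFile.get("mapreduce.job.name"));
      }
    }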

hadoop-mapreduce-project/CHANGES.txt

@@ -272,6 +272,10 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5810. Removed the faulty and failing streaming test
     TestStreamingTaskLog. (Akira Ajisaka via vinodkv)
 
+    MAPREDUCE-5813. Fix YarnChild to explicitly load job.xml from the
+    local-filesystem, rather than rely on the classpath. (Gera Shegalov via
+    acmurthy)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

org/apache/hadoop/mapred/YarnChild.java

@@ -76,10 +76,9 @@ public static void main(String[] args) throws Throwable {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     LOG.debug("Child starting");
 
-    final JobConf job = new JobConf();
+    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
     // Initing with our JobConf allows us to avoid loading confs twice
     Limits.init(job);
-    job.addResource(MRJobConfig.JOB_CONF_FILE);
     UserGroupInformation.setConfiguration(job);
 
     String host = args[0];
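A note on the hunk above: in the old code, Limits.init(job) ran before job.xml had been added to the configuration, so any values snapshotted at init time came only from the defaults; building the JobConf from the file up front means the job's settings are already present when Limits.init(job) runs (the existing comment notes that passing the JobConf in also avoids loading the configuration twice). A generic sketch of that ordering pitfall, using a stand-in property and init routine rather than the actual Limits internals, and assuming ./job.xml exists:

    // Ordering sketch only; "mapreduce.job.counters.max" is a stand-in property and
    // snapshotLimit() a stand-in for an init routine that caches conf values once.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class InitOrderSketch {
      static int snapshotLimit(Configuration conf) {
        return conf.getInt("mapreduce.job.counters.max", 120);
      }

      public static void main(String[] args) {
        // Old ordering: snapshot first, add job.xml afterwards -> only defaults seen.
        Configuration late = new Configuration();
        int seenTooEarly = snapshotLimit(late);
        late.addResource(new Path("job.xml"));

        // New ordering: job.xml is in the conf before anything snapshots it.
        Configuration early = new Configuration();
        early.addResource(new Path("job.xml"));
        int seenWithJobXml = snapshotLimit(early);

        System.out.println(seenTooEarly + " vs " + seenWithJobXml);
      }
    }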

org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -41,6 +41,7 @@
 import org.apache.hadoop.RandomTextWriterJob;
 import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
 import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.SleepJob.SleepMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -98,6 +99,7 @@ public class TestMRJobs
   private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
     EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
   private static final int NUM_NODE_MGRS = 3;
+  private static final String TEST_IO_SORT_MB = "11";
 
   protected static MiniMRYarnCluster mrCluster;
   protected static MiniDFSCluster dfsCluster;
@@ -205,6 +207,38 @@ public void testSleepJob() throws IOException, InterruptedException,
     // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
   }
 
+  @Test(timeout = 300000)
+  public void testJobClassloader() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader().");
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+    final Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set master address to local to test that local mode applied iff framework == local
+    sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
+    sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
+    sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
+    sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
+    sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
+    sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
+    final SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(sleepConf);
+    final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
+    job.setMapperClass(ConfVerificationMapper.class);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setJarByClass(SleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
+        succeeded);
+  }
+
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();
@@ -795,4 +829,18 @@ private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
     jos.closeEntry();
     jarFile.delete();
   }
+
+  public static class ConfVerificationMapper extends SleepMapper {
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      final Configuration conf = context.getConfiguration();
+      final String ioSortMb = conf.get(MRJobConfig.IO_SORT_MB);
+      if (!TEST_IO_SORT_MB.equals(ioSortMb)) {
+        throw new IOException("io.sort.mb expected: " + TEST_IO_SORT_MB
+            + ", actual: " + ioSortMb);
+      }
+    }
+  }
 }