From 7b541d619f96ef7e447b0c3263d3ead89c8a6901 Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Fri, 24 Aug 2012 23:36:55 +0000
Subject: [PATCH] MAPREDUCE-4408. allow jobs to set a JAR that is in the
 distributed cache (rkanter via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377149 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../apache/hadoop/mapreduce/JobSubmitter.java | 14 ++++-
 .../org/apache/hadoop/mapred/YARNRunner.java  |  4 +-
 .../mapreduce/v2/MiniMRYarnCluster.java       |  6 +-
 .../hadoop/mapreduce/v2/TestMRJobs.java       | 62 ++++++++++++++-----
 5 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5587eb6d3a..37c9591995 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -144,6 +144,9 @@ Branch-2 ( Unreleased changes )
 
     MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
 
+    MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cache
+    (rkanter via tucu)
+
   BUG FIXES
 
     MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 08a09c2a69..31081b332f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -232,9 +232,17 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(jobJar).getName());
       }
-      copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir),
-          replication);
-      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      Path jobJarPath = new Path(jobJar);
+      URI jobJarURI = jobJarPath.toUri();
+      // If the job jar is already in fs, we don't need to copy it from local fs
+      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+          || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
+              && jobJarURI.getAuthority().equals(
+                  jtFs.getUri().getAuthority()))) {
+        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
+            replication);
+        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      }
     } else {
       LOG.warn("No job jar file set. User classes may not be found. "
           + "See Job or Job#setJar(String).");
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 4555f86e88..74ae6446cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -345,10 +345,10 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
         createApplicationResource(defaultFileContext,
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
       localResources.put(MRJobConfig.JOB_JAR,
           createApplicationResource(defaultFileContext,
-              new Path(jobSubmitDir, MRJobConfig.JOB_JAR),
-              LocalResourceType.ARCHIVE));
+              jobJarPath, LocalResourceType.ARCHIVE));
     } else {
       // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
       // mapreduce jar itself which is already on the classpath.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index de0ee249ad..8edf4f15d9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -72,8 +72,10 @@ public MiniMRYarnCluster(String testName, int noOfNMs) {
   @Override
   public void init(Configuration conf) {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-        "apps_staging_dir/").getAbsolutePath());
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir/").getAbsolutePath());
+    }
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
 
     try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 03fdc4e57f..4a30c3cfa6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -41,10 +41,10 @@
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -80,15 +80,24 @@ public class TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
 
   protected static MiniMRYarnCluster mrCluster;
+  protected static MiniDFSCluster dfsCluster;
 
   private static Configuration conf = new Configuration();
   private static FileSystem localFs;
+  private static FileSystem remoteFs;
   static {
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException io) {
       throw new RuntimeException("problem getting local fs", io);
     }
+    try {
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
   }
 
   private static Path TEST_ROOT_DIR = new Path("target",
@@ -107,6 +116,8 @@ public static void setup() throws IOException {
     if (mrCluster == null) {
       mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
       mrCluster.init(conf);
       mrCluster.start();
     }
@@ -123,6 +134,10 @@ public static void tearDown() {
       mrCluster.stop();
       mrCluster = null;
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
   }
 
   @Test
@@ -403,7 +418,6 @@ public void setup(Context context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path[] files = context.getLocalCacheFiles();
      Path[] archives = context.getLocalCacheArchives();
-      FileSystem fs = LocalFileSystem.get(conf);
 
       // Check that 4 (2 + appjar + DistributedCacheChecker jar) files
       // and 2 archives are present
@@ -411,13 +425,13 @@
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
-      Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
+      Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
+      Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
 
       // Check extraction of the archive
-      Assert.assertTrue(fs.exists(new Path(archives[0],
+      Assert.assertTrue(localFs.exists(new Path(archives[0],
           "distributed.jar.inside3")));
-      Assert.assertTrue(fs.exists(new Path(archives[1],
+      Assert.assertTrue(localFs.exists(new Path(archives[1],
           "distributed.jar.inside4")));
 
       // Check the class loaders
@@ -448,8 +462,7 @@ public void setup(Context context) throws IOException {
     }
   }
 
-  @Test
-  public void testDistributedCache() throws Exception {
+  public void _testDistributedCache(String jobJarPath) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
                + " not found. Not running test.");
@@ -470,11 +483,13 @@
 
     // Set the job jar to a new "dummy" jar so we can check that it's extracted
     // properly
-    job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
+    job.setJar(jobJarPath);
     // Because the job jar is a "dummy" jar, we need to include the jar with
     // DistributedCacheChecker or it won't be able to find it
-    job.addFileToClassPath(new Path(
-        JarFinder.getJar(DistributedCacheChecker.class)));
+    Path distributedCacheCheckerJar = new Path(
+        JarFinder.getJar(DistributedCacheChecker.class));
+    job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
+        localFs.getUri(), distributedCacheCheckerJar.getParent()));
 
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
@@ -484,7 +499,9 @@
     job.addCacheFile(
         new URI(first.toUri().toString() + "#distributed.first.symlink"));
     job.addFileToClassPath(second);
-    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    // The AppMaster jar itself
+    job.addFileToClassPath(
+        APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
     job.setMaxMapAttempts(1); // speed up failures
@@ -497,6 +514,23 @@
         " but didn't Match Job ID " + jobId ,
         trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
   }
+
+  @Test
+  public void testDistributedCache() throws Exception {
+    // Test with a local (file:///) Job Jar
+    Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
+    _testDistributedCache(localJobJarPath.toUri().toString());
+
+    // Test with a remote (hdfs://) Job Jar
+    Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
+        localJobJarPath.getName());
+    remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath);
+    File localJobJarFile = new File(localJobJarPath.toUri().toString());
+    if (localJobJarFile.exists()) {  // just to make sure
+      localJobJarFile.delete();
+    }
+    _testDistributedCache(remoteJobJarPath.toUri().toString());
+  }
 
   private Path createTempFile(String filename, String contents)
       throws IOException {
@@ -522,7 +556,7 @@ private Path makeJar(Path p, int index) throws FileNotFoundException,
     return p;
   }
 
-  private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
+  private Path makeJobJarWithLib(String testDir) throws FileNotFoundException,
       IOException{
     Path jobJarPath = new Path(testDir, "thejob.jar");
     FileOutputStream fos =
@@ -535,7 +569,7 @@ private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
         new Path(testDir, "lib2.jar").toUri().getPath()));
     jos.close();
     localFs.setPermission(jobJarPath, new FsPermission("700"));
-    return jobJarPath.toUri().toString();
+    return jobJarPath;
   }
 
   private void createAndAddJarToJar(JarOutputStream jos, File jarFile)