From 49beea562a3b6bbb5eb56d89759e51b35c5fd4f1 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Mon, 22 Oct 2012 19:50:23 +0000 Subject: [PATCH] MAPREDUCE-4740. only .jars can be added to the Distributed Cache classpath. Contributed by Robert Joseph Evans git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401054 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/util/MRApps.java | 75 +++++++++++++++---- .../hadoop/mapreduce/v2/util/TestMRApps.java | 50 ++++++++++++- 3 files changed, 112 insertions(+), 16 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 56641fe122..d700c7b233 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -599,6 +599,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. (Jason Lowe via vinodkv) + MAPREDUCE-4740. only .jars can be added to the Distributed Cache + classpath. (Robert Joseph Evans via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 169ba4b4c0..596802853f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -191,6 +191,7 @@ private static void setMRFrameworkClasspath( // TODO: Remove duplicates. } + @SuppressWarnings("deprecation") public static void setClasspath(Map environment, Configuration conf) throws IOException { boolean userClassesTakesPrecedence = @@ -218,11 +219,66 @@ public static void setClasspath(Map environment, environment, Environment.CLASSPATH.name(), Environment.PWD.$() + Path.SEPARATOR + "*"); + // a * in the classpath will only find a .jar, so we need to filter out + // all .jars and add everything else + addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), + DistributedCache.getCacheFiles(conf), + conf, + environment); + addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), + DistributedCache.getCacheArchives(conf), + conf, + environment); if (userClassesTakesPrecedence) { MRApps.setMRFrameworkClasspath(environment, conf); } } + /** + * Add the paths to the classpath if they are not jars + * @param paths the paths to add to the classpath + * @param withLinks the corresponding paths that may have a link name in them + * @param conf used to resolve the paths + * @param environment the environment to update CLASSPATH in + * @throws IOException if there is an error resolving any of the paths. + */ + private static void addToClasspathIfNotJar(Path[] paths, + URI[] withLinks, Configuration conf, + Map environment) throws IOException { + if (paths != null) { + HashMap linkLookup = new HashMap(); + if (withLinks != null) { + for (URI u: withLinks) { + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = (null == u.getFragment()) + ? p.getName() : u.getFragment(); + if (!name.toLowerCase().endsWith(".jar")) { + linkLookup.put(p, name); + } + } + } + + for (Path p : paths) { + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = linkLookup.get(p); + if (name == null) { + name = p.getName(); + } + if(!name.toLowerCase().endsWith(".jar")) { + Apps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + Environment.PWD.$() + Path.SEPARATOR + name); + } + } + } + } + private static final String STAGING_CONSTANT = ".staging"; public static Path getStagingAreaDir(Configuration conf, String user) { return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, @@ -261,8 +317,7 @@ public static void setupDistributedCache( DistributedCache.getCacheArchives(conf), parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf), - DistributedCache.getArchiveClassPaths(conf)); + DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, @@ -271,8 +326,7 @@ public static void setupDistributedCache( DistributedCache.getCacheFiles(conf), parseTimeStamps(DistributedCache.getFileTimestamps(conf)), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf), - DistributedCache.getFileClassPaths(conf)); + DistributedCache.getFileVisibilities(conf)); } private static String getResourceDescription(LocalResourceType type) { @@ -289,8 +343,8 @@ private static void parseDistributedCacheArtifacts( Configuration conf, Map localResources, LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], - Path[] pathsToPutOnClasspath) throws IOException { + URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) + throws IOException { if (uris != null) { // Sanity check @@ -304,15 +358,6 @@ private static void parseDistributedCacheArtifacts( ); } - Map classPaths = new HashMap(); - if (pathsToPutOnClasspath != null) { - for (Path p : pathsToPutOnClasspath) { - FileSystem remoteFS = p.getFileSystem(conf); - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - classPaths.put(p.toUri().getPath().toString(), p); - } - } for (int i = 0; i < uris.length; ++i) { URI u = uris[i]; Path p = new Path(u); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index dbd3538602..5cf515bb25 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.util; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.util.HashMap; @@ -42,12 +44,36 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; - +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; import static org.mockito.Mockito.*; public class TestMRApps { + private static File testWorkDir = null; + + @BeforeClass + public static void setupTestDirs() throws IOException { + testWorkDir = new File("target", TestMRApps.class.getCanonicalName()); + delete(testWorkDir); + testWorkDir.mkdirs(); + testWorkDir = testWorkDir.getAbsoluteFile(); + } + + @AfterClass + public static void cleanupTestDirs() throws IOException { + if (testWorkDir != null) { + delete(testWorkDir); + } + } + + private static void delete(File dir) throws IOException { + Path p = new Path("file://"+dir.getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = p.getFileSystem(conf); + fs.delete(p, true); + } @Test public void testJobIDtoString() { JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class); @@ -154,6 +180,28 @@ public class TestMRApps { } assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath)); } + + @Test public void testSetClasspathWithArchives () throws IOException { + File testTGZ = new File(testWorkDir, "test.tgz"); + FileOutputStream out = new FileOutputStream(testTGZ); + out.write(0); + out.close(); + Job job = Job.getInstance(); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://" + + testTGZ.getAbsolutePath()); + conf.set(MRJobConfig.CACHE_ARCHIVES, "file://" + + testTGZ.getAbsolutePath() + "#testTGZ"); + Map environment = new HashMap(); + MRApps.setClasspath(environment, conf); + assertTrue(environment.get("CLASSPATH").startsWith("$PWD:")); + String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH); + if (confClasspath != null) { + confClasspath = confClasspath.replaceAll(",\\s*", ":").trim(); + } + assertTrue(environment.get("CLASSPATH").contains(confClasspath)); + assertTrue(environment.get("CLASSPATH").contains("testTGZ")); + } @Test public void testSetClasspathWithUserPrecendence() { Configuration conf = new Configuration();