From ceab00ac62f8057a07b4b936799e6f04271e6e41 Mon Sep 17 00:00:00 2001
From: Ming Ma
Date: Wed, 29 Mar 2017 17:41:58 -0700
Subject: [PATCH] MAPREDUCE-6862. Fragments are not handled correctly by resource limit checking. (Chris Trezzo via mingma)

---
 .../hadoop/mapreduce/JobResourceUploader.java | 36 ++++++++++++-----
 .../mapreduce/TestJobResourceUploader.java    | 40 ++++++++++++++++---
 2 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index 4c48ff48c5..085c96612d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -238,28 +238,42 @@ void checkLocalizationLimits(Configuration conf, Collection<String> files,
     Collection<String> dcArchives =
         conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);
 
-    for (String path : dcFiles) {
-      explorePath(conf, new Path(path), limitChecker, statCache);
+    for (String uri : dcFiles) {
+      explorePath(conf, stringToPath(uri), limitChecker, statCache);
     }
 
-    for (String path : dcArchives) {
-      explorePath(conf, new Path(path), limitChecker, statCache);
+    for (String uri : dcArchives) {
+      explorePath(conf, stringToPath(uri), limitChecker, statCache);
     }
 
-    for (String path : files) {
-      explorePath(conf, new Path(path), limitChecker, statCache);
+    for (String uri : files) {
+      explorePath(conf, stringToPath(uri), limitChecker, statCache);
     }
 
-    for (String path : libjars) {
-      explorePath(conf, new Path(path), limitChecker, statCache);
+    for (String uri : libjars) {
+      explorePath(conf, stringToPath(uri), limitChecker, statCache);
     }
 
-    for (String path : archives) {
-      explorePath(conf, new Path(path), limitChecker, statCache);
+    for (String uri : archives) {
+      explorePath(conf, stringToPath(uri), limitChecker, statCache);
     }
 
     if (jobJar != null) {
-      explorePath(conf, new Path(jobJar), limitChecker, statCache);
+      explorePath(conf, stringToPath(jobJar), limitChecker, statCache);
+    }
+  }
+
+  /**
+   * Convert a String to a Path and gracefully remove fragments/queries if they
+   * exist in the String.
+   */
+  @VisibleForTesting
+  Path stringToPath(String s) {
+    try {
+      URI uri = new URI(s);
+      return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index 36ea57af21..8ba50a66b3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -39,6 +39,34 @@
  */
 public class TestJobResourceUploader {
 
+  @Test
+  public void testStringToPath() throws IOException {
+    Configuration conf = new Configuration();
+    JobResourceUploader uploader =
+        new JobResourceUploader(FileSystem.getLocal(conf), false);
+
+    Assert.assertEquals("Failed: absolute, no scheme, with fragment",
+        "/testWithFragment.txt",
+        uploader.stringToPath("/testWithFragment.txt#fragment.txt").toString());
+
+    Assert.assertEquals("Failed: absolute, with scheme, with fragment",
+        "file:/testWithFragment.txt",
+        uploader.stringToPath("file:///testWithFragment.txt#fragment.txt")
+            .toString());
+
+    Assert.assertEquals("Failed: relative, no scheme, with fragment",
+        "testWithFragment.txt",
+        uploader.stringToPath("testWithFragment.txt#fragment.txt").toString());
+
+    Assert.assertEquals("Failed: relative, no scheme, no fragment",
+        "testWithFragment.txt",
+        uploader.stringToPath("testWithFragment.txt").toString());
+
+    Assert.assertEquals("Failed: absolute, with scheme, no fragment",
+        "file:/testWithFragment.txt",
+        uploader.stringToPath("file:///testWithFragment.txt").toString());
+  }
+
   @Test
   public void testAllDefaults() throws IOException {
     ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
@@ -210,17 +238,17 @@ private JobConf setupJobConf(ResourceLimitsConf rlConf) {
         rlConf.maxSingleResourceMB);
 
     conf.set("tmpfiles",
-        buildPathString("file://tmpFiles", rlConf.numOfTmpFiles));
+        buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles));
     conf.set("tmpjars",
-        buildPathString("file://tmpjars", rlConf.numOfTmpLibJars));
+        buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars));
     conf.set("tmparchives",
-        buildPathString("file://tmpArchives", rlConf.numOfTmpArchives));
+        buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives));
     conf.set(MRJobConfig.CACHE_ARCHIVES,
-        buildPathString("file://cacheArchives", rlConf.numOfDCArchives));
+        buildPathString("file:///cacheArchives", rlConf.numOfDCArchives));
     conf.set(MRJobConfig.CACHE_FILES,
-        buildPathString("file://cacheFiles", rlConf.numOfDCFiles));
+        buildPathString("file:///cacheFiles", rlConf.numOfDCFiles));
     if (rlConf.jobJar) {
-      conf.setJar("file://jobjar.jar");
+      conf.setJar("file:///jobjar.jar");
     }
     return conf;
   }