MAPREDUCE-6862. Fragments are not handled correctly by resource limit checking. (Chris Trezzo via mingma)
This commit is contained in:
parent
6a5516c238
commit
ceab00ac62
@ -238,28 +238,42 @@ class JobResourceUploader {
|
||||
Collection<String> dcArchives =
|
||||
conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);
|
||||
|
||||
for (String path : dcFiles) {
|
||||
explorePath(conf, new Path(path), limitChecker, statCache);
|
||||
for (String uri : dcFiles) {
|
||||
explorePath(conf, stringToPath(uri), limitChecker, statCache);
|
||||
}
|
||||
|
||||
for (String path : dcArchives) {
|
||||
explorePath(conf, new Path(path), limitChecker, statCache);
|
||||
for (String uri : dcArchives) {
|
||||
explorePath(conf, stringToPath(uri), limitChecker, statCache);
|
||||
}
|
||||
|
||||
for (String path : files) {
|
||||
explorePath(conf, new Path(path), limitChecker, statCache);
|
||||
for (String uri : files) {
|
||||
explorePath(conf, stringToPath(uri), limitChecker, statCache);
|
||||
}
|
||||
|
||||
for (String path : libjars) {
|
||||
explorePath(conf, new Path(path), limitChecker, statCache);
|
||||
for (String uri : libjars) {
|
||||
explorePath(conf, stringToPath(uri), limitChecker, statCache);
|
||||
}
|
||||
|
||||
for (String path : archives) {
|
||||
explorePath(conf, new Path(path), limitChecker, statCache);
|
||||
for (String uri : archives) {
|
||||
explorePath(conf, stringToPath(uri), limitChecker, statCache);
|
||||
}
|
||||
|
||||
if (jobJar != null) {
|
||||
explorePath(conf, new Path(jobJar), limitChecker, statCache);
|
||||
explorePath(conf, stringToPath(jobJar), limitChecker, statCache);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a String to a Path and gracefully remove fragments/queries if they
|
||||
* exist in the String.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Path stringToPath(String s) {
|
||||
try {
|
||||
URI uri = new URI(s);
|
||||
return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,34 @@ import org.junit.Test;
|
||||
*/
|
||||
public class TestJobResourceUploader {
|
||||
|
||||
@Test
|
||||
public void testStringToPath() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
JobResourceUploader uploader =
|
||||
new JobResourceUploader(FileSystem.getLocal(conf), false);
|
||||
|
||||
Assert.assertEquals("Failed: absolute, no scheme, with fragment",
|
||||
"/testWithFragment.txt",
|
||||
uploader.stringToPath("/testWithFragment.txt#fragment.txt").toString());
|
||||
|
||||
Assert.assertEquals("Failed: absolute, with scheme, with fragment",
|
||||
"file:/testWithFragment.txt",
|
||||
uploader.stringToPath("file:///testWithFragment.txt#fragment.txt")
|
||||
.toString());
|
||||
|
||||
Assert.assertEquals("Failed: relative, no scheme, with fragment",
|
||||
"testWithFragment.txt",
|
||||
uploader.stringToPath("testWithFragment.txt#fragment.txt").toString());
|
||||
|
||||
Assert.assertEquals("Failed: relative, no scheme, no fragment",
|
||||
"testWithFragment.txt",
|
||||
uploader.stringToPath("testWithFragment.txt").toString());
|
||||
|
||||
Assert.assertEquals("Failed: absolute, with scheme, no fragment",
|
||||
"file:/testWithFragment.txt",
|
||||
uploader.stringToPath("file:///testWithFragment.txt").toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllDefaults() throws IOException {
|
||||
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
|
||||
@ -210,17 +238,17 @@ public class TestJobResourceUploader {
|
||||
rlConf.maxSingleResourceMB);
|
||||
|
||||
conf.set("tmpfiles",
|
||||
buildPathString("file://tmpFiles", rlConf.numOfTmpFiles));
|
||||
buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles));
|
||||
conf.set("tmpjars",
|
||||
buildPathString("file://tmpjars", rlConf.numOfTmpLibJars));
|
||||
buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars));
|
||||
conf.set("tmparchives",
|
||||
buildPathString("file://tmpArchives", rlConf.numOfTmpArchives));
|
||||
buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives));
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES,
|
||||
buildPathString("file://cacheArchives", rlConf.numOfDCArchives));
|
||||
buildPathString("file:///cacheArchives", rlConf.numOfDCArchives));
|
||||
conf.set(MRJobConfig.CACHE_FILES,
|
||||
buildPathString("file://cacheFiles", rlConf.numOfDCFiles));
|
||||
buildPathString("file:///cacheFiles", rlConf.numOfDCFiles));
|
||||
if (rlConf.jobJar) {
|
||||
conf.setJar("file://jobjar.jar");
|
||||
conf.setJar("file:///jobjar.jar");
|
||||
}
|
||||
return conf;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user