MAPREDUCE-6846. Fragments specified for libjar paths are not handled correctly

(Contributed by Chris Trezzo via Daniel Templeton)
This commit is contained in:
Daniel Templeton 2017-04-05 17:24:09 -07:00
parent a2c57bb70d
commit fc0885da29
2 changed files with 492 additions and 92 deletions

View File

@ -23,6 +23,7 @@
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
@ -91,7 +92,7 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
mkdirs(jtFs, submitJobDir, mapredSysPerms);
Collection<String> files = conf.getStringCollection("tmpfiles");
Collection<String> libjars = conf.getStringCollection("tmpjars");
@ -116,18 +117,20 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
job.getCredentials());
}
private void uploadFiles(Configuration conf, Collection<String> files,
@VisibleForTesting
void uploadFiles(Configuration conf, Collection<String> files,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
if (!files.isEmpty()) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
mkdirs(jtFs, filesDir, mapredSysPerms);
for (String tmpFile : files) {
URI tmpURI = null;
try {
tmpURI = new URI(tmpFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
throw new IllegalArgumentException("Error parsing files argument."
+ " Argument must be a valid URI: " + tmpFile, e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
@ -136,50 +139,83 @@ private void uploadFiles(Configuration conf, Collection<String> files,
DistributedCache.addCacheFile(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path "
+ newPath + ". This was based on the files parameter: "
+ tmpFile,
ue);
}
}
}
}
private void uploadLibJars(Configuration conf, Collection<String> libjars,
// Suppress warning for use of DistributedCache (it is everywhere).
@SuppressWarnings("deprecation")
@VisibleForTesting
void uploadLibJars(Configuration conf, Collection<String> libjars,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
if (!libjars.isEmpty()) {
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
mkdirs(jtFs, libjarsDir, mapredSysPerms);
Collection<URI> libjarURIs = new LinkedList<>();
boolean foundFragment = false;
for (String tmpjars : libjars) {
Path tmp = new Path(tmpjars);
URI tmpURI = null;
try {
tmpURI = new URI(tmpjars);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error parsing libjars argument."
+ " Argument must be a valid URI: " + tmpjars, e);
}
Path tmp = new Path(tmpURI);
Path newPath =
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
// Add each file to the classpath
DistributedCache.addFileToClassPath(
new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
if (!foundFragment) {
foundFragment = pathURI.getFragment() != null;
}
DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
jtFs, false);
libjarURIs.add(pathURI);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path "
+ newPath + ". This was based on the libjar parameter: "
+ tmpjars,
ue);
}
}
if (useWildcard) {
// Add the whole directory to the cache
if (useWildcard && !foundFragment) {
// Add the whole directory to the cache using a wild card
Path libJarsDirWildcard =
jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
} else {
for (URI uri : libjarURIs) {
DistributedCache.addCacheFile(uri, conf);
}
}
}
}
private void uploadArchives(Configuration conf, Collection<String> archives,
@VisibleForTesting
void uploadArchives(Configuration conf, Collection<String> archives,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
if (!archives.isEmpty()) {
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
mkdirs(jtFs, archivesDir, mapredSysPerms);
for (String tmpArchives : archives) {
URI tmpURI;
try {
tmpURI = new URI(tmpArchives);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
throw new IllegalArgumentException("Error parsing archives argument."
+ " Argument must be a valid URI: " + tmpArchives, e);
}
Path tmp = new Path(tmpURI);
Path newPath =
@ -189,13 +225,18 @@ private void uploadArchives(Configuration conf, Collection<String> archives,
DistributedCache.addCacheArchive(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw an uri excpetion
throw new IOException("Failed to create uri for " + tmpArchives, ue);
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path"
+ newPath + ". This was based on the archive parameter: "
+ tmpArchives,
ue);
}
}
}
}
private void uploadJobJar(Job job, String jobJar, Path submitJobDir,
@VisibleForTesting
void uploadJobJar(Job job, String jobJar, Path submitJobDir,
short submitReplication) throws IOException {
if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
@ -273,7 +314,8 @@ Path stringToPath(String s) {
URI uri = new URI(s);
return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
throw new IllegalArgumentException(
"Error parsing argument." + " Argument must be a valid URI: " + s, e);
}
}
@ -380,9 +422,20 @@ FileStatus getFileStatus(Map<URI, FileStatus> statCache,
return status;
}
/**
* Create a new directory in the passed filesystem. This wrapper method exists
* so that it can be overridden/stubbed during testing.
*/
@VisibleForTesting
boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
throws IOException {
return FileSystem.mkdirs(fs, dir, permission);
}
// copies a file to the jobtracker filesystem and returns the path where it
// was copied to
private Path copyRemoteFiles(Path parentDir, Path originalPath,
@VisibleForTesting
Path copyRemoteFiles(Path parentDir, Path originalPath,
Configuration conf, short replication) throws IOException {
// check if we do not need to copy the files
// is jt using the same file system.
@ -400,10 +453,12 @@ private Path copyRemoteFiles(Path parentDir, Path originalPath,
Path newPath = new Path(parentDir, originalPath.getName());
FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
jtFs.setReplication(newPath, replication);
jtFs.makeQualified(newPath);
return newPath;
}
private void copyJar(Path originalJarPath, Path submitJarFile,
@VisibleForTesting
void copyJar(Path originalJarPath, Path submitJarFile,
short replication) throws IOException {
jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
jtFs.setReplication(submitJarFile, replication);
@ -427,7 +482,7 @@ private URI getPathURI(Path destPath, String fragment)
URI pathURI = destPath.toUri();
if (pathURI.getFragment() == null) {
if (fragment == null) {
pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
// no fragment, just return existing pathURI from destPath
} else {
pathURI = new URI(pathURI.toString() + "#" + fragment);
}

View File

@ -23,13 +23,19 @@
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert;
import org.junit.Test;
@ -69,13 +75,13 @@ public void testStringToPath() throws IOException {
@Test
public void testAllDefaults() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
runLimitsTest(b.build(), true, null);
}
@Test
public void testNoLimitsWithResources() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(10);
@ -88,7 +94,7 @@ public void testNoLimitsWithResources() throws IOException {
@Test
public void testAtResourceLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
@ -101,7 +107,7 @@ public void testAtResourceLimit() throws IOException {
@Test
public void testOverResourceLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
@ -114,7 +120,7 @@ public void testOverResourceLimit() throws IOException {
@Test
public void testAtResourcesMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
@ -128,7 +134,7 @@ public void testAtResourcesMBLimit() throws IOException {
@Test
public void testOverResourcesMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
@ -142,7 +148,7 @@ public void testOverResourcesMBLimit() throws IOException {
@Test
public void testAtSingleResourceMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
@ -156,7 +162,7 @@ public void testAtSingleResourceMBLimit() throws IOException {
@Test
public void testOverSingleResourceMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
@ -168,20 +174,263 @@ public void testOverSingleResourceMBLimit() throws IOException {
runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE);
}
private String destinationPathPrefix = "hdfs:///destinationPath/";
private String[] expectedFilesNoFrags =
{ destinationPathPrefix + "tmpFiles0.txt",
destinationPathPrefix + "tmpFiles1.txt",
destinationPathPrefix + "tmpFiles2.txt",
destinationPathPrefix + "tmpFiles3.txt",
destinationPathPrefix + "tmpFiles4.txt",
destinationPathPrefix + "tmpjars0.jar",
destinationPathPrefix + "tmpjars1.jar" };
private String[] expectedFilesWithFrags =
{ destinationPathPrefix + "tmpFiles0.txt#tmpFilesfragment0.txt",
destinationPathPrefix + "tmpFiles1.txt#tmpFilesfragment1.txt",
destinationPathPrefix + "tmpFiles2.txt#tmpFilesfragment2.txt",
destinationPathPrefix + "tmpFiles3.txt#tmpFilesfragment3.txt",
destinationPathPrefix + "tmpFiles4.txt#tmpFilesfragment4.txt",
destinationPathPrefix + "tmpjars0.jar#tmpjarsfragment0.jar",
destinationPathPrefix + "tmpjars1.jar#tmpjarsfragment1.jar" };
// We use the local fs for the submitFS in the StubedUploader, so libjars
// should be replaced with a single path.
private String[] expectedFilesWithWildcard =
{ destinationPathPrefix + "tmpFiles0.txt",
destinationPathPrefix + "tmpFiles1.txt",
destinationPathPrefix + "tmpFiles2.txt",
destinationPathPrefix + "tmpFiles3.txt",
destinationPathPrefix + "tmpFiles4.txt",
"file:///libjars-submit-dir/libjars/*" };
private String[] expectedArchivesNoFrags =
{ destinationPathPrefix + "tmpArchives0.tgz",
destinationPathPrefix + "tmpArchives1.tgz" };
private String[] expectedArchivesWithFrags =
{ destinationPathPrefix + "tmpArchives0.tgz#tmpArchivesfragment0.tgz",
destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
private String jobjarSubmitDir = "/jobjar-submit-dir";
private String expectedJobJar = jobjarSubmitDir + "/job.jar";
@Test
public void testPathsWithNoFragNoSchemeRelative() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithScheme(false);
b.setPathsWithFrags(false);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
}
@Test
public void testPathsWithNoFragNoSchemeAbsolute() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(false);
b.setPathsWithScheme(false);
b.setAbsolutePaths(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
}
@Test
public void testPathsWithFragNoSchemeAbsolute() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(true);
b.setPathsWithScheme(false);
b.setAbsolutePaths(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
}
@Test
public void testPathsWithFragNoSchemeRelative() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(true);
b.setAbsolutePaths(false);
b.setPathsWithScheme(false);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
}
@Test
public void testPathsWithFragSchemeAbsolute() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(true);
b.setAbsolutePaths(true);
b.setPathsWithScheme(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
}
@Test
public void testPathsWithNoFragWithSchemeAbsolute() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(false);
b.setPathsWithScheme(true);
b.setAbsolutePaths(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
}
@Test
public void testPathsWithNoFragAndWildCard() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(4);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(false);
b.setPathsWithScheme(true);
b.setAbsolutePaths(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf, true);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
expectedArchivesNoFrags, expectedJobJar);
}
@Test
public void testPathsWithFragsAndWildCard() throws IOException {
ResourceConf.Builder b = new ResourceConf.Builder();
b.setNumOfTmpFiles(5);
b.setNumOfTmpLibJars(2);
b.setNumOfTmpArchives(2);
b.setJobJar(true);
b.setPathsWithFrags(true);
b.setPathsWithScheme(true);
b.setAbsolutePaths(true);
ResourceConf rConf = b.build();
JobConf jConf = new JobConf();
JobResourceUploader uploader = new StubedUploader(jConf, true);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
}
private void runTmpResourcePathTest(JobResourceUploader uploader,
ResourceConf rConf, JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar) throws IOException {
rConf.setupJobConf(jConf);
// We use a pre and post job object here because we need the post job object
// to get the new values set during uploadResources, but we need the pre job
// to set the job jar because JobResourceUploader#uploadJobJar uses the Job
// interface not the JobConf. The post job is automatically created in
// validateResourcePaths.
Job jobPre = Job.getInstance(jConf);
uploadResources(uploader, jConf, jobPre);
validateResourcePaths(jConf, expectedFiles, expectedArchives,
expectedJobJar, jobPre);
}
private void uploadResources(JobResourceUploader uploader, JobConf jConf,
Job job) throws IOException {
Collection<String> files = jConf.getStringCollection("tmpfiles");
Collection<String> libjars = jConf.getStringCollection("tmpjars");
Collection<String> archives = jConf.getStringCollection("tmparchives");
String jobJar = jConf.getJar();
uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
(short) 3);
uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
null, (short) 3);
uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
null, (short) 3);
uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
}
private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar, Job preJob)
throws IOException {
Job j = Job.getInstance(jConf);
validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
// We use a different job object here because the jobjar was set on a
// different job object
Assert.assertEquals("Job jar path is different than expected!",
expectedJobJar, preJob.getJar());
}
private void validateResourcePathsSub(URI[] actualURIs,
String[] expectedURIs) {
List<URI> actualList = Arrays.asList(actualURIs);
Set<String> expectedSet = new HashSet<>(Arrays.asList(expectedURIs));
if (actualList.size() != expectedSet.size()) {
Assert.fail("Expected list of resources (" + expectedSet.size()
+ ") and actual list of resources (" + actualList.size()
+ ") are different lengths!");
}
for (URI u : actualList) {
if (!expectedSet.contains(u.toString())) {
Assert.fail("Resource list contained unexpected path: " + u.toString());
}
}
}
private enum ResourceViolation {
NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
}
private void runLimitsTest(ResourceLimitsConf rlConf,
boolean checkShouldSucceed, ResourceViolation violation)
throws IOException {
private void runLimitsTest(ResourceConf rlConf, boolean checkShouldSucceed,
ResourceViolation violation) throws IOException {
if (!checkShouldSucceed && violation == null) {
Assert.fail("Test is misconfigured. checkShouldSucceed is set to false"
+ " and a ResourceViolation is not specified.");
}
JobConf conf = setupJobConf(rlConf);
JobConf conf = new JobConf();
rlConf.setupJobConf(conf);
JobResourceUploader uploader = new StubedUploader(conf);
long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024;
when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes);
@ -230,43 +479,7 @@ private void runLimitsTest(ResourceLimitsConf rlConf,
private final FileStatus mockedStatus = mock(FileStatus.class);
private JobConf setupJobConf(ResourceLimitsConf rlConf) {
JobConf conf = new JobConf();
conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources);
conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB);
conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
rlConf.maxSingleResourceMB);
conf.set("tmpfiles",
buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles));
conf.set("tmpjars",
buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars));
conf.set("tmparchives",
buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives));
conf.set(MRJobConfig.CACHE_ARCHIVES,
buildPathString("file:///cacheArchives", rlConf.numOfDCArchives));
conf.set(MRJobConfig.CACHE_FILES,
buildPathString("file:///cacheFiles", rlConf.numOfDCFiles));
if (rlConf.jobJar) {
conf.setJar("file:///jobjar.jar");
}
return conf;
}
private String buildPathString(String pathPrefix, int numOfPaths) {
if (numOfPaths < 1) {
return "";
} else {
StringBuilder b = new StringBuilder();
b.append(pathPrefix + 0);
for (int i = 1; i < numOfPaths; i++) {
b.append("," + pathPrefix + i);
}
return b.toString();
}
}
final static class ResourceLimitsConf {
private static class ResourceConf {
private final int maxResources;
private final long maxResourcesMB;
private final long maxSingleResourceMB;
@ -277,14 +490,15 @@ final static class ResourceLimitsConf {
private final int numOfDCFiles;
private final int numOfDCArchives;
private final long sizeOfResource;
private final boolean pathsWithFrags;
private final boolean pathsWithScheme;
private final boolean absolutePaths;
static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf();
private ResourceLimitsConf() {
private ResourceConf() {
this(new Builder());
}
private ResourceLimitsConf(Builder builder) {
private ResourceConf(Builder builder) {
this.maxResources = builder.maxResources;
this.maxResourcesMB = builder.maxResourcesMB;
this.maxSingleResourceMB = builder.maxSingleResourceMB;
@ -295,6 +509,9 @@ private ResourceLimitsConf(Builder builder) {
this.numOfDCFiles = builder.numOfDCFiles;
this.numOfDCArchives = builder.numOfDCArchives;
this.sizeOfResource = builder.sizeOfResource;
this.pathsWithFrags = builder.pathsWithFrags;
this.pathsWithScheme = builder.pathsWithScheme;
this.absolutePaths = builder.absolutePaths;
}
static class Builder {
@ -309,69 +526,176 @@ static class Builder {
private int numOfDCFiles = 0;
private int numOfDCArchives = 0;
private long sizeOfResource = 0;
private boolean pathsWithFrags = false;
private boolean pathsWithScheme = false;
private boolean absolutePaths = true;
Builder() {
private Builder() {
}
Builder setMaxResources(int max) {
private Builder setMaxResources(int max) {
this.maxResources = max;
return this;
}
Builder setMaxResourcesMB(long max) {
private Builder setMaxResourcesMB(long max) {
this.maxResourcesMB = max;
return this;
}
Builder setMaxSingleResourceMB(long max) {
private Builder setMaxSingleResourceMB(long max) {
this.maxSingleResourceMB = max;
return this;
}
Builder setNumOfTmpFiles(int num) {
private Builder setNumOfTmpFiles(int num) {
this.numOfTmpFiles = num;
return this;
}
Builder setNumOfTmpArchives(int num) {
private Builder setNumOfTmpArchives(int num) {
this.numOfTmpArchives = num;
return this;
}
Builder setNumOfTmpLibJars(int num) {
private Builder setNumOfTmpLibJars(int num) {
this.numOfTmpLibJars = num;
return this;
}
Builder setJobJar(boolean jar) {
private Builder setJobJar(boolean jar) {
this.jobJar = jar;
return this;
}
Builder setNumOfDCFiles(int num) {
private Builder setNumOfDCFiles(int num) {
this.numOfDCFiles = num;
return this;
}
Builder setNumOfDCArchives(int num) {
private Builder setNumOfDCArchives(int num) {
this.numOfDCArchives = num;
return this;
}
Builder setSizeOfResource(long sizeMB) {
private Builder setSizeOfResource(long sizeMB) {
this.sizeOfResource = sizeMB;
return this;
}
ResourceLimitsConf build() {
return new ResourceLimitsConf(this);
private Builder setPathsWithFrags(boolean fragments) {
this.pathsWithFrags = fragments;
return this;
}
private Builder setPathsWithScheme(boolean scheme) {
this.pathsWithScheme = scheme;
return this;
}
private Builder setAbsolutePaths(boolean absolute) {
this.absolutePaths = absolute;
return this;
}
ResourceConf build() {
return new ResourceConf(this);
}
}
private void setupJobConf(JobConf conf) {
conf.set("tmpfiles",
buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
conf.set("tmpjars",
buildPathString("tmpjars", this.numOfTmpLibJars, ".jar"));
conf.set("tmparchives",
buildPathString("tmpArchives", this.numOfTmpArchives, ".tgz"));
conf.set(MRJobConfig.CACHE_ARCHIVES, buildDistributedCachePathString(
"cacheArchives", this.numOfDCArchives, ".tgz"));
conf.set(MRJobConfig.CACHE_FILES, buildDistributedCachePathString(
"cacheFiles", this.numOfDCFiles, ".txt"));
if (this.jobJar) {
String fragment = "";
if (pathsWithFrags) {
fragment = "#jobjarfrag.jar";
}
if (pathsWithScheme) {
conf.setJar("file:///jobjar.jar" + fragment);
} else {
if (absolutePaths) {
conf.setJar("/jobjar.jar" + fragment);
} else {
conf.setJar("jobjar.jar" + fragment);
}
}
}
conf.setInt(MRJobConfig.MAX_RESOURCES, this.maxResources);
conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
this.maxSingleResourceMB);
}
// We always want absolute paths with a scheme in the DistributedCache, so
// we use a separate method to construct the path string.
private String buildDistributedCachePathString(String pathPrefix,
int numOfPaths, String extension) {
if (numOfPaths < 1) {
return "";
} else {
StringBuilder b = new StringBuilder();
b.append(buildPathStringSub(pathPrefix, "file:///" + pathPrefix,
extension, 0));
for (int i = 1; i < numOfPaths; i++) {
b.append("," + buildPathStringSub(pathPrefix, "file:///" + pathPrefix,
extension, i));
}
return b.toString();
}
}
private String buildPathString(String pathPrefix, int numOfPaths,
String extension) {
if (numOfPaths < 1) {
return "";
} else {
StringBuilder b = new StringBuilder();
String processedPath;
if (pathsWithScheme) {
processedPath = "file:///" + pathPrefix;
} else {
if (absolutePaths) {
processedPath = "/" + pathPrefix;
} else {
processedPath = pathPrefix;
}
}
b.append(buildPathStringSub(pathPrefix, processedPath, extension, 0));
for (int i = 1; i < numOfPaths; i++) {
b.append(","
+ buildPathStringSub(pathPrefix, processedPath, extension, i));
}
return b.toString();
}
}
private String buildPathStringSub(String pathPrefix, String processedPath,
String extension, int num) {
if (pathsWithFrags) {
return processedPath + num + extension + "#" + pathPrefix + "fragment"
+ num + extension;
} else {
return processedPath + num + extension;
}
}
}
class StubedUploader extends JobResourceUploader {
private class StubedUploader extends JobResourceUploader {
StubedUploader(JobConf conf) throws IOException {
super(FileSystem.getLocal(conf), false);
this(conf, false);
}
StubedUploader(JobConf conf, boolean useWildcard) throws IOException {
super(FileSystem.getLocal(conf), useWildcard);
}
@Override
@ -379,5 +703,26 @@ FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job,
Path p) throws IOException {
return mockedStatus;
}
@Override
boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
throws IOException {
// Do nothing. Stubbed out to avoid side effects. We don't actually need
// to create submit dirs.
return true;
}
@Override
Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf,
short replication) throws IOException {
return new Path(destinationPathPrefix + originalPath.getName());
}
@Override
void copyJar(Path originalJarPath, Path submitJarFile, short replication)
throws IOException {
// Do nothing. Stubbed out to avoid side effects. We don't actually need
// to copy the jar to the remote fs.
}
}
}