MAPREDUCE-6690. Limit the number of resources a single map reduce job can submit for localization. Contributed by Chris Trezzo

Jason Lowe 2016-08-17 16:21:20 +00:00
parent 7f05ff7a4e
commit f80a729832
6 changed files with 776 additions and 32 deletions

JobResourceUploader.java

@@ -21,12 +21,16 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -34,6 +38,8 @@
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobResourceUploader {
@@ -86,31 +92,37 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
-uploadFiles(conf, submitJobDir, mapredSysPerms, replication);
-uploadLibJars(conf, submitJobDir, mapredSysPerms, replication);
-uploadArchives(conf, submitJobDir, mapredSysPerms, replication);
-uploadJobJar(job, submitJobDir, replication);
Collection<String> files = conf.getStringCollection("tmpfiles");
Collection<String> libjars = conf.getStringCollection("tmpjars");
Collection<String> archives = conf.getStringCollection("tmparchives");
String jobJar = job.getJar();
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
uploadJobJar(job, jobJar, submitJobDir, replication);
addLog4jToDistributedCache(job, submitJobDir);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
-ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
statCache);
// get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf,
job.getCredentials());
}
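// Note that checkLocalizationLimits runs before any of the upload calls, so
// an over-limit job fails fast, before resources are copied into the job's
// staging directory.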
-private void uploadFiles(Configuration conf, Path submitJobDir,
-FsPermission mapredSysPerms, short submitReplication) throws IOException {
-String files = conf.get("tmpfiles");
private void uploadFiles(Configuration conf, Collection<String> files,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
-if (files != null) {
if (!files.isEmpty()) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
-String[] fileArr = files.split(",");
-for (String tmpFile : fileArr) {
for (String tmpFile : files) {
URI tmpURI = null;
try {
tmpURI = new URI(tmpFile);
@@ -130,14 +142,13 @@ private void uploadFiles(Configuration conf, Path submitJobDir,
}
}
-private void uploadLibJars(Configuration conf, Path submitJobDir,
-FsPermission mapredSysPerms, short submitReplication) throws IOException {
-String libjars = conf.get("tmpjars");
private void uploadLibJars(Configuration conf, Collection<String> libjars,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
-if (libjars != null) {
if (!libjars.isEmpty()) {
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
-String[] libjarsArr = libjars.split(",");
-for (String tmpjars : libjarsArr) {
for (String tmpjars : libjars) {
Path tmp = new Path(tmpjars);
Path newPath =
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
@@ -157,14 +168,13 @@ private void uploadLibJars(Configuration conf, Path submitJobDir,
}
}
-private void uploadArchives(Configuration conf, Path submitJobDir,
-FsPermission mapredSysPerms, short submitReplication) throws IOException {
-String archives = conf.get("tmparchives");
private void uploadArchives(Configuration conf, Collection<String> archives,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
-if (archives != null) {
if (!archives.isEmpty()) {
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
-String[] archivesArr = archives.split(",");
-for (String tmpArchives : archivesArr) {
for (String tmpArchives : archives) {
URI tmpURI;
try {
tmpURI = new URI(tmpArchives);
@@ -185,9 +195,8 @@ private void uploadArchives(Configuration conf, Path submitJobDir,
}
}
-private void uploadJobJar(Job job, Path submitJobDir, short submitReplication)
-throws IOException {
-String jobJar = job.getJar();
private void uploadJobJar(Job job, String jobJar, Path submitJobDir,
short submitReplication) throws IOException {
if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
@@ -208,6 +217,155 @@ private void uploadJobJar(Job job, Path submitJobDir, short submitReplication)
}
}
/**
* Verify that the resources this job is going to localize are within the
* localization limits.
*/
@VisibleForTesting
void checkLocalizationLimits(Configuration conf, Collection<String> files,
Collection<String> libjars, Collection<String> archives, String jobJar,
Map<URI, FileStatus> statCache) throws IOException {
LimitChecker limitChecker = new LimitChecker(conf);
if (!limitChecker.hasLimits()) {
// there are no limits set, so we are done.
return;
}
// Get the files and archives that are already in the distributed cache
Collection<String> dcFiles =
conf.getStringCollection(MRJobConfig.CACHE_FILES);
Collection<String> dcArchives =
conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);
for (String path : dcFiles) {
explorePath(conf, new Path(path), limitChecker, statCache);
}
for (String path : dcArchives) {
explorePath(conf, new Path(path), limitChecker, statCache);
}
for (String path : files) {
explorePath(conf, new Path(path), limitChecker, statCache);
}
for (String path : libjars) {
explorePath(conf, new Path(path), limitChecker, statCache);
}
for (String path : archives) {
explorePath(conf, new Path(path), limitChecker, statCache);
}
if (jobJar != null) {
explorePath(conf, new Path(jobJar), limitChecker, statCache);
}
}
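// The six sources checked above are the distributed-cache files and
// archives, tmpfiles, tmpjars, tmparchives, and the job jar; this is why
// TestJobResourceUploader's at-limit case submits one resource of each kind
// against a MAX_RESOURCES limit of 6.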
@VisibleForTesting
protected static final String MAX_RESOURCE_ERR_MSG =
"This job has exceeded the maximum number of submitted resources";
@VisibleForTesting
protected static final String MAX_TOTAL_RESOURCE_MB_ERR_MSG =
"This job has exceeded the maximum size of submitted resources";
@VisibleForTesting
protected static final String MAX_SINGLE_RESOURCE_MB_ERR_MSG =
"This job has exceeded the maximum size of a single submitted resource";
private static class LimitChecker {
LimitChecker(Configuration conf) {
this.maxNumOfResources =
conf.getInt(MRJobConfig.MAX_RESOURCES,
MRJobConfig.MAX_RESOURCES_DEFAULT);
this.maxSizeMB =
conf.getLong(MRJobConfig.MAX_RESOURCES_MB,
MRJobConfig.MAX_RESOURCES_MB_DEFAULT);
this.maxSizeOfResourceMB =
conf.getLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
MRJobConfig.MAX_SINGLE_RESOURCE_MB_DEFAULT);
this.totalConfigSizeBytes = maxSizeMB * 1024 * 1024;
this.totalConfigSizeOfResourceBytes = maxSizeOfResourceMB * 1024 * 1024;
}
private long totalSizeBytes = 0;
private int totalNumberOfResources = 0;
private long currentMaxSizeOfFileBytes = 0;
private final long maxSizeMB;
private final int maxNumOfResources;
private final long maxSizeOfResourceMB;
private final long totalConfigSizeBytes;
private final long totalConfigSizeOfResourceBytes;
private boolean hasLimits() {
return maxNumOfResources > 0 || maxSizeMB > 0 || maxSizeOfResourceMB > 0;
}
private void addFile(Path p, long fileSizeBytes) throws IOException {
totalNumberOfResources++;
totalSizeBytes += fileSizeBytes;
if (fileSizeBytes > currentMaxSizeOfFileBytes) {
currentMaxSizeOfFileBytes = fileSizeBytes;
}
if (totalConfigSizeBytes > 0 && totalSizeBytes > totalConfigSizeBytes) {
throw new IOException(MAX_TOTAL_RESOURCE_MB_ERR_MSG + " (Max: "
+ maxSizeMB + "MB).");
}
if (maxNumOfResources > 0 &&
totalNumberOfResources > maxNumOfResources) {
throw new IOException(MAX_RESOURCE_ERR_MSG + " (Max: "
+ maxNumOfResources + ").");
}
if (totalConfigSizeOfResourceBytes > 0
&& currentMaxSizeOfFileBytes > totalConfigSizeOfResourceBytes) {
throw new IOException(MAX_SINGLE_RESOURCE_MB_ERR_MSG + " (Max: "
+ maxSizeOfResourceMB + "MB, Violating resource: " + p + ").");
}
}
}
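// Illustrative arithmetic (values assumed for the example): with
// mapreduce.job.cache.limit.max-resources-mb set to 7, totalConfigSizeBytes
// is 7 * 1024 * 1024; the eighth 1 MB file raises totalSizeBytes to
// 8 * 1024 * 1024, so that addFile() call throws an IOException carrying
// MAX_TOTAL_RESOURCE_MB_ERR_MSG.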
/**
* Recursively explore the given path and enforce the limits for resource
* localization. This method assumes that there are no symlinks in the
* directory structure.
*/
private void explorePath(Configuration job, Path p,
LimitChecker limitChecker, Map<URI, FileStatus> statCache)
throws IOException {
Path pathWithScheme = p;
if (!pathWithScheme.toUri().isAbsolute()) {
// the path does not have a scheme, so we assume it is a path from the
// local filesystem
FileSystem localFs = FileSystem.getLocal(job);
pathWithScheme = localFs.makeQualified(p);
}
FileStatus status = getFileStatus(statCache, job, pathWithScheme);
if (status.isDirectory()) {
FileStatus[] statusArray =
pathWithScheme.getFileSystem(job).listStatus(pathWithScheme);
for (FileStatus s : statusArray) {
explorePath(job, s.getPath(), limitChecker, statCache);
}
} else {
limitChecker.addFile(pathWithScheme, status.getLen());
}
}
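// A directory therefore contributes one resource per file found beneath it
// rather than counting once itself, while an archive submitted as a single
// file counts once; symlinks are assumed absent, per the javadoc above.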
@VisibleForTesting
FileStatus getFileStatus(Map<URI, FileStatus> statCache,
Configuration job, Path p) throws IOException {
URI u = p.toUri();
FileStatus status = statCache.get(u);
if (status == null) {
status = p.getFileSystem(job).getFileStatus(p);
statCache.put(u, status);
}
return status;
}
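// Statuses are cached by URI, so each path is stat'd against its filesystem
// at most once per submission. TestJobResourceUploader's StubedUploader
// overrides this method to return a mocked FileStatus instead.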
// copies a file to the jobtracker filesystem and returns the path where it
// was copied to
private Path copyRemoteFiles(Path parentDir, Path originalPath,

MRJobConfig.java

@@ -966,6 +966,34 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
128;
/**
* The maximum number of resources a map reduce job is allowed to submit for
* localization via files, libjars, archives, and jobjar command line
* arguments and through the distributed cache. If set to 0 the limit is
* ignored.
*/
String MAX_RESOURCES = "mapreduce.job.cache.limit.max-resources";
int MAX_RESOURCES_DEFAULT = 0;
/**
* The maximum size (in MB) a map reduce job is allowed to submit for
* localization via files, libjars, archives, and jobjar command line
* arguments and through the distributed cache. If set to 0 the limit is
* ignored.
*/
String MAX_RESOURCES_MB = "mapreduce.job.cache.limit.max-resources-mb";
long MAX_RESOURCES_MB_DEFAULT = 0;
/**
* The maximum size (in MB) of a single resource a map reduce job is allowed to
* submit for localization via files, libjars, archives, and jobjar command
* line arguments and through the distributed cache. If set to 0 the limit is
* ignored.
*/
String MAX_SINGLE_RESOURCE_MB =
"mapreduce.job.cache.limit.max-single-resource-mb";
long MAX_SINGLE_RESOURCE_MB_DEFAULT = 0;
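// Illustrative sketch (the values here are assumptions, not shipped
// defaults): a submitter can bound localization on its job configuration:
//   conf.setInt(MRJobConfig.MAX_RESOURCES, 100);
//   conf.setLong(MRJobConfig.MAX_RESOURCES_MB, 2048);
//   conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 512);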
/**
* Number of OPPORTUNISTIC Containers per 100 containers that will be
* requested by the MRAppMaster. The Default value is 0, which implies all

ClientDistributedCacheManager.java

@@ -54,10 +54,23 @@ public class ClientDistributedCacheManager {
public static void determineTimestampsAndCacheVisibilities(Configuration job)
throws IOException {
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
determineTimestampsAndCacheVisibilities(job, statCache);
}
/**
* See ClientDistributedCacheManager#determineTimestampsAndCacheVisibilities(
* Configuration).
*
* @param job Configuration of a job
* @param statCache A map containing cached file status objects
* @throws IOException if there is a problem with the underlying filesystem
*/
public static void determineTimestampsAndCacheVisibilities(Configuration job,
Map<URI, FileStatus> statCache) throws IOException {
determineTimestamps(job, statCache);
determineCacheVisibilities(job, statCache);
}
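// Sharing one statCache lets a caller such as JobResourceUploader reuse the
// FileStatus objects gathered during its localization-limit check for this
// timestamp and visibility pass, avoiding a second round of stat calls.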
/**
* Determines timestamps of files to be cached, and stores those
* in the configuration. This is intended to be used internally by JobClient

View File

@@ -1915,6 +1915,36 @@
<value>GET,OPTIONS,HEAD</value>
</property>
<property>
<name>mapreduce.job.cache.limit.max-resources</name>
<value>0</value>
<description>The maximum number of resources a map reduce job is allowed to
submit for localization via files, libjars, archives, and jobjar command
line arguments and through the distributed cache. If set to 0 the limit is
ignored.
</description>
</property>
<property>
<name>mapreduce.job.cache.limit.max-resources-mb</name>
<value>0</value>
<description>The maximum size (in MB) a map reduce job is allowed to submit
for localization via files, libjars, archives, and jobjar command line
arguments and through the distributed cache. If set to 0 the limit is
ignored.
</description>
</property>
<property>
<name>mapreduce.job.cache.limit.max-single-resource-mb</name>
<value>0</value>
<description>The maximum size (in MB) of a single resource a map reduce job
is allowed to submit for localization via files, libjars, archives, and
jobjar command line arguments and through the distributed cache. If set to
0 the limit is ignored.
</description>
</property>
<property>
<description>
Value of the xframe-options

TestJobResourceUploader.java

@@ -0,0 +1,355 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert;
import org.junit.Test;
/**
* A class for unit testing JobResourceUploader.
*/
public class TestJobResourceUploader {
@Test
public void testAllDefaults() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
runLimitsTest(b.build(), true, null);
}
@Test
public void testNoLimitsWithResources() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(10);
b.setNumOfTmpFiles(1);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setSizeOfResource(10);
runLimitsTest(b.build(), true, null);
}
@Test
public void testAtResourceLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(1);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxResources(6);
runLimitsTest(b.build(), true, null);
}
@Test
public void testOverResourceLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(2);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxResources(6);
runLimitsTest(b.build(), false, ResourceViolation.NUMBER_OF_RESOURCES);
}
@Test
public void testAtResourcesMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(1);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(2);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxResourcesMB(7);
b.setSizeOfResource(1);
runLimitsTest(b.build(), true, null);
}
@Test
public void testOverResourcesMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(2);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxResourcesMB(7);
b.setSizeOfResource(1);
runLimitsTest(b.build(), false, ResourceViolation.TOTAL_RESOURCE_SIZE);
}
@Test
public void testAtSingleResourceMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(2);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxSingleResourceMB(1);
b.setSizeOfResource(1);
runLimitsTest(b.build(), true, null);
}
@Test
public void testOverSingleResourceMBLimit() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
b.setNumOfDCArchives(1);
b.setNumOfDCFiles(2);
b.setNumOfTmpArchives(1);
b.setNumOfTmpFiles(2);
b.setNumOfTmpLibJars(1);
b.setJobJar(true);
b.setMaxSingleResourceMB(1);
b.setSizeOfResource(10);
runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE);
}
private enum ResourceViolation {
NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
}
private void runLimitsTest(ResourceLimitsConf rlConf,
boolean checkShouldSucceed, ResourceViolation violation)
throws IOException {
if (!checkShouldSucceed && violation == null) {
Assert.fail("Test is misconfigured. checkShouldSucceed is set to false"
+ " and a ResourceViolation is not specified.");
}
JobConf conf = setupJobConf(rlConf);
JobResourceUploader uploader = new StubedUploader(conf);
long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024;
when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes);
when(mockedStatus.isDirectory()).thenReturn(false);
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
try {
uploader.checkLocalizationLimits(conf,
conf.getStringCollection("tmpfiles"),
conf.getStringCollection("tmpjars"),
conf.getStringCollection("tmparchives"),
conf.getJar(), statCache);
Assert.assertTrue("Limits check succeeded when it should have failed.",
checkShouldSucceed);
} catch (IOException e) {
if (checkShouldSucceed) {
Assert.fail("Limits check failed when it should have succeeded: " + e);
}
switch (violation) {
case NUMBER_OF_RESOURCES:
if (!e.getMessage().contains(
JobResourceUploader.MAX_RESOURCE_ERR_MSG)) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
case TOTAL_RESOURCE_SIZE:
if (!e.getMessage().contains(
JobResourceUploader.MAX_TOTAL_RESOURCE_MB_ERR_MSG)) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
case SINGLE_RESOURCE_SIZE:
if (!e.getMessage().contains(
JobResourceUploader.MAX_SINGLE_RESOURCE_MB_ERR_MSG)) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
default:
Assert.fail("Test failed unexpectedly: " + e);
break;
}
}
}
private final FileStatus mockedStatus = mock(FileStatus.class);
private JobConf setupJobConf(ResourceLimitsConf rlConf) {
JobConf conf = new JobConf();
conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources);
conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB);
conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
rlConf.maxSingleResourceMB);
conf.set("tmpfiles",
buildPathString("file://tmpFiles", rlConf.numOfTmpFiles));
conf.set("tmpjars",
buildPathString("file://tmpjars", rlConf.numOfTmpLibJars));
conf.set("tmparchives",
buildPathString("file://tmpArchives", rlConf.numOfTmpArchives));
conf.set(MRJobConfig.CACHE_ARCHIVES,
buildPathString("file://cacheArchives", rlConf.numOfDCArchives));
conf.set(MRJobConfig.CACHE_FILES,
buildPathString("file://cacheFiles", rlConf.numOfDCFiles));
if (rlConf.jobJar) {
conf.setJar("file://jobjar.jar");
}
return conf;
}
private String buildPathString(String pathPrefix, int numOfPaths) {
if (numOfPaths < 1) {
return "";
} else {
StringBuilder b = new StringBuilder();
b.append(pathPrefix + 0);
for (int i = 1; i < numOfPaths; i++) {
b.append("," + pathPrefix + i);
}
return b.toString();
}
}
final static class ResourceLimitsConf {
private final int maxResources;
private final long maxResourcesMB;
private final long maxSingleResourceMB;
private final int numOfTmpFiles;
private final int numOfTmpArchives;
private final int numOfTmpLibJars;
private final boolean jobJar;
private final int numOfDCFiles;
private final int numOfDCArchives;
private final long sizeOfResource;
static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf();
private ResourceLimitsConf() {
this(new Builder());
}
private ResourceLimitsConf(Builder builder) {
this.maxResources = builder.maxResources;
this.maxResourcesMB = builder.maxResourcesMB;
this.maxSingleResourceMB = builder.maxSingleResourceMB;
this.numOfTmpFiles = builder.numOfTmpFiles;
this.numOfTmpArchives = builder.numOfTmpArchives;
this.numOfTmpLibJars = builder.numOfTmpLibJars;
this.jobJar = builder.jobJar;
this.numOfDCFiles = builder.numOfDCFiles;
this.numOfDCArchives = builder.numOfDCArchives;
this.sizeOfResource = builder.sizeOfResource;
}
static class Builder {
// Defaults
private int maxResources = 0;
private long maxResourcesMB = 0;
private long maxSingleResourceMB = 0;
private int numOfTmpFiles = 0;
private int numOfTmpArchives = 0;
private int numOfTmpLibJars = 0;
private boolean jobJar = false;
private int numOfDCFiles = 0;
private int numOfDCArchives = 0;
private long sizeOfResource = 0;
Builder() {
}
Builder setMaxResources(int max) {
this.maxResources = max;
return this;
}
Builder setMaxResourcesMB(long max) {
this.maxResourcesMB = max;
return this;
}
Builder setMaxSingleResourceMB(long max) {
this.maxSingleResourceMB = max;
return this;
}
Builder setNumOfTmpFiles(int num) {
this.numOfTmpFiles = num;
return this;
}
Builder setNumOfTmpArchives(int num) {
this.numOfTmpArchives = num;
return this;
}
Builder setNumOfTmpLibJars(int num) {
this.numOfTmpLibJars = num;
return this;
}
Builder setJobJar(boolean jar) {
this.jobJar = jar;
return this;
}
Builder setNumOfDCFiles(int num) {
this.numOfDCFiles = num;
return this;
}
Builder setNumOfDCArchives(int num) {
this.numOfDCArchives = num;
return this;
}
Builder setSizeOfResource(long sizeMB) {
this.sizeOfResource = sizeMB;
return this;
}
ResourceLimitsConf build() {
return new ResourceLimitsConf(this);
}
}
}
class StubedUploader extends JobResourceUploader {
StubedUploader(JobConf conf) throws IOException {
super(FileSystem.getLocal(conf), false);
}
@Override
FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job,
Path p) throws IOException {
return mockedStatus;
}
}
}

TestMRJobs.java

@@ -138,6 +138,8 @@ public class TestMRJobs {
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
private static final String OUTPUT_ROOT_DIR = "/tmp/" +
TestMRJobs.class.getSimpleName();
private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR,
"localizedResources");
@BeforeClass
public static void setup() throws IOException {
@@ -173,7 +175,7 @@ public static void setup() throws IOException {
}
@AfterClass
-public static void tearDown() {
public static void tearDown() throws IOException {
if (mrCluster != null) {
mrCluster.stop();
mrCluster = null;
@@ -182,6 +184,10 @@ public static void tearDown() {
dfsCluster.shutdown();
dfsCluster = null;
}
if (localFs.exists(TEST_RESOURCES_DIR)) {
// clean up resource directory
localFs.delete(TEST_RESOURCES_DIR, true);
}
}
@After
@@ -189,6 +195,39 @@ public void resetInit() {
numSleepReducers = DEFAULT_REDUCES;
}
private static void setupJobResourceDirs() throws IOException {
if (localFs.exists(TEST_RESOURCES_DIR)) {
// clean up directory
localFs.delete(TEST_RESOURCES_DIR, true);
}
localFs.mkdirs(TEST_RESOURCES_DIR);
FSDataOutputStream outF1 = null;
try {
// 10KB file
outF1 = localFs.create(new Path(TEST_RESOURCES_DIR, "file1.txt"));
outF1.write(new byte[10 * 1024]);
} finally {
if (outF1 != null) {
outF1.close();
}
}
localFs.createNewFile(new Path(TEST_RESOURCES_DIR, "file2.txt"));
Path subDir = new Path(TEST_RESOURCES_DIR, "subDir");
localFs.mkdirs(subDir);
FSDataOutputStream outF3 = null;
try {
// 1MB (plus 10 Bytes) file
outF3 = localFs.create(new Path(subDir, "file3.txt"));
outF3.write(new byte[(1 * 1024 * 1024) + 10]);
} finally {
if (outF3 != null) {
outF3.close();
}
}
localFs.createNewFile(new Path(subDir, "file4.txt"));
}
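// Resulting fixture: file1.txt (10 KB), file2.txt (empty), subDir/file3.txt
// (1 MB + 10 B), and subDir/file4.txt (empty); four files totaling just over
// 1 MB, which the limit tests below submit as a single "tmpfiles" directory.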
@Test (timeout = 300000)
public void testSleepJob() throws Exception {
testSleepJobInternal(false);
@@ -199,16 +238,99 @@ public void testSleepJobWithRemoteJar() throws Exception {
testSleepJobInternal(true);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalResourceUnderLimit() throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well above what is expected
sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6);
sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, false, true, null);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalResourceSizeOverLimit() throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well below what is expected
sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 1);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, false, false,
ResourceViolation.TOTAL_RESOURCE_SIZE);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalResourceNumberOverLimit() throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well below what is expected
sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 1);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, false, false,
ResourceViolation.NUMBER_OF_RESOURCES);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalResourceCheckAndRemoteJar()
throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well above what is expected
sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6);
sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, true, true, null);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalIndividualResourceOverLimit()
throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well below what is expected
sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 1);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, false, false,
ResourceViolation.SINGLE_RESOURCE_SIZE);
}
@Test(timeout = 300000)
public void testSleepJobWithLocalIndividualResourceUnderLimit()
throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set limits to well above what is expected
sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 2);
setupJobResourceDirs();
sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
testSleepJobInternal(sleepConf, false, true, null);
}
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
testSleepJobInternal(new Configuration(mrCluster.getConfig()),
useRemoteJar, true, null);
}
private enum ResourceViolation {
NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
}
private void testSleepJobInternal(Configuration sleepConf,
boolean useRemoteJar, boolean jobSubmissionShouldSucceed,
ResourceViolation violation) throws Exception {
LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
if (!jobSubmissionShouldSucceed && violation == null) {
Assert.fail("Test is misconfigured. jobSubmissionShouldSucceed is set"
+ " to false and a ResourceViolation is not specified.");
}
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
-Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
@@ -229,7 +351,45 @@ private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
job.setJarByClass(SleepJob.class);
}
job.setMaxMapAttempts(1); // speed up failures
-job.submit();
try {
job.submit();
Assert.assertTrue("JobSubmission succeeded when it should have failed.",
jobSubmissionShouldSucceed);
} catch (IOException e) {
if (jobSubmissionShouldSucceed) {
Assert
.fail("Job submission failed when it should have succeeded: " + e);
}
switch (violation) {
case NUMBER_OF_RESOURCES:
if (!e.getMessage().contains(
"This job has exceeded the maximum number of"
+ " submitted resources")) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
case TOTAL_RESOURCE_SIZE:
if (!e.getMessage().contains(
"This job has exceeded the maximum size of submitted resources")) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
case SINGLE_RESOURCE_SIZE:
if (!e.getMessage().contains(
"This job has exceeded the maximum size of a single submitted")) {
Assert.fail("Test failed unexpectedly: " + e);
}
break;
default:
Assert.fail("Test failed unexpectedly: " + e);
break;
}
// we are done with the test (job submission failed)
return;
}
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);