MAPREDUCE-5951. Add support for the YARN Shared Cache.

This commit is contained in:
Chris Trezzo 2017-10-12 10:58:02 -07:00
parent 13fcfb3d46
commit e46d5bb962
19 changed files with 1703 additions and 231 deletions

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -1414,6 +1415,19 @@ public static String escapeString(String data) {
new char[] {'"', '=', '.'});
}
/*
* The goal is to make sure only the NM that hosts MRAppMaster will upload
* resources to shared cache. Clean up the shared cache policies for all
* resources so that later when TaskAttemptImpl creates
* ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache will
* be set up to false. In that way, the NMs that host the task containers
* won't try to upload the resources to shared cache.
*/
private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
}
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@ -1492,6 +1506,8 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
cleanupSharedCacheUploadPolicies(job.conf);
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);

View File

@ -21,6 +21,8 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -708,17 +710,38 @@ private int getCpuRequired(Configuration conf, TaskType taskType) {
/**
* Create a {@link LocalResource} record with all the given parameters.
* The NM that hosts AM container will upload resources to shared cache.
* Thus there is no need to ask task container's NM to upload the
* resources to shared cache. Set the shared cache upload policy to
* false.
*/
private static LocalResource createLocalResource(FileSystem fc, Path file,
LocalResourceType type, LocalResourceVisibility visibility)
throws IOException {
String fileSymlink, LocalResourceType type,
LocalResourceVisibility visibility) throws IOException {
FileStatus fstat = fc.getFileStatus(file);
URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath()));
// We need to be careful when converting from path to URL to add a fragment
// so that the symlink name when localized will be correct.
Path qualifiedPath = fc.resolvePath(fstat.getPath());
URI uriWithFragment = null;
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
try {
if (useFragment) {
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
} else {
uriWithFragment = qualifiedPath.toUri();
}
} catch (URISyntaxException e) {
throw new IOException(
"Error parsing local resource path."
+ " Path was not able to be converted to a URI: " + qualifiedPath,
e);
}
URL resourceURL = URL.fromURI(uriWithFragment);
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
resourceSize, resourceModificationTime, false);
}
/**
@ -829,8 +852,18 @@ private static void configureJobJar(Configuration conf,
final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
jobJarFs.getWorkingDirectory());
LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
LocalResourceVisibility jobJarViz =
conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.APPLICATION;
// We hard code the job.jar localized symlink in the container directory.
// This is because the mapreduce app expects the job.jar to be named
// accordingly. Additionally we set the shared cache upload policy to
// false. Resources are uploaded by the AM if necessary.
LocalResource rc =
createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR,
LocalResourceType.PATTERN, jobJarViz);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);
@ -855,9 +888,12 @@ private static void configureJobConf(Configuration conf,
Path remoteJobConfPath =
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
FileSystem remoteFS = FileSystem.get(conf);
// There is no point to ask task container's NM to upload the resource
// to shared cache (job conf is not shared). Therefore, createLocalResource
// will set the shared cache upload policy to false
localResources.put(MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION));
createLocalResource(remoteFS, remoteJobConfPath, null,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is "
+ remoteJobConfPath.toUri().toASCIIString());
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.util;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
/**
* Helper class for MR applications that parses distributed cache artifacts and
* creates a map of LocalResources.
*/
@SuppressWarnings("deprecation")
@Private
@Unstable
class LocalResourceBuilder {
public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class);
private Configuration conf;
private LocalResourceType type;
private URI[] uris;
private long[] timestamps;
private long[] sizes;
private boolean[] visibilities;
private Map<String, Boolean> sharedCacheUploadPolicies;
LocalResourceBuilder() {
}
void setConf(Configuration c) {
this.conf = c;
}
void setType(LocalResourceType t) {
this.type = t;
}
void setUris(URI[] u) {
this.uris = u;
}
void setTimestamps(long[] t) {
this.timestamps = t;
}
void setSizes(long[] s) {
this.sizes = s;
}
void setVisibilities(boolean[] v) {
this.visibilities = v;
}
void setSharedCacheUploadPolicies(Map<String, Boolean> policies) {
this.sharedCacheUploadPolicies = policies;
}
void createLocalResources(Map<String, LocalResource> localResources)
throws IOException {
if (uris != null) {
// Sanity check
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
(uris.length != visibilities.length)) {
throw new IllegalArgumentException("Invalid specification for " +
"distributed-cache artifacts of type " + type + " :" +
" #uris=" + uris.length +
" #timestamps=" + timestamps.length +
" #visibilities=" + visibilities.length
);
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
String linkName = null;
if (p.getName().equals(DistributedCache.WILDCARD)) {
p = p.getParent();
linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
}
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
// If there's no wildcard, try using the fragment for the link
if (linkName == null) {
linkName = u.getFragment();
// Because we don't know what's in the fragment, we have to handle
// it with care.
if (linkName != null) {
Path linkPath = new Path(linkName);
if (linkPath.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be "
+ "relative");
}
linkName = linkPath.toUri().getPath();
}
} else if (u.getFragment() != null) {
throw new IllegalArgumentException("Invalid path URI: " + p +
" - cannot contain both a URI fragment and a wildcard");
}
// If there's no wildcard or fragment, just link to the file name
if (linkName == null) {
linkName = p.getName();
}
LocalResource orig = localResources.get(linkName);
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
throw new InvalidJobConfException(
getResourceDescription(orig.getType()) + orig.getResource()
+
" conflicts with " + getResourceDescription(type) + u);
}
Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
sharedCachePolicy =
sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy;
localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p
.toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i], sharedCachePolicy));
}
}
}
private static String getResourceDescription(LocalResourceType type) {
if (type == LocalResourceType.ARCHIVE
|| type == LocalResourceType.PATTERN) {
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
}
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
}
}

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskLog;
@ -67,12 +67,9 @@
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Helper class for MR applications
@ -251,10 +248,16 @@ public static void setClasspath(Map<String, String> environment,
if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
/*
* We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
* the case where the job jar is not necessarily named "job.jar". This can
* happen, for example, when the job is leveraging a resource from the YARN
* shared cache.
*/
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
@ -471,27 +474,32 @@ public static Path getStartJobCommitFile(Configuration conf, String user,
return startCommitFile;
}
public static void setupDistributedCache(
Configuration conf,
Map<String, LocalResource> localResources)
throws IOException {
@SuppressWarnings("deprecation")
public static void setupDistributedCache(Configuration conf,
Map<String, LocalResource> localResources) throws IOException {
LocalResourceBuilder lrb = new LocalResourceBuilder();
lrb.setConf(conf);
// Cache archives
parseDistributedCacheArtifacts(conf, localResources,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf));
lrb.setType(LocalResourceType.ARCHIVE);
lrb.setUris(DistributedCache.getCacheArchives(conf));
lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
lrb.setSharedCacheUploadPolicies(
Job.getArchiveSharedCacheUploadPolicies(conf));
lrb.createLocalResources(localResources);
// Cache files
parseDistributedCacheArtifacts(conf,
localResources,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf));
lrb.setType(LocalResourceType.FILE);
lrb.setUris(DistributedCache.getCacheFiles(conf));
lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
lrb.setSharedCacheUploadPolicies(
Job.getFileSharedCacheUploadPolicies(conf));
lrb.createLocalResources(localResources);
}
/**
@ -550,89 +558,6 @@ public static void setupDistributedCacheLocal(Configuration conf)
}
}
private static String getResourceDescription(LocalResourceType type) {
if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
}
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
}
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
throws IOException {
if (uris != null) {
// Sanity check
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
(uris.length != visibilities.length)) {
throw new IllegalArgumentException("Invalid specification for " +
"distributed-cache artifacts of type " + type + " :" +
" #uris=" + uris.length +
" #timestamps=" + timestamps.length +
" #visibilities=" + visibilities.length
);
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
String linkName = null;
if (p.getName().equals(DistributedCache.WILDCARD)) {
p = p.getParent();
linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
}
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
// If there's no wildcard, try using the fragment for the link
if (linkName == null) {
linkName = u.getFragment();
// Because we don't know what's in the fragment, we have to handle
// it with care.
if (linkName != null) {
Path linkPath = new Path(linkName);
if (linkPath.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be "
+ "relative");
}
linkName = linkPath.toUri().getPath();
}
} else if (u.getFragment() != null) {
throw new IllegalArgumentException("Invalid path URI: " + p +
" - cannot contain both a URI fragment and a wildcard");
}
// If there's no wildcard or fragment, just link to the file name
if (linkName == null) {
linkName = p.getName();
}
LocalResource orig = localResources.get(linkName);
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
throw new InvalidJobConfException(
getResourceDescription(orig.getType()) + orig.getResource() +
" conflicts with " + getResourceDescription(type) + u);
}
localResources.put(linkName, LocalResource
.newInstance(URL.fromURI(p.toUri()), type, visibilities[i]
? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i]));
}
}
}
// TODO - Move this to MR!
private static long[] getFileSizes(Configuration conf, String key) {
String[] strs = conf.getStrings(key);

View File

@ -30,6 +30,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -39,6 +41,7 @@
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@ -164,6 +167,9 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
});
DistributedCache.addCacheFile(file, conf);
Map<String, Boolean> policies = new HashMap<String, Boolean>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
@ -272,6 +278,9 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file, conf);
Map<String, Boolean> policies = new HashMap<String, Boolean>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");

View File

@ -261,7 +261,7 @@ public void testSetClasspathWithUserPrecendence() {
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
@ -281,7 +281,7 @@ public void testSetClasspathWithNoUserPrecendence() {
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+ " the classpath!", env_str.contains(expectedClasspath));
@ -303,7 +303,7 @@ public void testSetClasspathWithJobClassloader() throws IOException {
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
cp.contains("PWD"));
String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
@ -332,7 +332,7 @@ public void testSetClasspathWithFramework() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
MRApps.setClasspath(env, conf);
final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),

View File

@ -51,6 +51,12 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>

View File

@ -21,12 +21,17 @@
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
@ -1303,6 +1308,227 @@ private void setUseNewAPI() throws IOException {
}
}
/**
* Add a file to job config for shared cache processing. If shared cache is
* enabled, it will return true, otherwise, return false. We don't check with
* SCM here given application might not be able to provide the job id;
* ClientSCMProtocol.use requires the application id. Job Submitter will read
* the files from job config and take care of things.
*
* @param resource The resource that Job Submitter will process later using
* shared cache.
* @param conf Configuration to add the resource to
* @return whether the resource has been added to the configuration
*/
@Unstable
public static boolean addFileToSharedCache(URI resource, Configuration conf) {
SharedCacheConfig scConfig = new SharedCacheConfig();
scConfig.init(conf);
if (scConfig.isSharedCacheFilesEnabled()) {
String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE);
conf.set(
MRJobConfig.FILES_FOR_SHARED_CACHE,
files == null ? resource.toString() : files + ","
+ resource.toString());
return true;
} else {
return false;
}
}
/**
* Add a file to job config for shared cache processing. If shared cache is
* enabled, it will return true, otherwise, return false. We don't check with
* SCM here given application might not be able to provide the job id;
* ClientSCMProtocol.use requires the application id. Job Submitter will read
* the files from job config and take care of things. Job Submitter will also
* add the file to classpath. Intended to be used by user code.
*
* @param resource The resource that Job Submitter will process later using
* shared cache.
* @param conf Configuration to add the resource to
* @return whether the resource has been added to the configuration
*/
@Unstable
public static boolean addFileToSharedCacheAndClasspath(URI resource,
Configuration conf) {
SharedCacheConfig scConfig = new SharedCacheConfig();
scConfig.init(conf);
if (scConfig.isSharedCacheLibjarsEnabled()) {
String files =
conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
conf.set(
MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE,
files == null ? resource.toString() : files + ","
+ resource.toString());
return true;
} else {
return false;
}
}
/**
* Add an archive to job config for shared cache processing. If shared cache
* is enabled, it will return true, otherwise, return false. We don't check
* with SCM here given application might not be able to provide the job id;
* ClientSCMProtocol.use requires the application id. Job Submitter will read
* the files from job config and take care of things. Intended to be used by
* user code.
*
* @param resource The resource that Job Submitter will process later using
* shared cache.
* @param conf Configuration to add the resource to
* @return whether the resource has been added to the configuration
*/
@Unstable
public static boolean addArchiveToSharedCache(URI resource,
Configuration conf) {
SharedCacheConfig scConfig = new SharedCacheConfig();
scConfig.init(conf);
if (scConfig.isSharedCacheArchivesEnabled()) {
String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
conf.set(
MRJobConfig.ARCHIVES_FOR_SHARED_CACHE,
files == null ? resource.toString() : files + ","
+ resource.toString());
return true;
} else {
return false;
}
}
/**
* This is to set the shared cache upload policies for files. If the parameter
* was previously set, this method will replace the old value with the new
* provided map.
*
* @param conf Configuration which stores the shared cache upload policies
* @param policies A map containing the shared cache upload policies for a set
* of resources. The key is the url of the resource and the value is
* the upload policy. True if it should be uploaded, false otherwise.
*/
@Unstable
public static void setFileSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies) {
setSharedCacheUploadPolicies(conf, policies, true);
}
/**
* This is to set the shared cache upload policies for archives. If the
* parameter was previously set, this method will replace the old value with
* the new provided map.
*
* @param conf Configuration which stores the shared cache upload policies
* @param policies A map containing the shared cache upload policies for a set
* of resources. The key is the url of the resource and the value is
* the upload policy. True if it should be uploaded, false otherwise.
*/
@Unstable
public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies) {
setSharedCacheUploadPolicies(conf, policies, false);
}
// We use a double colon because a colon is a reserved character in a URI and
// there should not be two colons next to each other.
private static final String DELIM = "::";
/**
* Set the shared cache upload policies config parameter. This is done by
* serializing the provided map of shared cache upload policies into a config
* parameter. If the parameter was previously set, this method will replace
* the old value with the new provided map.
*
* @param conf Configuration which stores the shared cache upload policies
* @param policies A map containing the shared cache upload policies for a set
* of resources. The key is the url of the resource and the value is
* the upload policy. True if it should be uploaded, false otherwise.
* @param areFiles True if these policies are for files, false if they are for
* archives.
*/
private static void setSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies, boolean areFiles) {
if (policies != null) {
StringBuilder sb = new StringBuilder();
Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
Map.Entry<String, Boolean> e;
if (it.hasNext()) {
e = it.next();
sb.append(e.getKey() + DELIM + e.getValue());
} else {
// policies is an empty map, just skip setting the parameter
return;
}
while (it.hasNext()) {
e = it.next();
sb.append("," + e.getKey() + DELIM + e.getValue());
}
String confParam =
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
conf.set(confParam, sb.toString());
}
}
/**
* Deserialize a map of shared cache upload policies from a config parameter.
*
* @param conf Configuration which stores the shared cache upload policies
* @param areFiles True if these policies are for files, false if they are for
* archives.
* @return A map containing the shared cache upload policies for a set of
* resources. The key is the url of the resource and the value is the
* upload policy. True if it should be uploaded, false otherwise.
*/
private static Map<String, Boolean> getSharedCacheUploadPolicies(
Configuration conf, boolean areFiles) {
String confParam =
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
Collection<String> policies = conf.getStringCollection(confParam);
String[] policy;
Map<String, Boolean> policyMap = new LinkedHashMap<String, Boolean>();
for (String s : policies) {
policy = s.split(DELIM);
if (policy.length != 2) {
LOG.error(confParam
+ " is mis-formatted, returning empty shared cache upload policies."
+ " Error on [" + s + "]");
return new LinkedHashMap<String, Boolean>();
}
policyMap.put(policy[0], Boolean.parseBoolean(policy[1]));
}
return policyMap;
}
/**
* This is to get the shared cache upload policies for files.
*
* @param conf Configuration which stores the shared cache upload policies
* @return A map containing the shared cache upload policies for a set of
* resources. The key is the url of the resource and the value is the
* upload policy. True if it should be uploaded, false otherwise.
*/
@Unstable
public static Map<String, Boolean> getFileSharedCacheUploadPolicies(
Configuration conf) {
return getSharedCacheUploadPolicies(conf, true);
}
/**
* This is to get the shared cache upload policies for archives.
*
* @param conf Configuration which stores the shared cache upload policies
* @return A map containing the shared cache upload policies for a set of
* resources. The key is the url of the resource and the value is the
* upload policy. True if it should be uploaded, false otherwise.
*/
@Unstable
public static Map<String, Boolean> getArchiveSharedCacheUploadPolicies(
Configuration conf) {
return getSharedCacheUploadPolicies(conf, false);
}
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {

View File

@ -24,12 +24,13 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -40,30 +41,100 @@
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Unstable
/**
* This class is responsible for uploading resources from the client to HDFS
* that are associated with a MapReduce job.
*/
@Private
@Unstable
class JobResourceUploader {
protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
private final boolean useWildcard;
private final FileSystem jtFs;
private SharedCacheClient scClient = null;
private SharedCacheConfig scConfig = new SharedCacheConfig();
private ApplicationId appId = null;
JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
this.jtFs = submitFs;
this.useWildcard = useWildcard;
}
private void initSharedCache(JobID jobid, Configuration conf) {
this.scConfig.init(conf);
if (this.scConfig.isSharedCacheEnabled()) {
this.scClient = createSharedCacheClient(conf);
appId = jobIDToAppId(jobid);
}
}
/*
* We added this method so that we could do the conversion between JobId and
* ApplicationId for the shared cache client. This logic is very similar to
* the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use
* that because mapreduce-client-core can not depend on
* mapreduce-client-common.
*/
private ApplicationId jobIDToAppId(JobID jobId) {
return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()),
jobId.getId());
}
private void stopSharedCache() {
if (scClient != null) {
scClient.stop();
scClient = null;
}
}
/**
* Create, initialize and start a new shared cache client.
*/
@VisibleForTesting
protected SharedCacheClient createSharedCacheClient(Configuration conf) {
SharedCacheClient scc = SharedCacheClient.createSharedCacheClient();
scc.init(conf);
scc.start();
return scc;
}
/**
* Upload and configure files, libjars, jobjars, and archives pertaining to
* the passed job.
*
* <p>
* This client will use the shared cache for libjars, files, archives and
* jobjars if it is enabled. When shared cache is enabled, it will try to use
* the shared cache and fall back to the default behavior when the scm isn't
* available.
* <p>
* 1. For the resources that have been successfully shared, we will continue
* to use them in a shared fashion.
* <p>
* 2. For the resources that weren't in the cache and need to be uploaded by
* NM, we won't ask NM to upload them.
*
* @param job the job containing the files to be uploaded
* @param submitJobDir the submission directory of the job
* @throws IOException
*/
public void uploadResources(Job job, Path submitJobDir) throws IOException {
try {
initSharedCache(job.getJobID(), job.getConfiguration());
uploadResourcesInternal(job, submitJobDir);
} finally {
stopSharedCache();
}
}
private void uploadResourcesInternal(Job job, Path submitJobDir)
throws IOException {
Configuration conf = job.getConfiguration();
short replication =
(short) conf.getInt(Job.SUBMIT_REPLICATION,
@ -90,6 +161,7 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
+ " already exists!! This is unexpected.Please check what's there in"
+ " that directory");
}
// Create the submission directory for the MapReduce job.
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms =
@ -101,20 +173,45 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
disableErasureCodingForPath(jtFs, submitJobDir);
}
// Get the resources that have been added via command line arguments in the
// GenericOptionsParser (i.e. files, libjars, archives).
Collection<String> files = conf.getStringCollection("tmpfiles");
Collection<String> libjars = conf.getStringCollection("tmpjars");
Collection<String> archives = conf.getStringCollection("tmparchives");
String jobJar = job.getJar();
// Merge resources that have been programmatically specified for the shared
// cache via the Job API.
files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE));
libjars.addAll(conf.getStringCollection(
MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE));
archives.addAll(conf
.getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE));
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
uploadJobJar(job, jobJar, submitJobDir, replication);
Map<String, Boolean> fileSCUploadPolicies =
new LinkedHashMap<String, Boolean>();
Map<String, Boolean> archiveSCUploadPolicies =
new LinkedHashMap<String, Boolean>();
uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache);
uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache);
uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
archiveSCUploadPolicies, statCache);
uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
addLog4jToDistributedCache(job, submitJobDir);
// Note, we do not consider resources in the distributed cache for the
// shared cache at this time. Only resources specified via the
// GenericOptionsParser or the jobjar.
Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies);
Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
@ -125,9 +222,11 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
}
@VisibleForTesting
void uploadFiles(Configuration conf, Collection<String> files,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
void uploadFiles(Job job, Collection<String> files,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
throws IOException {
Configuration conf = job.getConfiguration();
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
if (!files.isEmpty()) {
mkdirs(jtFs, filesDir, mapredSysPerms);
@ -140,17 +239,33 @@ void uploadFiles(Configuration conf, Collection<String> files,
+ " Argument must be a valid URI: " + tmpFile, e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path "
+ newPath + ". This was based on the files parameter: "
+ tmpFile,
ue);
URI newURI = null;
boolean uploadToSharedCache = false;
if (scConfig.isSharedCacheFilesEnabled()) {
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
if (newURI == null) {
uploadToSharedCache = true;
}
}
if (newURI == null) {
Path newPath =
copyRemoteFiles(filesDir, tmp, conf, submitReplication);
try {
newURI = getPathURI(newPath, tmpURI.getFragment());
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the"
+ " remote path " + newPath
+ ". This was based on the files parameter: " + tmpFile,
ue);
}
}
job.addCacheFile(newURI);
if (scConfig.isSharedCacheFilesEnabled()) {
fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
}
}
}
@ -159,9 +274,11 @@ void uploadFiles(Configuration conf, Collection<String> files,
// Suppress warning for use of DistributedCache (it is everywhere).
@SuppressWarnings("deprecation")
@VisibleForTesting
void uploadLibJars(Configuration conf, Collection<String> libjars,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
FsPermission mapredSysPerms, short submitReplication,
Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
throws IOException {
Configuration conf = job.getConfiguration();
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
if (!libjars.isEmpty()) {
mkdirs(jtFs, libjarsDir, mapredSysPerms);
@ -176,23 +293,53 @@ void uploadLibJars(Configuration conf, Collection<String> libjars,
+ " Argument must be a valid URI: " + tmpjars, e);
}
Path tmp = new Path(tmpURI);
Path newPath =
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
if (!foundFragment) {
foundFragment = pathURI.getFragment() != null;
URI newURI = null;
boolean uploadToSharedCache = false;
boolean fromSharedCache = false;
if (scConfig.isSharedCacheLibjarsEnabled()) {
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
if (newURI == null) {
uploadToSharedCache = true;
} else {
fromSharedCache = true;
}
DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
jtFs, false);
libjarURIs.add(pathURI);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path "
+ newPath + ". This was based on the libjar parameter: "
+ tmpjars,
ue);
}
if (newURI == null) {
Path newPath =
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
try {
newURI = getPathURI(newPath, tmpURI.getFragment());
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the"
+ " remote path " + newPath
+ ". This was based on the libjar parameter: " + tmpjars,
ue);
}
}
if (!foundFragment) {
// We do not count shared cache paths containing fragments as a
// "foundFragment." This is because these resources are not in the
// staging directory and will be added to the distributed cache
// separately.
foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
}
DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
jtFs, false);
if (fromSharedCache) {
// We simply add this URI to the distributed cache. It will not come
// from the staging directory (it is in the shared cache), so we
// must add it to the cache regardless of the wildcard feature.
DistributedCache.addCacheFile(newURI, conf);
} else {
libjarURIs.add(newURI);
}
if (scConfig.isSharedCacheLibjarsEnabled()) {
fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
}
}
@ -210,9 +357,11 @@ void uploadLibJars(Configuration conf, Collection<String> libjars,
}
@VisibleForTesting
void uploadArchives(Configuration conf, Collection<String> archives,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
throws IOException {
void uploadArchives(Job job, Collection<String> archives,
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
Map<String, Boolean> archiveSCUploadPolicies,
Map<URI, FileStatus> statCache) throws IOException {
Configuration conf = job.getConfiguration();
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
if (!archives.isEmpty()) {
mkdirs(jtFs, archivesDir, mapredSysPerms);
@ -225,18 +374,34 @@ void uploadArchives(Configuration conf, Collection<String> archives,
+ " Argument must be a valid URI: " + tmpArchives, e);
}
Path tmp = new Path(tmpURI);
Path newPath =
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw an uri excpetion
throw new IOException(
"Failed to create a URI (URISyntaxException) for the remote path"
+ newPath + ". This was based on the archive parameter: "
+ tmpArchives,
ue);
URI newURI = null;
boolean uploadToSharedCache = false;
if (scConfig.isSharedCacheArchivesEnabled()) {
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
if (newURI == null) {
uploadToSharedCache = true;
}
}
if (newURI == null) {
Path newPath =
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
try {
newURI = getPathURI(newPath, tmpURI.getFragment());
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException(
"Failed to create a URI (URISyntaxException) for the"
+ " remote path " + newPath
+ ". This was based on the archive parameter: "
+ tmpArchives,
ue);
}
}
job.addCacheArchive(newURI);
if (scConfig.isSharedCacheArchivesEnabled()) {
archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
}
}
}
@ -244,7 +409,9 @@ void uploadArchives(Configuration conf, Collection<String> archives,
@VisibleForTesting
void uploadJobJar(Job job, String jobJar, Path submitJobDir,
short submitReplication) throws IOException {
short submitReplication, Map<URI, FileStatus> statCache)
throws IOException {
Configuration conf = job.getConfiguration();
if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
@ -252,12 +419,59 @@ void uploadJobJar(Job job, String jobJar, Path submitJobDir,
}
Path jobJarPath = new Path(jobJar);
URI jobJarURI = jobJarPath.toUri();
// If the job jar is already in a global fs,
// we don't need to copy it from local fs
if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
submitReplication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
Path newJarPath = null;
boolean uploadToSharedCache = false;
if (jobJarURI.getScheme() == null ||
jobJarURI.getScheme().equals("file")) {
// job jar is on the local file system
if (scConfig.isSharedCacheJobjarEnabled()) {
// We must have a qualified path for the shared cache client. We can
// assume this is for the local filesystem
jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath);
// Don't add a resource name here because the resource name (i.e.
// job.jar directory symlink) will always be hard coded to job.jar for
// the job.jar
URI newURI =
useSharedCache(jobJarPath.toUri(), null, statCache, conf, false);
if (newURI == null) {
uploadToSharedCache = true;
} else {
newJarPath = stringToPath(newURI.toString());
// The job jar is coming from the shared cache (i.e. a public
// place), so we want the job.jar to have a public visibility.
conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
}
}
if (newJarPath == null) {
newJarPath = JobSubmissionFiles.getJobJar(submitJobDir);
copyJar(jobJarPath, newJarPath, submitReplication);
}
} else {
// job jar is in a remote file system
if (scConfig.isSharedCacheJobjarEnabled()) {
// Don't add a resource name here because the resource name (i.e.
// job.jar directory symlink) will always be hard coded to job.jar for
// the job.jar
URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false);
if (newURI == null) {
uploadToSharedCache = true;
newJarPath = jobJarPath;
} else {
newJarPath = stringToPath(newURI.toString());
// The job jar is coming from the shared cache (i.e. a public
// place), so we want the job.jar to have a public visibility.
conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
}
} else {
// we don't need to upload the jobjar to the staging directory because
// it is already in an accessible place
newJarPath = jobJarPath;
}
}
job.setJar(newJarPath.toString());
if (scConfig.isSharedCacheJobjarEnabled()) {
conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
uploadToSharedCache);
}
} else {
LOG.warn("No job jar file set. User classes may not be found. "
@ -267,7 +481,9 @@ void uploadJobJar(Job job, String jobJar, Path submitJobDir,
/**
* Verify that the resources this job is going to localize are within the
* localization limits.
* localization limits. We count all resources towards these limits regardless
* of where they are coming from (i.e. local, distributed cache, or shared
* cache).
*/
@VisibleForTesting
void checkLocalizationLimits(Configuration conf, Collection<String> files,
@ -464,6 +680,80 @@ Path copyRemoteFiles(Path parentDir, Path originalPath,
return newPath;
}
/**
* Checksum a local resource file and call use for that resource with the scm.
*/
private URI useSharedCache(URI sourceFile, String resourceName,
Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment)
throws IOException {
if (scClient == null) {
return null;
}
Path filePath = new Path(sourceFile);
if (getFileStatus(statCache, conf, filePath).isDirectory()) {
LOG.warn("Shared cache does not support directories"
+ " (see YARN-6097)." + " Will not upload " + filePath
+ " to the shared cache.");
return null;
}
String rn = resourceName;
if (honorFragment) {
if (sourceFile.getFragment() != null) {
rn = sourceFile.getFragment();
}
}
// If for whatever reason, we can't even calculate checksum for
// a resource, something is really wrong with the file system;
// even non-SCM approach won't work. Let us just throw the exception.
String checksum = scClient.getFileChecksum(filePath);
URL url = null;
try {
url = scClient.use(this.appId, checksum);
} catch (YarnException e) {
LOG.warn("Error trying to contact the shared cache manager,"
+ " disabling the SCMClient for the rest of this job submission", e);
/*
* If we fail to contact the SCM, we do not use it for the rest of this
* JobResourceUploader's life. This prevents us from having to timeout
* each time we try to upload a file while the SCM is unavailable. Instead
* we timeout/error the first time and quickly revert to the default
* behavior without the shared cache. We do this by stopping the shared
* cache client and setting it to null.
*/
stopSharedCache();
}
if (url != null) {
// Because we deal with URI's in mapreduce, we need to convert the URL to
// a URI and add a fragment if necessary.
URI uri = null;
try {
String name = new Path(url.getFile()).getName();
if (rn != null && !name.equals(rn)) {
// A name was specified that is different then the URL in the shared
// cache. Therefore, we need to set the fragment portion of the URI to
// preserve the user's desired name. We assume that there is no
// existing fragment in the URL since the shared cache manager does
// not use fragments.
uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
url.getPort(), url.getFile(), null, rn);
} else {
uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
url.getPort(), url.getFile(), null, null);
}
return uri;
} catch (URISyntaxException e) {
LOG.warn("Error trying to convert URL received from shared cache to"
+ " a URI: " + url.toString());
return null;
}
} else {
return null;
}
}
@VisibleForTesting
void copyJar(Path originalJarPath, Path submitJarFile,
short replication) throws IOException {

View File

@ -193,6 +193,77 @@ public interface MRJobConfig {
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
/**
* This parameter controls the visibility of the localized job jar on the node
* manager. If set to true, the visibility will be set to
* LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set
* to LocalResourceVisibility.APPLICATION. This is a generated parameter and
* should not be set manually via config files.
*/
String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility";
boolean JOBJAR_VISIBILITY_DEFAULT = false;
/**
* This is a generated parameter and should not be set manually via config
* files.
*/
String JOBJAR_SHARED_CACHE_UPLOAD_POLICY =
"mapreduce.job.jobjar.sharedcache.uploadpolicy";
boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false;
/**
* This is a generated parameter and should not be set manually via config
* files.
*/
String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES =
"mapreduce.job.cache.files.sharedcache.uploadpolicies";
/**
* This is a generated parameter and should not be set manually via config
* files.
*/
String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES =
"mapreduce.job.cache.archives.sharedcache.uploadpolicies";
/**
* A comma delimited list of file resources that are needed for this MapReduce
* job. These resources, if the files resource type is enabled, should either
* use the shared cache or be added to the shared cache. This parameter can be
* modified programmatically using the MapReduce Job api.
*/
String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files";
/**
* A comma delimited list of libjar resources that are needed for this
* MapReduce job. These resources, if the libjars resource type is enabled,
* should either use the shared cache or be added to the shared cache. These
* resources will also be added to the classpath of all tasks for this
* MapReduce job. This parameter can be modified programmatically using the
* MapReduce Job api.
*/
String FILES_FOR_CLASSPATH_AND_SHARED_CACHE =
"mapreduce.job.cache.sharedcache.files.addtoclasspath";
/**
* A comma delimited list of archive resources that are needed for this
* MapReduce job. These resources, if the archives resource type is enabled,
* should either use the shared cache or be added to the shared cache. This
* parameter can be modified programmatically using the MapReduce Job api.
*/
String ARCHIVES_FOR_SHARED_CACHE =
"mapreduce.job.cache.sharedcache.archives";
/**
* A comma delimited list of resource categories that are enabled for the
* shared cache. If a category is enabled, resources in that category will be
* uploaded to the shared cache. The valid categories are: jobjar, libjars,
* files, archives. If "disabled" is specified then all categories are
* disabled. If "enabled" is specified then all categories are enabled.
*/
String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode";
String SHARED_CACHE_MODE_DEFAULT = "disabled";
/**
* @deprecated Symlinks are always on and cannot be disabled.
*/

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* A class for parsing configuration parameters associated with the shared
* cache.
*/
@Private
@Unstable
public class SharedCacheConfig {
protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class);
private boolean sharedCacheFilesEnabled = false;
private boolean sharedCacheLibjarsEnabled = false;
private boolean sharedCacheArchivesEnabled = false;
private boolean sharedCacheJobjarEnabled = false;
public void init(Configuration conf) {
if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(
MRConfig.FRAMEWORK_NAME))) {
// Shared cache is only valid if the job runs on yarn
return;
}
if(!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) {
return;
}
Collection<String> configs = StringUtils.getTrimmedStringCollection(
conf.get(MRJobConfig.SHARED_CACHE_MODE,
MRJobConfig.SHARED_CACHE_MODE_DEFAULT));
if (configs.contains("files")) {
this.sharedCacheFilesEnabled = true;
}
if (configs.contains("libjars")) {
this.sharedCacheLibjarsEnabled = true;
}
if (configs.contains("archives")) {
this.sharedCacheArchivesEnabled = true;
}
if (configs.contains("jobjar")) {
this.sharedCacheJobjarEnabled = true;
}
if (configs.contains("enabled")) {
this.sharedCacheFilesEnabled = true;
this.sharedCacheLibjarsEnabled = true;
this.sharedCacheArchivesEnabled = true;
this.sharedCacheJobjarEnabled = true;
}
if (configs.contains("disabled")) {
this.sharedCacheFilesEnabled = false;
this.sharedCacheLibjarsEnabled = false;
this.sharedCacheArchivesEnabled = false;
this.sharedCacheJobjarEnabled = false;
}
}
public boolean isSharedCacheFilesEnabled() {
return sharedCacheFilesEnabled;
}
public boolean isSharedCacheLibjarsEnabled() {
return sharedCacheLibjarsEnabled;
}
public boolean isSharedCacheArchivesEnabled() {
return sharedCacheArchivesEnabled;
}
public boolean isSharedCacheJobjarEnabled() {
return sharedCacheJobjarEnabled;
}
public boolean isSharedCacheEnabled() {
return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled ||
sharedCacheArchivesEnabled || sharedCacheJobjarEnabled);
}
}

View File

@ -648,6 +648,17 @@
</description>
</property>
<property>
<name>mapreduce.job.sharedcache.mode</name>
<value>disabled</value>
<description>
A comma delimited list of resource categories to submit to the shared cache.
The valid categories are: jobjar, libjars, files, archives.
If "disabled" is specified then the job submission code will not use
the shared cache.
</description>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>

View File

@ -0,0 +1,100 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
MR Support for YARN Shared Cache
==================
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
Overview
-------
MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage
of additional resource caching. This saves network bandwidth between the job
submission client as well as within the YARN cluster itself. This will reduce job
submission time and overall job runtime.
Enabling/Disabling the shared cache
-------
First, your YARN cluster must have the shared cache service running. Please see YARN documentation
for information on how to setup the shared cache service.
A MapReduce user can specify what resources are eligible to be uploaded to the shared cache
based on resource type. This is done using a configuration parameter in mapred-site.xml:
```
<property>
<name>mapreduce.job.sharedcache.mode</name>
<value>disabled</value>
<description>
A comma delimited list of resource categories to submit to the
shared cache. The valid categories are: jobjar, libjars, files,
archives. If "disabled" is specified then the job submission code
will not use the shared cache.
</description>
</property>
```
If a resource type is listed, it will check the shared cache to see if the resource is already in the
cache. If so, it will use the cached resource, if not, it will specify that the resource needs to be
uploaded asynchronously.
Specifying resources for the cache
-------
A MapReduce user has 3 ways to specify resources for a MapReduce job:
1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a
resource is specified via the command line and the resource type is enabled for the
shared cache, that resource will use the shared cache.
2. **The distributed cache api:** If a resource is specified via the distributed cache the
resource will not use the shared cache regardless of if the resource type is enabled for
the shared cache.
3. **The shared cache api:** This is a new set of methods added to the
org.apache.hadoop.mapreduce.Job api. It allows users to add a file to the shared cache,
add it to the shared cache and the classpath and add an archive to the shared cache.
These resources will be placed in the distributed cache and, if their resource type is
enabled the client will use the shared cache as well.
Resource naming
-------
It is important to ensure that each resource for a MapReduce job has a unique file name.
This prevents symlink clobbering when YARN containers running MapReduce tasks are localized
during container launch. A user can specify their own resource name by using the fragment
portion of a URI. For example, for file resources specified on the command line, it could look
like this:
```
-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt
```
In the above example two files, named file1.txt, will be localized with two different names: foo.txt
and bar.txt.
Resource Visibility
-------
All resources in the shared cache have a PUBLIC visibility.
MapReduce client behavior while the shared cache is unavailable
-------
In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast
mechanism. If the MapReduce client fails to contact the shared cache manager, the client will
no longer use the shared cache for the rest of that job submission. This
prevents the MapReduce client from timing out each time it tries to check for a resource
in the shared cache. The MapReduce client quickly reverts to the default behavior and submits a
Job as if the shared cache was never enabled in the first place.

View File

@ -220,7 +220,7 @@ public void testOverSingleResourceMBLimit() throws IOException {
destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
private String jobjarSubmitDir = "/jobjar-submit-dir";
private String expectedJobJar = jobjarSubmitDir + "/job.jar";
private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar";
@Test
public void testPathsWithNoFragNoSchemeRelative() throws IOException {
@ -236,7 +236,7 @@ public void testPathsWithNoFragNoSchemeRelative() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
expectedArchivesNoFrags, basicExpectedJobJar);
}
@Test
@ -254,7 +254,7 @@ public void testPathsWithNoFragNoSchemeAbsolute() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
expectedArchivesNoFrags, basicExpectedJobJar);
}
@Test
@ -272,7 +272,7 @@ public void testPathsWithFragNoSchemeAbsolute() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
expectedArchivesWithFrags, basicExpectedJobJar);
}
@Test
@ -290,7 +290,7 @@ public void testPathsWithFragNoSchemeRelative() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
expectedArchivesWithFrags, basicExpectedJobJar);
}
@Test
@ -308,7 +308,7 @@ public void testPathsWithFragSchemeAbsolute() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
expectedArchivesWithFrags, basicExpectedJobJar);
}
@Test
@ -326,7 +326,7 @@ public void testPathsWithNoFragWithSchemeAbsolute() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
expectedArchivesNoFrags, expectedJobJar);
expectedArchivesNoFrags, basicExpectedJobJar);
}
@Test
@ -344,7 +344,7 @@ public void testPathsWithNoFragAndWildCard() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf, true);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
expectedArchivesNoFrags, expectedJobJar);
expectedArchivesNoFrags, basicExpectedJobJar);
}
@Test
@ -362,7 +362,7 @@ public void testPathsWithFragsAndWildCard() throws IOException {
JobResourceUploader uploader = new StubedUploader(jConf, true);
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
expectedArchivesWithFrags, expectedJobJar);
expectedArchivesWithFrags, basicExpectedJobJar);
}
@Test
@ -402,44 +402,39 @@ private void testErasureCodingSetting(boolean defaultBehavior)
private void runTmpResourcePathTest(JobResourceUploader uploader,
ResourceConf rConf, JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar) throws IOException {
rConf.setupJobConf(jConf);
// We use a pre and post job object here because we need the post job object
// to get the new values set during uploadResources, but we need the pre job
// to set the job jar because JobResourceUploader#uploadJobJar uses the Job
// interface not the JobConf. The post job is automatically created in
// validateResourcePaths.
Job jobPre = Job.getInstance(jConf);
uploadResources(uploader, jConf, jobPre);
validateResourcePaths(jConf, expectedFiles, expectedArchives,
expectedJobJar, jobPre);
Job job = rConf.setupJobConf(jConf);
uploadResources(uploader, job);
validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar);
}
private void uploadResources(JobResourceUploader uploader, JobConf jConf,
Job job) throws IOException {
Collection<String> files = jConf.getStringCollection("tmpfiles");
Collection<String> libjars = jConf.getStringCollection("tmpjars");
Collection<String> archives = jConf.getStringCollection("tmparchives");
String jobJar = jConf.getJar();
uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
(short) 3);
uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
null, (short) 3);
uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
null, (short) 3);
uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
}
private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar, Job preJob)
private void uploadResources(JobResourceUploader uploader, Job job)
throws IOException {
Job j = Job.getInstance(jConf);
validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
Configuration conf = job.getConfiguration();
Collection<String> files = conf.getStringCollection("tmpfiles");
Collection<String> libjars = conf.getStringCollection("tmpjars");
Collection<String> archives = conf.getStringCollection("tmparchives");
Map<URI, FileStatus> statCache = new HashMap<>();
Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
String jobJar = job.getJar();
uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
(short) 3, fileSCUploadPolicies, statCache);
uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"),
null, (short) 3, fileSCUploadPolicies, statCache);
uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null,
(short) 3, fileSCUploadPolicies, statCache);
uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3,
statCache);
}
private void validateResourcePaths(Job job, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar)
throws IOException {
validateResourcePathsSub(job.getCacheFiles(), expectedFiles);
validateResourcePathsSub(job.getCacheArchives(), expectedArchives);
// We use a different job object here because the jobjar was set on a
// different job object
Assert.assertEquals("Job jar path is different than expected!",
expectedJobJar, preJob.getJar());
expectedJobJar, job.getJar());
}
private void validateResourcePathsSub(URI[] actualURIs,
@ -645,7 +640,7 @@ ResourceConf build() {
}
}
private void setupJobConf(JobConf conf) {
private Job setupJobConf(JobConf conf) throws IOException {
conf.set("tmpfiles",
buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
conf.set("tmpjars",
@ -675,6 +670,7 @@ private void setupJobConf(JobConf conf) {
conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
this.maxSingleResourceMB);
return new Job(conf);
}
// We always want absolute paths with a scheme in the DistributedCache, so

View File

@ -0,0 +1,365 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests the JobResourceUploader class with the shared cache.
*/
public class TestJobResourceUploaderWithSharedCache {
protected static final Log LOG = LogFactory
.getLog(TestJobResourceUploaderWithSharedCache.class);
private static MiniDFSCluster dfs;
private static FileSystem localFs;
private static FileSystem remoteFs;
private static Configuration conf = new Configuration();
private static Path testRootDir;
private static Path remoteStagingDir =
new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
@Before
public void cleanup() throws Exception {
remoteFs.delete(remoteStagingDir, true);
}
@BeforeClass
public static void setup() throws IOException {
// create configuration, dfs, file system
localFs = FileSystem.getLocal(conf);
testRootDir =
new Path("target",
TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
remoteFs = dfs.getFileSystem();
}
@AfterClass
public static void tearDown() {
try {
if (localFs != null) {
localFs.close();
}
if (remoteFs != null) {
remoteFs.close();
}
if (dfs != null) {
dfs.shutdown();
}
} catch (IOException ioe) {
LOG.info("IO exception in closing file system");
ioe.printStackTrace();
}
}
private class MyFileUploader extends JobResourceUploader {
// The mocked SharedCacheClient that will be fed into the FileUploader
private SharedCacheClient mockscClient = mock(SharedCacheClient.class);
// A real client for checksum calculation
private SharedCacheClient scClient = SharedCacheClient
.createSharedCacheClient();
MyFileUploader(FileSystem submitFs, Configuration conf)
throws IOException {
super(submitFs, false);
// Initialize the real client, but don't start it. We don't need or want
// to create an actual proxy because we only use this for mocking out the
// getFileChecksum method.
scClient.init(conf);
when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
Path file = (Path) invocation.getArguments()[0];
// Use the real scClient to generate the checksum. We use an
// answer/mock combination to avoid having to spy on a real
// SharedCacheClient object.
return scClient.getFileChecksum(file);
}
});
}
// This method is to prime the mock client with the correct checksum, so it
// looks like a given resource is present in the shared cache.
public void mockFileInSharedCache(Path localFile, URL remoteFile)
throws YarnException, IOException {
// when the resource is referenced, simply return the remote path to the
// caller
when(mockscClient.use(any(ApplicationId.class),
eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
}
@Override
protected SharedCacheClient createSharedCacheClient(Configuration c) {
// Feed the mocked SharedCacheClient into the FileUploader logic
return mockscClient;
}
}
@Test
public void testSharedCacheDisabled() throws Exception {
JobConf jobConf = createJobConf();
Job job = new Job(jobConf);
job.setJobID(new JobID("567789", 1));
// shared cache is disabled by default
uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
}
@Test
public void testSharedCacheEnabled() throws Exception {
JobConf jobConf = createJobConf();
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
Job job = new Job(jobConf);
job.setJobID(new JobID("567789", 1));
// shared cache is enabled for every file type
// the # of times SharedCacheClient.use is called should ==
// total # of files/libjars/archive/jobjar
uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
}
@Test
public void testSharedCacheEnabledWithJobJarInSharedCache()
throws Exception {
JobConf jobConf = createJobConf();
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
Job job = new Job(jobConf);
job.setJobID(new JobID("567789", 1));
// shared cache is enabled for every file type
// the # of times SharedCacheClient.use is called should ==
// total # of files/libjars/archive/jobjar
uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
}
@Test
public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
JobConf jobConf = createJobConf();
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
Job job = new Job(jobConf);
job.setJobID(new JobID("567789", 1));
// shared cache is enabled for archives and libjars type
// the # of times SharedCacheClient.use is called should ==
// total # of libjars and archives
uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
}
private JobConf createJobConf() {
JobConf jobConf = new JobConf();
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
.toString());
return jobConf;
}
private Path copyToRemote(Path jar) throws IOException {
Path remoteFile = new Path("/tmp", jar.getName());
remoteFs.copyFromLocalFile(jar, remoteFile);
return remoteFile;
}
private void makeJarAvailableInSharedCache(Path jar,
MyFileUploader fileUploader) throws YarnException, IOException {
// copy file to remote file system
Path remoteFile = copyToRemote(jar);
// prime mocking so that it looks like this file is in the shared cache
fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
}
private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
int useCallCountExpected,
int numOfFilesShouldBeUploadedToSharedCacheExpected,
int numOfArchivesShouldBeUploadedToSharedCacheExpected,
boolean jobJarInSharedCacheBeforeUpload) throws Exception {
MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
sharedCacheConfig.init(jobConf);
Path firstFile = createTempFile("first-input-file", "x");
Path secondFile = createTempFile("second-input-file", "xx");
// Add files to job conf via distributed cache API as well as command line
boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
if (!fileAdded) {
Path remoteFile = copyToRemote(firstFile);
job.addCacheFile(remoteFile.toUri());
}
jobConf.set("tmpfiles", secondFile.toString());
// Create jars with a single file inside them.
Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
Path secondJar =
makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
// Verify duplicated contents can be handled properly.
Path thirdJar = new Path(testRootDir, "distributed.third.jar");
localFs.copyFromLocalFile(secondJar, thirdJar);
// make secondJar cache available
makeJarAvailableInSharedCache(secondJar, fileUploader);
// Add libjars to job conf via distributed cache API as well as command
// line
boolean libjarAdded =
Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
if (!libjarAdded) {
Path remoteJar = copyToRemote(firstJar);
job.addFileToClassPath(remoteJar);
}
jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());
Path firstArchive = makeArchive("first-archive.zip", "first-file");
Path secondArchive = makeArchive("second-archive.zip", "second-file");
// Add archives to job conf via distributed cache API as well as command
// line
boolean archiveAdded =
Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
archiveAdded);
if (!archiveAdded) {
Path remoteArchive = copyToRemote(firstArchive);
job.addCacheArchive(remoteArchive.toUri());
}
jobConf.set("tmparchives", secondArchive.toString());
// Add job jar to job conf
Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
if (jobJarInSharedCacheBeforeUpload) {
makeJarAvailableInSharedCache(jobJar, fileUploader);
}
jobConf.setJar(jobJar.toString());
fileUploader.uploadResources(job, remoteStagingDir);
verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
any(ApplicationId.class), anyString());
int numOfFilesShouldBeUploadedToSharedCache = 0;
Map<String, Boolean> filesSharedCacheUploadPolicies =
Job.getFileSharedCacheUploadPolicies(jobConf);
for (Boolean policy : filesSharedCacheUploadPolicies.values()) {
if (policy) {
numOfFilesShouldBeUploadedToSharedCache++;
}
}
assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
numOfFilesShouldBeUploadedToSharedCache);
int numOfArchivesShouldBeUploadedToSharedCache = 0;
Map<String, Boolean> archivesSharedCacheUploadPolicies =
Job.getArchiveSharedCacheUploadPolicies(jobConf);
for (Boolean policy : archivesSharedCacheUploadPolicies.values()) {
if (policy) {
numOfArchivesShouldBeUploadedToSharedCache++;
}
}
assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
numOfArchivesShouldBeUploadedToSharedCache);
}
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(testRootDir, filename);
FSDataOutputStream os = localFs.create(path);
os.writeBytes(contents);
os.close();
localFs.setPermission(path, new FsPermission("700"));
return path;
}
private Path makeJar(Path p, int index) throws FileNotFoundException,
IOException {
FileOutputStream fos =
new FileOutputStream(new File(p.toUri().getPath()));
JarOutputStream jos = new JarOutputStream(fos);
ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
jos.putNextEntry(ze);
jos.write(("inside the jar!" + index).getBytes());
jos.closeEntry();
jos.close();
localFs.setPermission(p, new FsPermission("700"));
return p;
}
private Path makeArchive(String archiveFile, String filename)
throws Exception {
Path archive = new Path(testRootDir, archiveFile);
Path file = new Path(testRootDir, filename);
DataOutputStream out = localFs.create(archive);
ZipOutputStream zos = new ZipOutputStream(out);
ZipEntry ze = new ZipEntry(file.toString());
zos.putNextEntry(ze);
zos.write(input.getBytes("UTF-8"));
zos.closeEntry();
zos.close();
return archive;
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@ -338,16 +340,41 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
}
}
private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
throws IOException {
private LocalResource createApplicationResource(FileContext fs, Path p,
LocalResourceType type) throws IOException {
return createApplicationResource(fs, p, null, type,
LocalResourceVisibility.APPLICATION, false);
}
private LocalResource createApplicationResource(FileContext fs, Path p,
String fileSymlink, LocalResourceType type, LocalResourceVisibility viz,
Boolean uploadToSharedCache) throws IOException {
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
FileStatus rsrcStat = fs.getFileStatus(p);
rsrc.setResource(URL.fromPath(fs
.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
// We need to be careful when converting from path to URL to add a fragment
// so that the symlink name when localized will be correct.
Path qualifiedPath =
fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath());
URI uriWithFragment = null;
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
try {
if (useFragment) {
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
} else {
uriWithFragment = qualifiedPath.toUri();
}
} catch (URISyntaxException e) {
throw new IOException(
"Error parsing local resource path."
+ " Path was not able to be converted to a URI: " + qualifiedPath,
e);
}
rsrc.setResource(URL.fromURI(uriWithFragment));
rsrc.setSize(rsrcStat.getLen());
rsrc.setTimestamp(rsrcStat.getModificationTime());
rsrc.setType(type);
rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc.setVisibility(viz);
rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache);
return rsrc;
}
@ -368,10 +395,21 @@ private Map<String, LocalResource> setupLocalResources(Configuration jobConf,
jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) {
Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
// We hard code the job.jar symlink because mapreduce code expects the
// job.jar to be named that way.
FileContext fccc =
FileContext.getFileContext(jobJarPath.toUri(), jobConf);
LocalResourceVisibility jobJarViz =
jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.APPLICATION;
LocalResource rc = createApplicationResource(
FileContext.getFileContext(jobJarPath.toUri(), jobConf),
jobJarPath,
LocalResourceType.PATTERN);
FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath,
MRJobConfig.JOB_JAR, LocalResourceType.PATTERN, jobJarViz,
jobConf.getBoolean(
MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT));
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);

View File

@ -132,6 +132,58 @@ public void testJobMaxMapConfig() throws Exception {
}
}
/**
* Test local job submission with a file option.
*
* @throws IOException
*/
@Test
public void testLocalJobFilesOption() throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args =
{"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
"-mt", "1", "-rt", "1"};
int res = -1;
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
/**
* Test local job submission with an archive option.
*
* @throws IOException
*/
@Test
public void testLocalJobArchivesOption() throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args =
{"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
"1", "-mt", "1", "-rt", "1"};
int res = -1;
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
private Path makeJar(Path p) throws IOException {
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
JarOutputStream jos = new JarOutputStream(fos);

View File

@ -1298,6 +1298,65 @@ private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
jarFile.delete();
}
@Test
public void testSharedCache() throws Exception {
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Job job = Job.getInstance(mrCluster.getConfig());
Configuration jobConf = job.getConfiguration();
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
Path inputFile = createTempFile("input-file", "x");
// Create jars with a single file inside them.
Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
// Add libjars to job conf
jobConf.set("tmpjars", second.toString() + "," + third.toString() + ","
+ fourth.toString());
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar =
new Path(JarFinder.getJar(SharedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
job.setMapperClass(SharedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, inputFile);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
Assert.assertTrue(job.waitForCompletion(true));
Assert.assertTrue("Tracking URL was " + trackingUrl
+ " but didn't Match Job ID " + jobId,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
/**
* An identity mapper for testing the shared cache.
*/
public static class SharedCacheChecker extends
Mapper<LongWritable, Text, NullWritable, NullWritable> {
@Override
public void setup(Context context) throws IOException {
}
}
public static class ConfVerificationMapper extends SleepMapper {
@Override
protected void setup(Context context)

View File

@ -112,6 +112,7 @@
<item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
<item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
<item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
<item name="Support for YARN Shared Cache" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html"/>
</menu>
<menu name="MapReduce REST APIs" inherit="top">