YARN-467. Modify public distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1463823 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-03 05:00:28 +00:00
parent 75ef1b4845
commit e67e3ff05d
11 changed files with 605 additions and 29 deletions

View File

@ -178,7 +178,11 @@ Release 2.0.5-beta - UNRELEASED
YARN-382. SchedulerUtils improve way normalizeRequest sets the resource YARN-382. SchedulerUtils improve way normalizeRequest sets the resource
capabilities. (Zhijie Shen via bikas) capabilities. (Zhijie Shen via bikas)
YARN-467. Modify public distributed cache to localize files such that no
local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -256,4 +256,18 @@
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- Null pointer exception needs to be ignored here as this is never going to occur. -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
<Method name="decrementFileCountForLocalCacheDirectory" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
<!-- Null pointer exception needs to be ignored here as this is never going to occur. -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
<Method name="getPathForLocalization" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -340,7 +340,15 @@ public class YarnConfiguration extends Configuration {
/**List of directories to store localized files in.*/ /**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
/**
* Number of files in each localized directories
* Avoid tuning this too low.
*/
public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
NM_PREFIX + "local-cache.max-files-per-directory";
public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192;
/** Address where the localizer IPC is.*/ /** Address where the localizer IPC is.*/
public static final String NM_LOCALIZER_ADDRESS = public static final String NM_LOCALIZER_ADDRESS =
NM_PREFIX + "localizer.address"; NM_PREFIX + "localizer.address";

View File

@ -359,6 +359,25 @@
<value>${hadoop.tmp.dir}/nm-local-dir</value> <value>${hadoop.tmp.dir}/nm-local-dir</value>
</property> </property>
<property>
<description>It limits the maximum number of files which will be localized
in a single local directory. If the limit is reached then sub-directories
will be created and new files will be localized in them. If it is set to
a value less than or equal to 36 [which are sub-directories (0-9 and then
a-z)] then NodeManager will fail to start. For example; [for public
cache] if this is configured with a value of 40 ( 4 files +
36 sub-directories) and the local-dir is "/tmp/local-dir1" then it will
allow 4 files to be created directly inside "/tmp/local-dir1/filecache".
For files that are localized further it will create a sub-directory "0"
inside "/tmp/local-dir1/filecache" and will localize files inside it
until it becomes full. If a file is removed from a sub-directory that
is marked full, then that sub-directory will be used back again to
localize files.
</description>
<name>yarn.nodemanager.local-cache.max-files-per-directory</name>
<value>8192</value>
</property>
<property> <property>
<description>Address where the localizer IPC is.</description> <description>Address where the localizer IPC is.</description>
<name>yarn.nodemanager.localizer.address</name> <name>yarn.nodemanager.localizer.address</name>

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* {@link LocalCacheDirectoryManager} is used for managing hierarchical
* directories for local cache. It will allow to restrict the number of files in
* a directory to
* {@link YarnConfiguration#NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY} which
* includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is
* represented by an empty string. It internally maintains a vacant directory
* queue. As soon as the file count for the directory reaches its limit; new
* files will not be created in it until at least one file is deleted from it.
* New sub directories are not created unless a
* {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request
* is made and nonFullDirectories are empty.
*
* Note : this structure only returns relative localization path but doesn't
* create one on disk.
*/
public class LocalCacheDirectoryManager {
private final int perDirectoryFileLimit;
// total 36 = a to z plus 0 to 9
public static final int DIRECTORIES_PER_LEVEL = 36;
private Queue<Directory> nonFullDirectories;
private HashMap<String, Directory> knownDirectories;
private int totalSubDirectories;
public LocalCacheDirectoryManager(Configuration conf) {
totalSubDirectories = 0;
Directory rootDir = new Directory(totalSubDirectories);
nonFullDirectories = new LinkedList<Directory>();
knownDirectories = new HashMap<String, Directory>();
knownDirectories.put("", rootDir);
nonFullDirectories.add(rootDir);
this.perDirectoryFileLimit =
conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY) - 36;
}
/**
* This method will return relative path from the first available vacant
* directory.
*
* @return {@link String} relative path for localization
*/
public synchronized String getRelativePathForLocalization() {
if (nonFullDirectories.isEmpty()) {
totalSubDirectories++;
Directory newDir = new Directory(totalSubDirectories);
nonFullDirectories.add(newDir);
knownDirectories.put(newDir.getRelativePath(), newDir);
}
Directory subDir = nonFullDirectories.peek();
if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
nonFullDirectories.remove();
}
return subDir.getRelativePath();
}
/**
* This method will reduce the file count for the directory represented by
* path. The root directory of this Local cache directory manager is
* represented by an empty string.
*/
public synchronized void decrementFileCountForPath(String relPath) {
relPath = relPath == null ? "" : relPath.trim();
Directory subDir = knownDirectories.get(relPath);
int oldCount = subDir.getCount();
if (subDir.decrementAndGetCount() < perDirectoryFileLimit
&& oldCount >= perDirectoryFileLimit) {
nonFullDirectories.add(subDir);
}
}
/*
* It limits the number of files and sub directories in the directory to the
* limit LocalCacheDirectoryManager#perDirectoryFileLimit.
*/
static class Directory {
private final String relativePath;
private int fileCount;
public Directory(int directoryNo) {
fileCount = 0;
if (directoryNo == 0) {
relativePath = "";
} else {
String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
StringBuffer sb = new StringBuffer();
if (tPath.length() == 1) {
sb.append(tPath.charAt(0));
} else {
// this is done to make sure we also reuse 0th sub directory
sb.append(Integer.toString(
Integer.parseInt(tPath.substring(0, 1), DIRECTORIES_PER_LEVEL) - 1,
DIRECTORIES_PER_LEVEL));
}
for (int i = 1; i < tPath.length(); i++) {
sb.append(Path.SEPARATOR).append(tPath.charAt(i));
}
relativePath = sb.toString();
}
}
public int incrementAndGetCount() {
return ++fileCount;
}
public int decrementAndGetCount() {
return --fileCount;
}
public String getRelativePath() {
return relativePath;
}
public int getCount() {
return fileCount;
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@ -35,6 +36,11 @@ interface LocalResourcesTracker
boolean remove(LocalizedResource req, DeletionService delService); boolean remove(LocalizedResource req, DeletionService delService);
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
String getUser(); String getUser();
// TODO: Remove this in favour of EventHandler.handle
void localizationCompleted(LocalResourceRequest req, boolean success);
} }

View File

@ -26,12 +26,13 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
/** /**
* A collection of {@link LocalizedResource}s all of same * A collection of {@link LocalizedResource}s all of same
@ -49,17 +50,43 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
private final String user; private final String user;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc; private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
/*
* This flag controls whether this resource tracker uses hierarchical
* directories or not. For PRIVATE and PUBLIC resource trackers it
* will be set whereas for APPLICATION resource tracker it would
* be false.
*/
private final boolean useLocalCacheDirectoryManager;
private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
/*
* It is used to keep track of resource into hierarchical directory
* while it is getting downloaded. It is useful for reference counting
* in case resource localization fails.
*/
private ConcurrentHashMap<LocalResourceRequest, Path>
inProgressLocalResourcesMap;
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) { public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
boolean useLocalCacheDirectoryManager, Configuration conf) {
this(user, dispatcher, this(user, dispatcher,
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>()); new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
useLocalCacheDirectoryManager, conf);
} }
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) { ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
boolean useLocalCacheDirectoryManager, Configuration conf) {
this.user = user; this.user = user;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.localrsrc = localrsrc; this.localrsrc = localrsrc;
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
if ( this.useLocalCacheDirectoryManager) {
directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
inProgressLocalResourcesMap =
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
} }
@Override @Override
@ -73,6 +100,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
LOG.info("Resource " + rsrc.getLocalPath() LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again"); + " is missing, localizing it again");
localrsrc.remove(req); localrsrc.remove(req);
decrementFileCountForLocalCacheDirectory(req, rsrc);
rsrc = null; rsrc = null;
} }
if (null == rsrc) { if (null == rsrc) {
@ -90,7 +118,52 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
rsrc.handle(event); rsrc.handle(event);
} }
/** /*
* Update the file-count statistics for a local cache-directory.
* This will retrieve the localized path for the resource from
* 1) inProgressRsrcMap if the resource was under localization and it
* failed.
* 2) LocalizedResource if the resource is already localized.
* From this path it will identify the local directory under which the
* resource was localized. Then rest of the path will be used to decrement
* file count for the HierarchicalSubDirectory pointing to this relative
* path.
*/
private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
LocalizedResource rsrc) {
if ( useLocalCacheDirectoryManager) {
Path rsrcPath = null;
if (inProgressLocalResourcesMap.containsKey(req)) {
// This happens when localization of a resource fails.
rsrcPath = inProgressLocalResourcesMap.remove(req);
} else if (rsrc != null && rsrc.getLocalPath() != null) {
rsrcPath = rsrc.getLocalPath().getParent().getParent();
}
if (rsrcPath != null) {
Path parentPath = new Path(rsrcPath.toUri().getRawPath());
while (!directoryManagers.containsKey(parentPath)) {
parentPath = parentPath.getParent();
if ( parentPath == null) {
return;
}
}
if ( parentPath != null) {
String parentDir = parentPath.toUri().getRawPath().toString();
LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
String rsrcDir = rsrcPath.toUri().getRawPath();
if (rsrcDir.equals(parentDir)) {
dir.decrementFileCountForPath("");
} else {
dir.decrementFileCountForPath(
rsrcDir.substring(
parentDir.length() + 1));
}
}
}
}
}
/**
* This module checks if the resource which was localized is already present * This module checks if the resource which was localized is already present
* or not * or not
* *
@ -100,7 +173,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public boolean isResourcePresent(LocalizedResource rsrc) { public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true; boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) { if (rsrc.getState() == ResourceState.LOCALIZED) {
File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString()); File file = new File(rsrc.getLocalPath().toUri().getRawPath().
toString());
if (!file.exists()) { if (!file.exists()) {
ret = false; ret = false;
} }
@ -133,11 +207,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
if (ResourceState.LOCALIZED.equals(rsrc.getState())) { if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
} }
decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
return true; return true;
} }
} }
/** /**
* Returns the path up to the random directory component. * Returns the path up to the random directory component.
*/ */
@ -163,4 +237,50 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public Iterator<LocalizedResource> iterator() { public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator(); return localrsrc.values().iterator();
} }
}
/**
* @return {@link Path} absolute path for localization which includes local
* directory path and the relative hierarchical path (if use local
* cache directory manager is enabled)
*
* @param {@link LocalResourceRequest} Resource localization request to
* localize the resource.
* @param {@link Path} local directory path
*/
@Override
public Path
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
directoryManagers.putIfAbsent(localDirPath,
new LocalCacheDirectoryManager(conf));
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
Path rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
if (!hierarchicalPath.isEmpty()) {
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
return rPath;
} else {
return localDirPath;
}
}
@Override
public void localizationCompleted(LocalResourceRequest req,
boolean success) {
if (useLocalCacheDirectoryManager) {
if (!success) {
decrementFileCountForLocalCacheDirectory(req, null);
} else {
inProgressLocalResourcesMap.remove(req);
}
}
}
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -130,7 +131,7 @@ public class ResourceLocalizationService extends CompositeService
private RecordFactory recordFactory; private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup; private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc; private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler; private LocalDirsHandlerService dirsHandler;
@ -158,7 +159,6 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService; this.delService = delService;
this.dirsHandler = dirsHandler; this.dirsHandler = dirsHandler;
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1, this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup") .setNameFormat("ResourceLocalizationService Cache Cleanup")
@ -173,8 +173,26 @@ public class ResourceLocalizationService extends CompositeService
} }
} }
private void validateConf(Configuration conf) {
int perDirFileLimit =
conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
if (perDirFileLimit <= 36) {
LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ " parameter is configured with very low value.");
throw new YarnException(
YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ " parameter is configured with a value less than 37.");
} else {
LOG.info("per directory file limit = " + perDirFileLimit);
}
}
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
this.validateConf(conf);
this.publicRsrc =
new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try { try {
@ -212,6 +230,7 @@ public class ResourceLocalizationService extends CompositeService
YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
localizerTracker = createLocalizerTracker(conf); localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker); addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker);
@ -306,15 +325,17 @@ public class ResourceLocalizationService extends CompositeService
private void handleInitApplicationResources(Application app) { private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs // 0) Create application tracking structs
String userName = app.getUser(); String userName = app.getUser();
privateRsrc.putIfAbsent(userName, privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
new LocalResourcesTrackerImpl(userName, dispatcher)); dispatcher, false, super.getConfig()));
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), if (null != appRsrc.putIfAbsent(
new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) { ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
.getConfig()))) {
LOG.warn("Initializing application " + app + " already present"); LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it // ^ The condition is benign. Tests should fail and it
// should appear in logs, but it's an internal error // should appear in logs, but it's an internal error
// that should have no effect on applications // that should have no effect on applications
} }
// 1) Signal container init // 1) Signal container init
// //
@ -620,6 +641,13 @@ public class ResourceLocalizationService extends CompositeService
Path publicDirDestPath = dirsHandler.getLocalPathForWrite( Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true); ContainerLocalizer.getEstimatedSize(resource), true);
Path hierarchicalPath =
publicRsrc.getPathForLocalization(key, publicDirDestPath);
if (!hierarchicalPath.equals(publicDirDestPath)) {
publicDirDestPath = hierarchicalPath;
DiskChecker.checkDir(
new File(publicDirDestPath.toUri().getPath()));
}
pending.put(queue.submit(new FSDownload( pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())), lfs, null, conf, publicDirDestPath, resource, new Random())),
request); request);
@ -654,19 +682,21 @@ public class ResourceLocalizationService extends CompositeService
assoc.getResource().handle( assoc.getResource().handle(
new ResourceLocalizedEvent(key, new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri())))); local, FileUtil.getDU(new File(local.toUri()))));
publicRsrc.localizationCompleted(key, true);
synchronized (attempts) { synchronized (attempts) {
attempts.remove(key); attempts.remove(key);
} }
} catch (ExecutionException e) { } catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(), LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause()); e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest();
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent( new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(), assoc.getContext().getContainerId(),
assoc.getResource().getRequest(), e.getCause())); req, e.getCause()));
publicRsrc.localizationCompleted(req, false);
List<LocalizerResourceRequestEvent> reqs; List<LocalizerResourceRequestEvent> reqs;
synchronized (attempts) { synchronized (attempts) {
LocalResourceRequest req = assoc.getResource().getRequest();
reqs = attempts.get(req); reqs = attempts.get(req);
if (null == reqs) { if (null == reqs) {
LOG.error("Missing pending list for " + req); LOG.error("Missing pending list for " + req);
@ -1003,4 +1033,4 @@ public class ResourceLocalizationService extends CompositeService
del.delete(null, dirPath, new Path[] {}); del.delete(null, dirPath, new Path[] {});
} }
} }

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import junit.framework.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
public class TestLocalCacheDirectoryManager {
@Test(timeout = 10000)
public void testHierarchicalSubDirectoryCreation() {
// setting per directory file limit to 1.
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
LocalCacheDirectoryManager hDir = new LocalCacheDirectoryManager(conf);
// Test root directory path = ""
Assert.assertTrue(hDir.getRelativePathForLocalization().isEmpty());
// Testing path generation from "0" to "0/0/z/z"
for (int i = 1; i <= 37 * 36 * 36; i++) {
StringBuffer sb = new StringBuffer();
String num = Integer.toString(i - 1, 36);
if (num.length() == 1) {
sb.append(num.charAt(0));
} else {
sb.append(Integer.toString(
Integer.parseInt(num.substring(0, 1), 36) - 1, 36));
}
for (int j = 1; j < num.length(); j++) {
sb.append(Path.SEPARATOR).append(num.charAt(j));
}
Assert.assertEquals(sb.toString(), hDir.getRelativePathForLocalization());
}
String testPath1 = "4";
String testPath2 = "2";
/*
* Making sure directory "4" and "2" becomes non-full so that they are
* reused for future getRelativePathForLocalization() calls in the order
* they are freed.
*/
hDir.decrementFileCountForPath(testPath1);
hDir.decrementFileCountForPath(testPath2);
// After below call directory "4" should become full.
Assert.assertEquals(testPath1, hDir.getRelativePathForLocalization());
Assert.assertEquals(testPath2, hDir.getRelativePathForLocalization());
}
@Test(timeout = 10000)
public void testMinimumPerDirectoryFileLimit() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
Exception e = null;
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null);
try {
service.init(conf);
} catch (Exception e1) {
e = e1;
}
Assert.assertNotNull(e);
Assert.assertEquals(YarnException.class, e.getClass());
Assert.assertEquals(e.getMessage(),
YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ " parameter is configured with a value less than 37.");
}
@Test(timeout = 1000)
public void testDirectoryStateChangeFromFullToNonFull() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "40");
LocalCacheDirectoryManager dir = new LocalCacheDirectoryManager(conf);
// checking for first four paths
String rootPath = "";
String firstSubDir = "0";
for (int i = 0; i < 4; i++) {
Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
}
// Releasing two files from the root directory.
dir.decrementFileCountForPath(rootPath);
dir.decrementFileCountForPath(rootPath);
// Space for two files should be available in root directory.
Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
// As no space is now available in root directory so it should be from
// first sub directory
Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test; import org.junit.Test;
import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl { public class TestLocalResourcesTrackerImpl {
@Test @Test(timeout=10000)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void test() { public void test() {
String user = "testuser"; String user = "testuser";
DrainDispatcher dispatcher = null; DrainDispatcher dispatcher = null;
try { try {
dispatcher = createDispatcher(new Configuration()); Configuration conf = new Configuration();
dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler = EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class); mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler = EventHandler<LocalizerEvent> containerEventHandler =
@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerImpl {
localrsrc.put(req1, lr1); localrsrc.put(req1, lr1);
localrsrc.put(req2, lr2); localrsrc.put(req2, lr2);
LocalResourcesTracker tracker = LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, dispatcher, localrsrc); new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
conf);
ResourceEvent req11Event = ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerImpl {
} }
} }
@Test @Test(timeout=10000)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testConsistency() { public void testConsistency() {
String user = "testuser"; String user = "testuser";
DrainDispatcher dispatcher = null; DrainDispatcher dispatcher = null;
try { try {
dispatcher = createDispatcher(new Configuration()); Configuration conf = new Configuration();
dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler); dispatcher.register(LocalizerEventType.class, localizerEventHandler);
@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerImpl {
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(); ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req1, lr1); localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
dispatcher, localrsrc); dispatcher, localrsrc, false, conf);
ResourceEvent req11Event = new ResourceRequestEvent(req1, ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1); LocalResourceVisibility.PUBLIC, lc1);
@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerImpl {
} }
} }
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testHierarchicalLocalCacheDirectories() {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
Configuration conf = new Configuration();
// setting per directory file limit to 1.
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler =
mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
dispatcher.register(ContainerEventType.class, containerEventHandler);
DeletionService mockDelService = mock(DeletionService.class);
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
dispatcher, localrsrc, true, conf);
// This is a random path. NO File creation will take place at this place.
Path localDir = new Path("/tmp");
// Container 1 needs lr1 resource
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
LocalResourceVisibility.PUBLIC);
LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
// Container 1 requests lr1 to be localized
ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent1);
// Simulate the process of localization of lr1
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
// Simulate lr1 getting localized
ResourceLocalizedEvent rle =
new ResourceLocalizedEvent(lr1,
new Path(hierarchicalPath1.toUri().toString() +
Path.SEPARATOR + "file1"), 120);
tracker.handle(rle);
// Localization successful.
tracker.localizationCompleted(lr1, true);
LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
LocalResourceVisibility.PUBLIC);
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
// localization failed.
tracker.localizationCompleted(lr2, false);
/*
* The path returned for two localization should be different because we
* are limiting one file per sub-directory.
*/
Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2,
LocalResourceVisibility.PUBLIC);
ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent3);
Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
tracker.localizationCompleted(lr3, true);
// Verifying that path created is inside the subdirectory
Assert.assertEquals(hierarchicalPath3.toUri().toString(),
hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
// Container 1 releases resource lr1
ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
tracker.handle(relEvent1);
// Validate the file counts now
int resources = 0;
Iterator<LocalizedResource> iter = tracker.iterator();
while (iter.hasNext()) {
iter.next();
resources++;
}
// There should be only two resources lr1 and lr3 now.
Assert.assertEquals(2, resources);
// Now simulate cache cleanup - removes unused resources.
iter = tracker.iterator();
while (iter.hasNext()) {
LocalizedResource rsrc = iter.next();
if (rsrc.getRefCount() == 0) {
Assert.assertTrue(tracker.remove(rsrc, mockDelService));
resources--;
}
}
// lr1 is not used by anyone and will be removed, only lr3 will hang
// around
Assert.assertEquals(1, resources);
} finally {
if (dispatcher != null) {
dispatcher.stop();
}
}
}
private boolean createdummylocalizefile(Path path) { private boolean createdummylocalizefile(Path path) {
boolean ret = false; boolean ret = false;
File file = new File(path.toUri().getRawPath().toString()); File file = new File(path.toUri().getRawPath().toString());

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -76,10 +77,11 @@ public class TestResourceRetention {
LocalResourcesTracker createMockTracker(String user, final long rsrcSize, LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
long nRsrcs, long timestamp, long tsstep) { long nRsrcs, long timestamp, long tsstep) {
Configuration conf = new Configuration();
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources = ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>(); new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null, LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
trackerResources)); trackerResources, false, conf));
for (int i = 0; i < nRsrcs; ++i) { for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest( final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep, new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,