diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b07e6814d8..501509ac83 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -178,7 +178,11 @@ Release 2.0.5-beta - UNRELEASED
YARN-382. SchedulerUtils improve way normalizeRequest sets the resource
capabilities. (Zhijie Shen via bikas)
-
+
+ YARN-467. Modify public distributed cache to localize files such that no
+ local directory hits unix file count limits and thus prevent job failures.
+ (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 2d63dad48b..247406434e 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -256,4 +256,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7ad8a7e6b2..01aac81b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -340,7 +340,15 @@ public class YarnConfiguration extends Configuration {
/**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
-
+
+ /**
+ * Number of files in each localized directories
+ * Avoid tuning this too low.
+ */
+ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
+ NM_PREFIX + "local-cache.max-files-per-directory";
+ public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192;
+
/** Address where the localizer IPC is.*/
public static final String NM_LOCALIZER_ADDRESS =
NM_PREFIX + "localizer.address";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a26407a165..ed99f9a799 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -359,6 +359,25 @@
${hadoop.tmp.dir}/nm-local-dir
+
+ It limits the maximum number of files which will be localized
+ in a single local directory. If the limit is reached then sub-directories
+ will be created and new files will be localized in them. If it is set to
+ a value less than or equal to 36 [which are sub-directories (0-9 and then
+ a-z)] then NodeManager will fail to start. For example; [for public
+ cache] if this is configured with a value of 40 ( 4 files +
+ 36 sub-directories) and the local-dir is "/tmp/local-dir1" then it will
+ allow 4 files to be created directly inside "/tmp/local-dir1/filecache".
+ For files that are localized further it will create a sub-directory "0"
+ inside "/tmp/local-dir1/filecache" and will localize files inside it
+ until it becomes full. If a file is removed from a sub-directory that
+ is marked full, then that sub-directory will be used back again to
+ localize files.
+
+ yarn.nodemanager.local-cache.max-files-per-directory
+ 8192
+
+
Address where the localizer IPC is.
yarn.nodemanager.localizer.address
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
new file mode 100644
index 0000000000..e32551faea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * {@link LocalCacheDirectoryManager} is used for managing hierarchical
+ * directories for local cache. It will allow to restrict the number of files in
+ * a directory to
+ * {@link YarnConfiguration#NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY} which
+ * includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is
+ * represented by an empty string. It internally maintains a vacant directory
+ * queue. As soon as the file count for the directory reaches its limit; new
+ * files will not be created in it until at least one file is deleted from it.
+ * New sub directories are not created unless a
+ * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request
+ * is made and nonFullDirectories are empty.
+ *
+ * Note : this structure only returns relative localization path but doesn't
+ * create one on disk.
+ */
+public class LocalCacheDirectoryManager {
+
+ private final int perDirectoryFileLimit;
+ // total 36 = a to z plus 0 to 9
+ public static final int DIRECTORIES_PER_LEVEL = 36;
+
+ private Queue nonFullDirectories;
+ private HashMap knownDirectories;
+ private int totalSubDirectories;
+
+ public LocalCacheDirectoryManager(Configuration conf) {
+ totalSubDirectories = 0;
+ Directory rootDir = new Directory(totalSubDirectories);
+ nonFullDirectories = new LinkedList();
+ knownDirectories = new HashMap();
+ knownDirectories.put("", rootDir);
+ nonFullDirectories.add(rootDir);
+ this.perDirectoryFileLimit =
+ conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+ YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY) - 36;
+ }
+
+ /**
+ * This method will return relative path from the first available vacant
+ * directory.
+ *
+ * @return {@link String} relative path for localization
+ */
+ public synchronized String getRelativePathForLocalization() {
+ if (nonFullDirectories.isEmpty()) {
+ totalSubDirectories++;
+ Directory newDir = new Directory(totalSubDirectories);
+ nonFullDirectories.add(newDir);
+ knownDirectories.put(newDir.getRelativePath(), newDir);
+ }
+ Directory subDir = nonFullDirectories.peek();
+ if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
+ nonFullDirectories.remove();
+ }
+ return subDir.getRelativePath();
+ }
+
+ /**
+ * This method will reduce the file count for the directory represented by
+ * path. The root directory of this Local cache directory manager is
+ * represented by an empty string.
+ */
+ public synchronized void decrementFileCountForPath(String relPath) {
+ relPath = relPath == null ? "" : relPath.trim();
+ Directory subDir = knownDirectories.get(relPath);
+ int oldCount = subDir.getCount();
+ if (subDir.decrementAndGetCount() < perDirectoryFileLimit
+ && oldCount >= perDirectoryFileLimit) {
+ nonFullDirectories.add(subDir);
+ }
+ }
+
+ /*
+ * It limits the number of files and sub directories in the directory to the
+ * limit LocalCacheDirectoryManager#perDirectoryFileLimit.
+ */
+ static class Directory {
+
+ private final String relativePath;
+ private int fileCount;
+
+ public Directory(int directoryNo) {
+ fileCount = 0;
+ if (directoryNo == 0) {
+ relativePath = "";
+ } else {
+ String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
+ StringBuffer sb = new StringBuffer();
+ if (tPath.length() == 1) {
+ sb.append(tPath.charAt(0));
+ } else {
+ // this is done to make sure we also reuse 0th sub directory
+ sb.append(Integer.toString(
+ Integer.parseInt(tPath.substring(0, 1), DIRECTORIES_PER_LEVEL) - 1,
+ DIRECTORIES_PER_LEVEL));
+ }
+ for (int i = 1; i < tPath.length(); i++) {
+ sb.append(Path.SEPARATOR).append(tPath.charAt(i));
+ }
+ relativePath = sb.toString();
+ }
+ }
+
+ public int incrementAndGetCount() {
+ return ++fileCount;
+ }
+
+ public int decrementAndGetCount() {
+ return --fileCount;
+ }
+
+ public String getRelativePath() {
+ return relativePath;
+ }
+
+ public int getCount() {
+ return fileCount;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
index b24d8afb8a..5f451182d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -35,6 +36,11 @@ interface LocalResourcesTracker
boolean remove(LocalizedResource req, DeletionService delService);
+ Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
+
String getUser();
+ // TODO: Remove this in favour of EventHandler.handle
+ void localizationCompleted(LocalResourceRequest req, boolean success);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 01ec38397b..c025d611d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -26,12 +26,13 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -49,17 +50,43 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
private final String user;
private final Dispatcher dispatcher;
private final ConcurrentMap localrsrc;
+ private Configuration conf;
+ /*
+ * This flag controls whether this resource tracker uses hierarchical
+ * directories or not. For PRIVATE and PUBLIC resource trackers it
+ * will be set whereas for APPLICATION resource tracker it would
+ * be false.
+ */
+ private final boolean useLocalCacheDirectoryManager;
+ private ConcurrentHashMap directoryManagers;
+ /*
+ * It is used to keep track of resource into hierarchical directory
+ * while it is getting downloaded. It is useful for reference counting
+ * in case resource localization fails.
+ */
+ private ConcurrentHashMap
+ inProgressLocalResourcesMap;
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this(user, dispatcher,
- new ConcurrentHashMap());
+ new ConcurrentHashMap(),
+ useLocalCacheDirectoryManager, conf);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- ConcurrentMap localrsrc) {
+ ConcurrentMap localrsrc,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
+ this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+ if ( this.useLocalCacheDirectoryManager) {
+ directoryManagers = new ConcurrentHashMap();
+ inProgressLocalResourcesMap =
+ new ConcurrentHashMap();
+ }
+ this.conf = conf;
}
@Override
@@ -73,6 +100,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
rsrc = null;
}
if (null == rsrc) {
@@ -90,7 +118,52 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
rsrc.handle(event);
}
- /**
+ /*
+ * Update the file-count statistics for a local cache-directory.
+ * This will retrieve the localized path for the resource from
+ * 1) inProgressRsrcMap if the resource was under localization and it
+ * failed.
+ * 2) LocalizedResource if the resource is already localized.
+ * From this path it will identify the local directory under which the
+ * resource was localized. Then rest of the path will be used to decrement
+ * file count for the HierarchicalSubDirectory pointing to this relative
+ * path.
+ */
+ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
+ LocalizedResource rsrc) {
+ if ( useLocalCacheDirectoryManager) {
+ Path rsrcPath = null;
+ if (inProgressLocalResourcesMap.containsKey(req)) {
+ // This happens when localization of a resource fails.
+ rsrcPath = inProgressLocalResourcesMap.remove(req);
+ } else if (rsrc != null && rsrc.getLocalPath() != null) {
+ rsrcPath = rsrc.getLocalPath().getParent().getParent();
+ }
+ if (rsrcPath != null) {
+ Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+ while (!directoryManagers.containsKey(parentPath)) {
+ parentPath = parentPath.getParent();
+ if ( parentPath == null) {
+ return;
+ }
+ }
+ if ( parentPath != null) {
+ String parentDir = parentPath.toUri().getRawPath().toString();
+ LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+ String rsrcDir = rsrcPath.toUri().getRawPath();
+ if (rsrcDir.equals(parentDir)) {
+ dir.decrementFileCountForPath("");
+ } else {
+ dir.decrementFileCountForPath(
+ rsrcDir.substring(
+ parentDir.length() + 1));
+ }
+ }
+ }
+ }
+ }
+
+/**
* This module checks if the resource which was localized is already present
* or not
*
@@ -100,7 +173,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+ toString());
if (!file.exists()) {
ret = false;
}
@@ -133,11 +207,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
+ decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
return true;
}
}
-
/**
* Returns the path up to the random directory component.
*/
@@ -163,4 +237,50 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public Iterator iterator() {
return localrsrc.values().iterator();
}
-}
+
+ /**
+ * @return {@link Path} absolute path for localization which includes local
+ * directory path and the relative hierarchical path (if use local
+ * cache directory manager is enabled)
+ *
+ * @param {@link LocalResourceRequest} Resource localization request to
+ * localize the resource.
+ * @param {@link Path} local directory path
+ */
+ @Override
+ public Path
+ getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+ if (!directoryManagers.containsKey(localDirPath)) {
+ directoryManagers.putIfAbsent(localDirPath,
+ new LocalCacheDirectoryManager(conf));
+ }
+ LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+ Path rPath = localDirPath;
+ String hierarchicalPath = dir.getRelativePathForLocalization();
+ // For most of the scenarios we will get root path only which
+ // is an empty string
+ if (!hierarchicalPath.isEmpty()) {
+ rPath = new Path(localDirPath, hierarchicalPath);
+ }
+ inProgressLocalResourcesMap.put(req, rPath);
+ return rPath;
+ } else {
+ return localDirPath;
+ }
+ }
+
+ @Override
+ public void localizationCompleted(LocalResourceRequest req,
+ boolean success) {
+ if (useLocalCacheDirectoryManager) {
+ if (!success) {
+ decrementFileCountForLocalCacheDirectory(req, null);
+ } else {
+ inProgressLocalResourcesMap.remove(req);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 29971c5b65..35b2fb5f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -130,7 +131,7 @@ public class ResourceLocalizationService extends CompositeService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
- private final LocalResourcesTracker publicRsrc;
+ private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
@@ -158,7 +159,6 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -173,8 +173,26 @@ public class ResourceLocalizationService extends CompositeService
}
}
+ private void validateConf(Configuration conf) {
+ int perDirFileLimit =
+ conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+ YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+ if (perDirFileLimit <= 36) {
+ LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ + " parameter is configured with very low value.");
+ throw new YarnException(
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ + " parameter is configured with a value less than 37.");
+ } else {
+ LOG.info("per directory file limit = " + perDirFileLimit);
+ }
+ }
+
@Override
public void init(Configuration conf) {
+ this.validateConf(conf);
+ this.publicRsrc =
+ new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
@@ -212,6 +230,7 @@ public class ResourceLocalizationService extends CompositeService
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -306,15 +325,17 @@ public class ResourceLocalizationService extends CompositeService
private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs
String userName = app.getUser();
- privateRsrc.putIfAbsent(userName,
- new LocalResourcesTrackerImpl(userName, dispatcher));
- if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+ privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
+ dispatcher, false, super.getConfig()));
+ if (null != appRsrc.putIfAbsent(
+ ConverterUtils.toString(app.getAppId()),
+ new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
+ .getConfig()))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
+ // should appear in logs, but it's an internal error
+ // that should have no effect on applications
}
// 1) Signal container init
//
@@ -620,6 +641,13 @@ public class ResourceLocalizationService extends CompositeService
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
+ Path hierarchicalPath =
+ publicRsrc.getPathForLocalization(key, publicDirDestPath);
+ if (!hierarchicalPath.equals(publicDirDestPath)) {
+ publicDirDestPath = hierarchicalPath;
+ DiskChecker.checkDir(
+ new File(publicDirDestPath.toUri().getPath()));
+ }
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
request);
@@ -654,19 +682,21 @@ public class ResourceLocalizationService extends CompositeService
assoc.getResource().handle(
new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
+ publicRsrc.localizationCompleted(key, true);
synchronized (attempts) {
attempts.remove(key);
}
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
+ LocalResourceRequest req = assoc.getResource().getRequest();
dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(),
- assoc.getResource().getRequest(), e.getCause()));
+ req, e.getCause()));
+ publicRsrc.localizationCompleted(req, false);
List reqs;
synchronized (attempts) {
- LocalResourceRequest req = assoc.getResource().getRequest();
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -1003,4 +1033,4 @@ public class ResourceLocalizationService extends CompositeService
del.delete(null, dirPath, new Path[] {});
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
new file mode 100644
index 0000000000..057d7cce6f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestLocalCacheDirectoryManager {
+
+ @Test(timeout = 10000)
+ public void testHierarchicalSubDirectoryCreation() {
+ // setting per directory file limit to 1.
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+
+ LocalCacheDirectoryManager hDir = new LocalCacheDirectoryManager(conf);
+ // Test root directory path = ""
+ Assert.assertTrue(hDir.getRelativePathForLocalization().isEmpty());
+
+ // Testing path generation from "0" to "0/0/z/z"
+ for (int i = 1; i <= 37 * 36 * 36; i++) {
+ StringBuffer sb = new StringBuffer();
+ String num = Integer.toString(i - 1, 36);
+ if (num.length() == 1) {
+ sb.append(num.charAt(0));
+ } else {
+ sb.append(Integer.toString(
+ Integer.parseInt(num.substring(0, 1), 36) - 1, 36));
+ }
+ for (int j = 1; j < num.length(); j++) {
+ sb.append(Path.SEPARATOR).append(num.charAt(j));
+ }
+ Assert.assertEquals(sb.toString(), hDir.getRelativePathForLocalization());
+ }
+
+ String testPath1 = "4";
+ String testPath2 = "2";
+ /*
+ * Making sure directory "4" and "2" becomes non-full so that they are
+ * reused for future getRelativePathForLocalization() calls in the order
+ * they are freed.
+ */
+ hDir.decrementFileCountForPath(testPath1);
+ hDir.decrementFileCountForPath(testPath2);
+ // After below call directory "4" should become full.
+ Assert.assertEquals(testPath1, hDir.getRelativePathForLocalization());
+ Assert.assertEquals(testPath2, hDir.getRelativePathForLocalization());
+ }
+
+ @Test(timeout = 10000)
+ public void testMinimumPerDirectoryFileLimit() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
+ Exception e = null;
+ ResourceLocalizationService service =
+ new ResourceLocalizationService(null, null, null, null);
+ try {
+ service.init(conf);
+ } catch (Exception e1) {
+ e = e1;
+ }
+ Assert.assertNotNull(e);
+ Assert.assertEquals(YarnException.class, e.getClass());
+ Assert.assertEquals(e.getMessage(),
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ + " parameter is configured with a value less than 37.");
+
+ }
+
+ @Test(timeout = 1000)
+ public void testDirectoryStateChangeFromFullToNonFull() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "40");
+ LocalCacheDirectoryManager dir = new LocalCacheDirectoryManager(conf);
+
+ // checking for first four paths
+ String rootPath = "";
+ String firstSubDir = "0";
+ for (int i = 0; i < 4; i++) {
+ Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+ }
+ // Releasing two files from the root directory.
+ dir.decrementFileCountForPath(rootPath);
+ dir.decrementFileCountForPath(rootPath);
+ // Space for two files should be available in root directory.
+ Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+ Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+ // As no space is now available in root directory so it should be from
+ // first sub directory
+ Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 0e0a47200a..a8bbdb0352 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl {
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void test() {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
- dispatcher = createDispatcher(new Configuration());
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
EventHandler localizerEventHandler =
mock(EventHandler.class);
EventHandler containerEventHandler =
@@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerImpl {
localrsrc.put(req1, lr1);
localrsrc.put(req2, lr2);
LocalResourcesTracker tracker =
- new LocalResourcesTrackerImpl(user, dispatcher, localrsrc);
+ new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
+ conf);
ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerImpl {
}
}
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void testConsistency() {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
- dispatcher = createDispatcher(new Configuration());
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
EventHandler localizerEventHandler = mock(EventHandler.class);
EventHandler containerEventHandler = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
@@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerImpl {
ConcurrentMap localrsrc = new ConcurrentHashMap();
localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- dispatcher, localrsrc);
+ dispatcher, localrsrc, false, conf);
ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1);
@@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerImpl {
}
}
+ @Test(timeout = 100000)
+ @SuppressWarnings("unchecked")
+ public void testHierarchicalLocalCacheDirectories() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+ Configuration conf = new Configuration();
+ // setting per directory file limit to 1.
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+ dispatcher = createDispatcher(conf);
+
+ EventHandler localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ DeletionService mockDelService = mock(DeletionService.class);
+
+ ConcurrentMap localrsrc =
+ new ConcurrentHashMap();
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc, true, conf);
+
+ // This is a random path. NO File creation will take place at this place.
+ Path localDir = new Path("/tmp");
+
+ // Container 1 needs lr1 resource
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ // Container 1 requests lr1 to be localized
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent1);
+
+ // Simulate the process of localization of lr1
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+ // Simulate lr1 getting localized
+ ResourceLocalizedEvent rle =
+ new ResourceLocalizedEvent(lr1,
+ new Path(hierarchicalPath1.toUri().toString() +
+ Path.SEPARATOR + "file1"), 120);
+ tracker.handle(rle);
+ // Localization successful.
+ tracker.localizationCompleted(lr1, true);
+
+ LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
+ LocalResourceVisibility.PUBLIC);
+ Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+ // localization failed.
+ tracker.localizationCompleted(lr2, false);
+
+ /*
+ * The path returned for two localization should be different because we
+ * are limiting one file per sub-directory.
+ */
+ Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
+
+ LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.PUBLIC);
+ ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent3);
+ Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+ tracker.localizationCompleted(lr3, true);
+
+ // Verifying that path created is inside the subdirectory
+ Assert.assertEquals(hierarchicalPath3.toUri().toString(),
+ hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
+
+ // Container 1 releases resource lr1
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
+ tracker.handle(relEvent1);
+
+ // Validate the file counts now
+ int resources = 0;
+ Iterator iter = tracker.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ resources++;
+ }
+ // There should be only two resources lr1 and lr3 now.
+ Assert.assertEquals(2, resources);
+
+ // Now simulate cache cleanup - removes unused resources.
+ iter = tracker.iterator();
+ while (iter.hasNext()) {
+ LocalizedResource rsrc = iter.next();
+ if (rsrc.getRefCount() == 0) {
+ Assert.assertTrue(tracker.remove(rsrc, mockDelService));
+ resources--;
+ }
+ }
+ // lr1 is not used by anyone and will be removed, only lr3 will hang
+ // around
+ Assert.assertEquals(1, resources);
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
index ee24548c5c..f3f7cc5067 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -76,10 +77,11 @@ public class TestResourceRetention {
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
long nRsrcs, long timestamp, long tsstep) {
+ Configuration conf = new Configuration();
ConcurrentMap trackerResources =
new ConcurrentHashMap();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
- trackerResources));
+ trackerResources, false, conf));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,