diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1966bd2019..02f2a3a613 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -29,6 +29,9 @@ Release 2.5.0 - UNRELEASED YARN-1362. Distinguish between nodemanager shutdown for decommission vs shutdown for restart. (Jason Lowe via junping_du) + YARN-1338. Recover localized resource cache state upon nodemanager restart + (Jason Lowe via junping_du) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 7f34a2db3e..370cc36ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -156,6 +156,10 @@ org.apache.hadoop hadoop-yarn-server-common + + org.fusesource.leveldbjni + leveldbjni-all + @@ -292,6 +296,7 @@ ${basedir}/src/main/proto + yarn_server_nodemanager_recovery.proto yarn_server_nodemanager_service_protos.proto LocalizationProtocol.proto diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index c1c57b4b1f..956ea33493 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -67,6 +68,8 @@ public interface Context { ApplicationACLsManager getApplicationACLsManager(); + NMStateStoreService getNMStateStore(); + boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 913578fcf9..83b0ede5b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; @@ -78,6 +81,7 @@ public class NodeManager extends CompositeService private ContainerManagerImpl containerManager; private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; + private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); @@ -115,9 +119,10 @@ protected DeletionService createDeletionService(ContainerExecutor exec) { protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInNM nmTokenSecretManager) { + NMTokenSecretManagerInNM nmTokenSecretManager, + NMStateStoreService stateStore) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager); + dirsHandler, aclsManager, stateStore); } protected void doSecureLogin() throws IOException { @@ -125,11 +130,8 @@ protected void doSecureLogin() throws IOException { YarnConfiguration.NM_PRINCIPAL); } - @Override - protected void serviceInit(Configuration conf) throws Exception { - - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); - + private void initAndStartRecoveryStore(Configuration conf) + throws IOException { boolean recoveryEnabled = conf.getBoolean( YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); @@ -142,7 +144,36 @@ protected void serviceInit(Configuration conf) throws Exception { } Path recoveryRoot = new Path(recoveryDirName); recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700)); + nmStore = new NMLeveldbStateStoreService(); + } else { + nmStore = new NMNullStateStoreService(); } + nmStore.init(conf); + nmStore.start(); + } + + private void stopRecoveryStore() throws IOException { + nmStore.stop(); + if (context.getDecommissioned() && nmStore.canRecover()) { + LOG.info("Removing state store due to decommission"); + Configuration conf = getConfig(); + Path recoveryRoot = new Path( + conf.get(YarnConfiguration.NM_RECOVERY_DIR)); + LOG.info("Removing state store at " + recoveryRoot + + " due to decommission"); + FileSystem recoveryFs = FileSystem.getLocal(conf); + if (!recoveryFs.delete(recoveryRoot, true)) { + LOG.warn("Unable to delete " + recoveryRoot); + } + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + initAndStartRecoveryStore(conf); NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf); @@ -171,7 +202,7 @@ protected void serviceInit(Configuration conf) throws Exception { dirsHandler = nodeHealthChecker.getDiskHandler(); this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, nmStore); nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); @@ -220,6 +251,7 @@ protected void serviceStop() throws Exception { return; } super.serviceStop(); + stopRecoveryStore(); DefaultMetricsSystem.shutdown(); } @@ -272,11 +304,13 @@ public static class NMContext implements Context { private WebServer webServer; private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); + private final NMStateStoreService stateStore; private boolean isDecommissioned = false; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) { + LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, + NMStateStoreService stateStore) { this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -284,6 +318,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, this.nodeHealthStatus.setIsNodeHealthy(true); this.nodeHealthStatus.setHealthReport("Healthy"); this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); + this.stateStore = stateStore; } /** @@ -351,6 +386,11 @@ public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + @Override + public NMStateStoreService getNMStateStore() { + return stateStore; + } + @Override public boolean getDecommissioned() { return isDecommissioned; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3c56..750c11a981 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -116,6 +117,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -218,6 +220,15 @@ public void serviceInit(Configuration conf) throws Exception { SHUTDOWN_CLEANUP_SLOP_MS; super.serviceInit(conf); + recover(); + } + + private void recover() throws IOException, URISyntaxException { + NMStateStoreService stateStore = context.getNMStateStore(); + if (stateStore.canRecover()) { + rsrcLocalizationSrvc.recoverLocalizedResources( + stateStore.loadLocalizationState()); + } } protected LogHandler createLogHandler(Configuration conf, Context context, @@ -239,7 +250,7 @@ public ContainersMonitor getContainersMonitor() { protected ResourceLocalizationService createResourceLocalizationService( ContainerExecutor exec, DeletionService deletionContext) { return new ResourceLocalizationService(this.dispatcher, exec, - deletionContext, dirsHandler); + deletionContext, dirsHandler, context.getNMStateStore()); } protected ContainersLauncher createContainersLauncher(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java index 8a3b6bf208..4649e0b892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import com.google.common.annotations.VisibleForTesting; + /** * {@link LocalCacheDirectoryManager} is used for managing hierarchical * directories for local cache. It will allow to restrict the number of files in @@ -99,6 +101,57 @@ public synchronized void decrementFileCountForPath(String relPath) { } } + /** + * Increment the file count for a relative directory within the cache + * + * @param relPath the relative path + */ + public synchronized void incrementFileCountForPath(String relPath) { + relPath = relPath == null ? "" : relPath.trim(); + Directory subDir = knownDirectories.get(relPath); + if (subDir == null) { + int dirnum = Directory.getDirectoryNumber(relPath); + totalSubDirectories = Math.max(dirnum, totalSubDirectories); + subDir = new Directory(dirnum); + nonFullDirectories.add(subDir); + knownDirectories.put(subDir.getRelativePath(), subDir); + } + if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) { + nonFullDirectories.remove(subDir); + } + } + + /** + * Given a path to a directory within a local cache tree return the + * root of the cache directory. + * + * @param path the directory within a cache directory + * @return the local cache directory root or null if not found + */ + public static Path getCacheDirectoryRoot(Path path) { + while (path != null) { + String name = path.getName(); + if (name.length() != 1) { + return path; + } + int dirnum = DIRECTORIES_PER_LEVEL; + try { + dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL); + } catch (NumberFormatException e) { + } + if (dirnum >= DIRECTORIES_PER_LEVEL) { + return path; + } + path = path.getParent(); + } + return path; + } + + @VisibleForTesting + synchronized Directory getDirectory(String relPath) { + return knownDirectories.get(relPath); + } + /* * It limits the number of files and sub directories in the directory to the * limit LocalCacheDirectoryManager#perDirectoryFileLimit. @@ -108,11 +161,9 @@ static class Directory { private final String relativePath; private int fileCount; - public Directory(int directoryNo) { - fileCount = 0; - if (directoryNo == 0) { - relativePath = ""; - } else { + static String getRelativePath(int directoryNo) { + String relativePath = ""; + if (directoryNo > 0) { String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL); StringBuffer sb = new StringBuffer(); if (tPath.length() == 1) { @@ -128,6 +179,27 @@ public Directory(int directoryNo) { } relativePath = sb.toString(); } + return relativePath; + } + + static int getDirectoryNumber(String relativePath) { + String numStr = relativePath.replace("/", ""); + if (relativePath.isEmpty()) { + return 0; + } + if (numStr.length() > 1) { + // undo step from getRelativePath() to reuse 0th sub directory + String firstChar = Integer.toString( + Integer.parseInt(numStr.substring(0, 1), + DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL); + numStr = firstChar + numStr.substring(1); + } + return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1; + } + + public Directory(int directoryNo) { + fileCount = 0; + relativePath = getRelativePath(directoryNo); } public int incrementAndGetCount() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 7d00d94e4f..14ec911784 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,15 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import com.google.common.annotations.VisibleForTesting; - /** * Component tracking resources all of the same {@link LocalResourceVisibility} * @@ -34,18 +31,11 @@ interface LocalResourcesTracker extends EventHandler, Iterable { - // TODO: Not used at all!! - boolean contains(LocalResourceRequest resource); - boolean remove(LocalizedResource req, DeletionService delService); Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); String getUser(); - long nextUniqueNumber(); - - @VisibleForTesting - @Private LocalizedResource getLocalizedResource(LocalResourceRequest request); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index da959d7b00..7cf6b1572f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.io.File; +import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -27,14 +28,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import com.google.common.annotations.VisibleForTesting; @@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { .compile(RANDOM_DIR_REGEX); private final String user; + private final ApplicationId appId; private final Dispatcher dispatcher; private final ConcurrentMap localrsrc; private Configuration conf; @@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { * per APPLICATION, USER and PUBLIC cache. */ private AtomicLong uniqueNumberGenerator = new AtomicLong(9); + private NMStateStoreService stateStore; - public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, - boolean useLocalCacheDirectoryManager, Configuration conf) { - this(user, dispatcher, + public LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, + Configuration conf, NMStateStoreService stateStore) { + this(user, appId, dispatcher, new ConcurrentHashMap(), - useLocalCacheDirectoryManager, conf); + useLocalCacheDirectoryManager, conf, stateStore); } - LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, ConcurrentMap localrsrc, - boolean useLocalCacheDirectoryManager, Configuration conf) { + boolean useLocalCacheDirectoryManager, Configuration conf, + NMStateStoreService stateStore) { + this.appId = appId; this.user = user; this.dispatcher = dispatcher; this.localrsrc = localrsrc; @@ -98,6 +112,7 @@ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, new ConcurrentHashMap(); } this.conf = conf; + this.stateStore = stateStore; } /* @@ -119,8 +134,7 @@ public synchronized void handle(ResourceEvent event) { if (rsrc != null && (!isResourcePresent(rsrc))) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); - localrsrc.remove(req); - decrementFileCountForLocalCacheDirectory(req, rsrc); + removeResource(req); rsrc = null; } if (null == rsrc) { @@ -141,15 +155,102 @@ public synchronized void handle(ResourceEvent event) { } break; case LOCALIZATION_FAILED: - decrementFileCountForLocalCacheDirectory(req, null); /* * If resource localization fails then Localized resource will be * removed from local cache. */ - localrsrc.remove(req); + removeResource(req); + break; + case RECOVERED: + if (rsrc != null) { + LOG.warn("Ignoring attempt to recover existing resource " + rsrc); + return; + } + rsrc = recoverResource(req, (ResourceRecoveredEvent) event); + localrsrc.put(req, rsrc); break; } + rsrc.handle(event); + + if (event.getType() == ResourceEventType.LOCALIZED) { + if (rsrc.getLocalPath() != null) { + try { + stateStore.finishResourceLocalization(user, appId, + buildLocalizedResourceProto(rsrc)); + } catch (IOException ioe) { + LOG.error("Error storing resource state for " + rsrc, ioe); + } + } else { + LOG.warn("Resource " + rsrc + " localized without a location"); + } + } + } + + private LocalizedResource recoverResource(LocalResourceRequest req, + ResourceRecoveredEvent event) { + // unique number for a resource is the directory of the resource + Path localDir = event.getLocalPath().getParent(); + long rsrcId = Long.parseLong(localDir.getName()); + + // update ID generator to avoid conflicts with existing resources + while (true) { + long currentRsrcId = uniqueNumberGenerator.get(); + long nextRsrcId = Math.max(currentRsrcId, rsrcId); + if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) { + break; + } + } + + incrementFileCountForLocalCacheDirectory(localDir.getParent()); + + return new LocalizedResource(req, dispatcher); + } + + private LocalizedResourceProto buildLocalizedResourceProto( + LocalizedResource rsrc) { + return LocalizedResourceProto.newBuilder() + .setResource(buildLocalResourceProto(rsrc.getRequest())) + .setLocalPath(rsrc.getLocalPath().toString()) + .setSize(rsrc.getSize()) + .build(); + } + + private LocalResourceProto buildLocalResourceProto(LocalResource lr) { + LocalResourcePBImpl lrpb; + if (!(lr instanceof LocalResourcePBImpl)) { + lr = LocalResource.newInstance(lr.getResource(), lr.getType(), + lr.getVisibility(), lr.getSize(), lr.getTimestamp(), + lr.getPattern()); + } + lrpb = (LocalResourcePBImpl) lr; + return lrpb.getProto(); + } + + public void incrementFileCountForLocalCacheDirectory(Path cacheDir) { + if (useLocalCacheDirectoryManager) { + Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot( + cacheDir); + if (cacheRoot != null) { + LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot); + if (dir == null) { + dir = new LocalCacheDirectoryManager(conf); + LocalCacheDirectoryManager otherDir = + directoryManagers.putIfAbsent(cacheRoot, dir); + if (otherDir != null) { + dir = otherDir; + } + } + if (cacheDir.equals(cacheRoot)) { + dir.incrementFileCountForPath(""); + } else { + String dirStr = cacheDir.toUri().getRawPath(); + String rootStr = cacheRoot.toUri().getRawPath(); + dir.incrementFileCountForPath( + dirStr.substring(rootStr.length() + 1)); + } + } + } } /* @@ -216,11 +317,6 @@ public boolean isResourcePresent(LocalizedResource rsrc) { return ret; } - @Override - public boolean contains(LocalResourceRequest resource) { - return localrsrc.containsKey(resource); - } - @Override public boolean remove(LocalizedResource rem, DeletionService delService) { // current synchronization guaranteed by crude RLS event for cleanup @@ -237,16 +333,31 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { + " with non-zero refcount"); return false; } else { // ResourceState is LOCALIZED or INIT - localrsrc.remove(rem.getRequest()); if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } - decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc); + removeResource(rem.getRequest()); LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache"); return true; } } + private void removeResource(LocalResourceRequest req) { + LocalizedResource rsrc = localrsrc.remove(req); + decrementFileCountForLocalCacheDirectory(req, rsrc); + if (rsrc != null) { + Path localPath = rsrc.getLocalPath(); + if (localPath != null) { + try { + stateStore.removeLocalizedResource(user, appId, localPath); + } catch (IOException e) { + LOG.error("Unable to remove resource " + rsrc + " from state store", + e); + } + } + } + } + /** * Returns the path up to the random directory component. */ @@ -285,6 +396,7 @@ public Iterator iterator() { @Override public Path getPathForLocalization(LocalResourceRequest req, Path localDirPath) { + Path rPath = localDirPath; if (useLocalCacheDirectoryManager && localDirPath != null) { if (!directoryManagers.containsKey(localDirPath)) { @@ -293,7 +405,7 @@ public Iterator iterator() { } LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath); - Path rPath = localDirPath; + rPath = localDirPath; String hierarchicalPath = dir.getRelativePathForLocalization(); // For most of the scenarios we will get root path only which // is an empty string @@ -301,21 +413,36 @@ public Iterator iterator() { rPath = new Path(localDirPath, hierarchicalPath); } inProgressLocalResourcesMap.put(req, rPath); - return rPath; - } else { - return localDirPath; } + + rPath = new Path(rPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); + Path localPath = new Path(rPath, req.getPath().getName()); + LocalizedResource rsrc = localrsrc.get(req); + rsrc.setLocalPath(localPath); + LocalResource lr = LocalResource.newInstance(req.getResource(), + req.getType(), req.getVisibility(), req.getSize(), + req.getTimestamp()); + try { + stateStore.startResourceLocalization(user, appId, + ((LocalResourcePBImpl) lr).getProto(), localPath); + } catch (IOException e) { + LOG.error("Unable to record localization start for " + rsrc, e); + } + return rPath; } - @Override - public long nextUniqueNumber() { - return uniqueNumberGenerator.incrementAndGet(); - } - - @VisibleForTesting - @Private @Override public LocalizedResource getLocalizedResource(LocalResourceRequest request) { return localrsrc.get(request); } -} \ No newline at end of file + + @VisibleForTesting + LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) { + LocalCacheDirectoryManager mgr = null; + if (useLocalCacheDirectoryManager) { + mgr = directoryManagers.get(localDirPath); + } + return mgr; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index ed110b07e9..e2d0fe1cc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -54,8 +55,8 @@ public class LocalizedResource implements EventHandler { private static final Log LOG = LogFactory.getLog(LocalizedResource.class); - Path localPath; - long size = -1; + volatile Path localPath; + volatile long size = -1; final LocalResourceRequest rsrc; final Dispatcher dispatcher; final StateMachine @@ -76,6 +77,8 @@ public class LocalizedResource implements EventHandler { // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) + .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, + ResourceEventType.RECOVERED, new RecoveredTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, @@ -157,6 +160,10 @@ public Path getLocalPath() { return localPath; } + public void setLocalPath(Path localPath) { + this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath); + } + public long getTimestamp() { return timestamp.get(); } @@ -234,7 +241,8 @@ private static class FetchSuccessTransition extends ResourceTransition { @Override public void transition(LocalizedResource rsrc, ResourceEvent event) { ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event; - rsrc.localPath = locEvent.getLocation(); + rsrc.localPath = + Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation()); rsrc.size = locEvent.getSize(); for (ContainerId container : rsrc.ref) { rsrc.dispatcher.getEventHandler().handle( @@ -291,4 +299,13 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.release(relEvent.getContainer()); } } + + private static class RecoveredTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event; + rsrc.localPath = recoveredEvent.getLocalPath(); + rsrc.size = recoveredEvent.getSize(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index de9bbdc994..554b368dd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; @@ -109,10 +112,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -142,6 +150,7 @@ public class ResourceLocalizationService extends CompositeService private RecordFactory recordFactory; private final ScheduledExecutorService cacheCleanup; private LocalizerTokenSecretManager secretManager; + private NMStateStoreService stateStore; private LocalResourcesTracker publicRsrc; @@ -163,7 +172,7 @@ public class ResourceLocalizationService extends CompositeService public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService, - LocalDirsHandlerService dirsHandler) { + LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) { super(ResourceLocalizationService.class.getName()); this.exec = exec; @@ -175,6 +184,7 @@ public ResourceLocalizationService(Dispatcher dispatcher, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") .build()); + this.stateStore = stateStore; } FileContext getLocalFileContext(Configuration conf) { @@ -203,15 +213,17 @@ private void validateConf(Configuration conf) { @Override public void serviceInit(Configuration conf) throws Exception { this.validateConf(conf); - this.publicRsrc = - new LocalResourcesTrackerImpl(null, dispatcher, true, conf); + this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, + true, conf, stateStore); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { FileContext lfs = getLocalFileContext(conf); lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); - cleanUpLocalDir(lfs,delService); + if (!stateStore.canRecover()) { + cleanUpLocalDir(lfs,delService); + } List localDirs = dirsHandler.getLocalDirs(); for (String localDir : localDirs) { @@ -249,6 +261,74 @@ public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + //Recover localized resources after an NM restart + public void recoverLocalizedResources(RecoveredLocalizationState state) + throws URISyntaxException { + LocalResourceTrackerState trackerState = state.getPublicTrackerState(); + recoverTrackerResources(publicRsrc, trackerState); + + for (Map.Entry userEntry : + state.getUserResources().entrySet()) { + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); + if (!trackerState.isEmpty()) { + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + null, dispatcher, true, super.getConfig(), stateStore); + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + + for (Map.Entry appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = ConverterUtils.toString(appId); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } + } + } + + private void recoverTrackerResources(LocalResourcesTracker tracker, + LocalResourceTrackerState state) throws URISyntaxException { + for (LocalizedResourceProto proto : state.getLocalizedResources()) { + LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + LOG.info("Recovering localized resource " + req + " at " + + proto.getLocalPath()); + tracker.handle(new ResourceRecoveredEvent(req, + new Path(proto.getLocalPath()), proto.getSize())); + } + + for (Map.Entry entry : + state.getInProgressResources().entrySet()) { + LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + Path localPath = entry.getValue(); + tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); + + // delete any in-progress localizations, containers will request again + LOG.info("Deleting in-progress localization for " + req + " at " + + localPath); + tracker.remove(tracker.getLocalizedResource(req), delService); + } + + // TODO: remove untracked directories in local filesystem + } + @Override public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) { return localizerTracker.processHeartbeat(status); @@ -337,17 +417,10 @@ private void handleInitApplicationResources(Application app) { // 0) Create application tracking structs String userName = app.getUser(); privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, - dispatcher, true, super.getConfig())); - if (null != appRsrc.putIfAbsent( - ConverterUtils.toString(app.getAppId()), - new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super - .getConfig()))) { - LOG.warn("Initializing application " + app + " already present"); - assert false; // TODO: FIXME assert doesn't help - // ^ The condition is benign. Tests should fail and it - // should appear in logs, but it's an internal error - // that should have no effect on applications - } + null, dispatcher, true, super.getConfig(), stateStore)); + String appIdStr = ConverterUtils.toString(app.getAppId()); + appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(), + app.getAppId(), dispatcher, false, super.getConfig(), stateStore)); // 1) Signal container init // // This is handled by the ApplicationImpl state machine and allows @@ -446,18 +519,28 @@ private void handleCleanupContainerResources( @SuppressWarnings({"unchecked"}) private void handleDestroyApplicationResources(Application application) { - String userName; - String appIDStr; + String userName = application.getUser(); + ApplicationId appId = application.getAppId(); + String appIDStr = application.toString(); LocalResourcesTracker appLocalRsrcsTracker = - appRsrc.remove(ConverterUtils.toString(application.getAppId())); - if (null == appLocalRsrcsTracker) { + appRsrc.remove(ConverterUtils.toString(appId)); + if (appLocalRsrcsTracker != null) { + for (LocalizedResource rsrc : appLocalRsrcsTracker ) { + Path localPath = rsrc.getLocalPath(); + if (localPath != null) { + try { + stateStore.removeLocalizedResource(userName, appId, localPath); + } catch (IOException e) { + LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr + + " from state store", e); + } + } + } + } else { LOG.warn("Removing uninitialized application " + application); } - // TODO: What to do with appLocalRsrcsTracker? // Delete the application directories - userName = application.getUser(); - appIDStr = application.toString(); for (String localDir : dirsHandler.getLocalDirs()) { // Delete the user-owned app-dir @@ -668,19 +751,15 @@ public void addResource(LocalizerResourceRequestEvent request) { if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = + Path publicRootPath = dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); - Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); - if (!hierarchicalPath.equals(publicDirDestPath)) { - publicDirDestPath = hierarchicalPath; + Path publicDirDestPath = + publicRsrc.getPathForLocalization(key, publicRootPath); + if (!publicDirDestPath.getParent().equals(publicRootPath)) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - publicDirDestPath = - new Path(publicDirDestPath, Long.toString(publicRsrc - .nextUniqueNumber())); // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { @@ -968,9 +1047,8 @@ private Path getPathForLocalization(LocalResource rsrc) throws IOException, Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); - dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc), - dirPath); - return new Path (dirPath, Long.toString(tracker.nextUniqueNumber())); + return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), + dirPath); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java index e657c0acf3..f9f6d9805c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java @@ -31,5 +31,7 @@ public enum ResourceEventType { /** See {@link ResourceReleaseEvent} */ RELEASE, /** See {@link ResourceFailedLocalizationEvent} */ - LOCALIZATION_FAILED + LOCALIZATION_FAILED, + /** See {@link ResourceRecoveredEvent} */ + RECOVERED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java new file mode 100644 index 0000000000..c453e930db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java @@ -0,0 +1,43 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +public class ResourceRecoveredEvent extends ResourceEvent { + + private final Path localPath; + private final long size; + + public ResourceRecoveredEvent(LocalResourceRequest rsrc, Path localPath, + long size) { + super(rsrc, ResourceEventType.RECOVERED); + this.localPath = localPath; + this.size = size; + } + + public Path getLocalPath() { + return localPath; + } + + public long getSize() { + return size; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java new file mode 100644 index 0000000000..d124757ca6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import static org.fusesource.leveldbjni.JniDBFactory.asString; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Logger; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +public class NMLeveldbStateStoreService extends NMStateStoreService { + + public static final Log LOG = + LogFactory.getLog(NMLeveldbStateStoreService.class); + + private static final String DB_NAME = "yarn-nm-state"; + private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; + private static final String DB_SCHEMA_VERSION = "1.0"; + + private static final String LOCALIZATION_KEY_PREFIX = "Localization/"; + private static final String LOCALIZATION_PUBLIC_KEY_PREFIX = + LOCALIZATION_KEY_PREFIX + "public/"; + private static final String LOCALIZATION_PRIVATE_KEY_PREFIX = + LOCALIZATION_KEY_PREFIX + "private/"; + private static final String LOCALIZATION_STARTED_SUFFIX = "started/"; + private static final String LOCALIZATION_COMPLETED_SUFFIX = "completed/"; + private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/"; + private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/"; + + private DB db; + + public NMLeveldbStateStoreService() { + super(NMLeveldbStateStoreService.class.getName()); + } + + @Override + protected void startStorage() throws IOException { + } + + @Override + protected void closeStorage() throws IOException { + if (db != null) { + db.close(); + } + } + + + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + + try { + LeveldbIterator iter = new LeveldbIterator(db); + iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); + state.publicTrackerState = loadResourceTrackerState(iter, + LOCALIZATION_PUBLIC_KEY_PREFIX); + + iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { + break; + } + + int userEndPos = key.indexOf('/', + LOCALIZATION_PRIVATE_KEY_PREFIX.length()); + if (userEndPos < 0) { + throw new IOException("Unable to determine user in resource key: " + + key); + } + String user = key.substring( + LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); + state.userResources.put(user, loadUserLocalizedResources(iter, + key.substring(0, userEndPos+1))); + } + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + + return state; + } + + private LocalResourceTrackerState loadResourceTrackerState( + LeveldbIterator iter, String keyPrefix) throws IOException { + final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX; + final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; + LocalResourceTrackerState state = new LocalResourceTrackerState(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + + if (key.startsWith(completedPrefix)) { + state.localizedResources = loadCompletedResources(iter, + completedPrefix); + } else if (key.startsWith(startedPrefix)) { + state.inProgressResources = loadStartedResources(iter, startedPrefix); + } else { + throw new IOException("Unexpected key in resource tracker state: " + + key); + } + } + + return state; + } + + private List loadCompletedResources( + LeveldbIterator iter, String keyPrefix) throws IOException { + List rsrcs = + new ArrayList(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Loading completed resource from " + key); + } + rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue())); + iter.next(); + } + + return rsrcs; + } + + private Map loadStartedResources( + LeveldbIterator iter, String keyPrefix) throws IOException { + Map rsrcs = + new HashMap(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + + Path localPath = new Path(key.substring(keyPrefix.length())); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading in-progress resource at " + localPath); + } + rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath); + iter.next(); + } + + return rsrcs; + } + + private RecoveredUserResources loadUserLocalizedResources( + LeveldbIterator iter, String keyPrefix) throws IOException { + RecoveredUserResources userResources = new RecoveredUserResources(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + + if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) { + userResources.privateTrackerState = loadResourceTrackerState(iter, + keyPrefix + LOCALIZATION_FILECACHE_SUFFIX); + } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX, + keyPrefix.length())) { + int appIdStartPos = keyPrefix.length() + + LOCALIZATION_APPCACHE_SUFFIX.length(); + int appIdEndPos = key.indexOf('/', appIdStartPos); + if (appIdEndPos < 0) { + throw new IOException("Unable to determine appID in resource key: " + + key); + } + ApplicationId appId = ConverterUtils.toApplicationId( + key.substring(appIdStartPos, appIdEndPos)); + userResources.appTrackerStates.put(appId, + loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1))); + } else { + throw new IOException("Unexpected user resource key " + key); + } + } + return userResources; + } + + @Override + public void startResourceLocalization(String user, ApplicationId appId, + LocalResourceProto proto, Path localPath) throws IOException { + String key = getResourceStartedKey(user, appId, localPath.toString()); + try { + db.put(bytes(key), proto.toByteArray()); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void finishResourceLocalization(String user, ApplicationId appId, + LocalizedResourceProto proto) throws IOException { + String localPath = proto.getLocalPath(); + String startedKey = getResourceStartedKey(user, appId, localPath); + String completedKey = getResourceCompletedKey(user, appId, localPath); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing localized resource to " + completedKey); + } + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(startedKey)); + batch.put(bytes(completedKey), proto.toByteArray()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void removeLocalizedResource(String user, ApplicationId appId, + Path localPath) throws IOException { + String localPathStr = localPath.toString(); + String startedKey = getResourceStartedKey(user, appId, localPathStr); + String completedKey = getResourceCompletedKey(user, appId, localPathStr); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing local resource at " + localPathStr); + } + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(startedKey)); + batch.delete(bytes(completedKey)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + private String getResourceStartedKey(String user, ApplicationId appId, + String localPath) { + return getResourceTrackerKeyPrefix(user, appId) + + LOCALIZATION_STARTED_SUFFIX + localPath; + } + + private String getResourceCompletedKey(String user, ApplicationId appId, + String localPath) { + return getResourceTrackerKeyPrefix(user, appId) + + LOCALIZATION_COMPLETED_SUFFIX + localPath; + } + + private String getResourceTrackerKeyPrefix(String user, + ApplicationId appId) { + if (user == null) { + return LOCALIZATION_PUBLIC_KEY_PREFIX; + } + if (appId == null) { + return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/" + + LOCALIZATION_FILECACHE_SUFFIX; + } + return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/" + + LOCALIZATION_APPCACHE_SUFFIX + appId + "/"; + } + + + @Override + protected void initStorage(Configuration conf) + throws IOException { + Path storeRoot = createStorageDir(conf); + Options options = new Options(); + options.createIfMissing(false); + options.logger(new LeveldbLogger()); + LOG.info("Using state database at " + storeRoot + " for recovery"); + File dbfile = new File(storeRoot.toString()); + byte[] schemaVersionData = null; + try { + db = JniDBFactory.factory.open(dbfile, options); + try { + schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY)); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating state database at " + dbfile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(dbfile, options); + schemaVersionData = bytes(DB_SCHEMA_VERSION); + db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + if (schemaVersionData != null) { + String schemaVersion = asString(schemaVersionData); + // only support exact schema matches for now + if (!DB_SCHEMA_VERSION.equals(schemaVersion)) { + throw new IOException("Incompatible state database schema, found " + + schemaVersion + " expected " + DB_SCHEMA_VERSION); + } + } else { + throw new IOException("State database schema version not found"); + } + } + + private Path createStorageDir(Configuration conf) throws IOException { + final String storeUri = conf.get(YarnConfiguration.NM_RECOVERY_DIR); + if (storeUri == null) { + throw new IOException("No store location directory configured in " + + YarnConfiguration.NM_RECOVERY_DIR); + } + + Path root = new Path(storeUri, DB_NAME); + FileSystem fs = FileSystem.getLocal(conf); + fs.mkdirs(root, new FsPermission((short)0700)); + return root; + } + + + private static class LeveldbLogger implements Logger { + private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); + + @Override + public void log(String message) { + LOG.info(message); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java new file mode 100644 index 0000000000..d41ddde629 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; + +// The state store to use when state isn't being stored +public class NMNullStateStoreService extends NMStateStoreService { + + public NMNullStateStoreService() { + super(NMNullStateStoreService.class.getName()); + } + + @Override + public boolean canRecover() { + return false; + } + + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void startResourceLocalization(String user, ApplicationId appId, + LocalResourceProto proto, Path localPath) throws IOException { + } + + @Override + public void finishResourceLocalization(String user, ApplicationId appId, + LocalizedResourceProto proto) throws IOException { + } + + @Override + public void removeLocalizedResource(String user, ApplicationId appId, + Path localPath) throws IOException { + } + + @Override + protected void initStorage(Configuration conf) throws IOException { + } + + @Override + protected void startStorage() throws IOException { + } + + @Override + protected void closeStorage() throws IOException { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java new file mode 100644 index 0000000000..295fdb97b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -0,0 +1,163 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; + +@Private +@Unstable +public abstract class NMStateStoreService extends AbstractService { + + public NMStateStoreService(String name) { + super(name); + } + + public static class LocalResourceTrackerState { + List localizedResources = + new ArrayList(); + Map inProgressResources = + new HashMap(); + + public List getLocalizedResources() { + return localizedResources; + } + + public Map getInProgressResources() { + return inProgressResources; + } + + public boolean isEmpty() { + return localizedResources.isEmpty() && inProgressResources.isEmpty(); + } + } + + public static class RecoveredUserResources { + LocalResourceTrackerState privateTrackerState = + new LocalResourceTrackerState(); + Map appTrackerStates = + new HashMap(); + + public LocalResourceTrackerState getPrivateTrackerState() { + return privateTrackerState; + } + + public Map + getAppTrackerStates() { + return appTrackerStates; + } + } + + public static class RecoveredLocalizationState { + LocalResourceTrackerState publicTrackerState = + new LocalResourceTrackerState(); + Map userResources = + new HashMap(); + + public LocalResourceTrackerState getPublicTrackerState() { + return publicTrackerState; + } + + public Map getUserResources() { + return userResources; + } + } + + /** Initialize the state storage */ + @Override + public void serviceInit(Configuration conf) throws IOException { + initStorage(conf); + } + + /** Start the state storage for use */ + @Override + public void serviceStart() throws IOException { + startStorage(); + } + + /** Shutdown the state storage. */ + @Override + public void serviceStop() throws IOException { + closeStorage(); + } + + public boolean canRecover() { + return true; + } + + + /** + * Load the state of localized resources + * @return recovered localized resource state + * @throws IOException + */ + public abstract RecoveredLocalizationState loadLocalizationState() + throws IOException; + + /** + * Record the start of localization for a resource + * @param user the username or null if the resource is public + * @param appId the application ID if the resource is app-specific or null + * @param proto the resource request + * @param localPath local filesystem path where the resource will be stored + * @throws IOException + */ + public abstract void startResourceLocalization(String user, + ApplicationId appId, LocalResourceProto proto, Path localPath) + throws IOException; + + /** + * Record the completion of a resource localization + * @param user the username or null if the resource is public + * @param appId the application ID if the resource is app-specific or null + * @param proto the serialized localized resource + * @throws IOException + */ + public abstract void finishResourceLocalization(String user, + ApplicationId appId, LocalizedResourceProto proto) throws IOException; + + /** + * Remove records related to a resource localization + * @param user the username or null if the resource is public + * @param appId the application ID if the resource is app-specific or null + * @param localPath local filesystem path where the resource will be stored + * @throws IOException + */ + public abstract void removeLocalizedResource(String user, + ApplicationId appId, Path localPath) throws IOException; + + + protected abstract void initStorage(Configuration conf) throws IOException; + + protected abstract void startStorage() throws IOException; + + protected abstract void closeStorage() throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto new file mode 100644 index 0000000000..bd1f74a965 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "YarnServerNodemanagerRecoveryProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_protos.proto"; + +message LocalizedResourceProto { + optional LocalResourceProto resource = 1; + optional string localPath = 2; + optional int64 size = 3; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index e6f0db2112..5753fb8f20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; public class DummyContainerManager extends ContainerManagerImpl { @@ -75,7 +76,7 @@ public DummyContainerManager(Context context, ContainerExecutor exec, protected ResourceLocalizationService createResourceLocalizationService( ContainerExecutor exec, DeletionService deletionContext) { return new ResourceLocalizationService(super.dispatcher, exec, - deletionContext, super.dirsHandler) { + deletionContext, super.dirsHandler, new NMNullStateStoreService()) { @Override public void handle(LocalizationEvent event) { switch (event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 9cd8f956ed..fabb03bf3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -79,7 +80,8 @@ public void testSuccessfulContainerLaunch() throws InterruptedException, YarnConfiguration conf = new YarnConfiguration(); Context context = new NMContext(new NMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInNM(), null, null) { + new NMTokenSecretManagerInNM(), null, null, + new NMNullStateStoreService()) { @Override public int getHttpPort() { return 1234; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index 2313baef76..c44f7b818a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -108,6 +108,36 @@ public void tearDown() throws IOException, InterruptedException { localFS.delete(new Path(basedir.getPath()), true); } + @Test + public void testStateStoreRemovalOnDecommission() throws IOException { + final File recoveryDir = new File(basedir, "nm-recovery"); + nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir.getAbsolutePath()); + + // verify state store is not removed on normal shutdown + nm.init(conf); + nm.start(); + Assert.assertTrue(recoveryDir.exists()); + Assert.assertTrue(recoveryDir.isDirectory()); + nm.stop(); + nm = null; + Assert.assertTrue(recoveryDir.exists()); + Assert.assertTrue(recoveryDir.isDirectory()); + + // verify state store is removed on decommissioned shutdown + nm = new TestNodeManager(); + nm.init(conf); + nm.start(); + Assert.assertTrue(recoveryDir.exists()); + Assert.assertTrue(recoveryDir.isDirectory()); + nm.getNMContext().setDecommissioned(true); + nm.stop(); + nm = null; + Assert.assertFalse(recoveryDir.exists()); + } + @Test public void testKillContainersOnShutdown() throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 3729ab1eb5..772bc05cb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -91,6 +91,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater { @@ -1159,7 +1161,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInNM nmTokenSecretManager) { + NMTokenSecretManagerInNM nmTokenSecretManager, + NMStateStoreService store) { return new MyNMContext(containerTokenSecretManager, nmTokenSecretManager); } @@ -1268,7 +1271,8 @@ private class MyNMContext extends NMContext { public MyNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager) { - super(containerTokenSecretManager, nmTokenSecretManager, null, null); + super(containerTokenSecretManager, nmTokenSecretManager, null, null, + new NMNullStateStoreService()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 8e4b0f33ac..fc92b36fd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -103,7 +104,8 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { protected static final int HTTP_PORT = 5412; protected Configuration conf = new YarnConfiguration(); protected Context context = new NMContext(new NMContainerTokenSecretManager( - conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) { + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), new NMNullStateStoreService()) { public int getHttpPort() { return HTTP_PORT; }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java index ffde6aeca2..503ce8c01c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory; import org.junit.Test; public class TestLocalCacheDirectoryManager { @@ -73,7 +74,7 @@ public void testMinimumPerDirectoryFileLimit() { conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1"); Exception e = null; ResourceLocalizationService service = - new ResourceLocalizationService(null, null, null, null); + new ResourceLocalizationService(null, null, null, null, null); try { service.init(conf); } catch (Exception e1) { @@ -109,4 +110,49 @@ public void testDirectoryStateChangeFromFullToNonFull() { // first sub directory Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization()); } + + @Test + public void testDirectoryConversion() { + for (int i = 0; i < 10000; ++i) { + String path = Directory.getRelativePath(i); + Assert.assertEquals("Incorrect conversion for " + i, i, + Directory.getDirectoryNumber(path)); + } + } + + @Test + public void testIncrementFileCountForPath() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, + LocalCacheDirectoryManager.DIRECTORIES_PER_LEVEL + 2); + LocalCacheDirectoryManager mgr = new LocalCacheDirectoryManager(conf); + final String rootPath = ""; + mgr.incrementFileCountForPath(rootPath); + Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization()); + Assert.assertFalse("root dir should be full", + rootPath.equals(mgr.getRelativePathForLocalization())); + // finish filling the other directory + mgr.getRelativePathForLocalization(); + // free up space in the root dir + mgr.decrementFileCountForPath(rootPath); + mgr.decrementFileCountForPath(rootPath); + Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization()); + Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization()); + String otherDir = mgr.getRelativePathForLocalization(); + Assert.assertFalse("root dir should be full", otherDir.equals(rootPath)); + + final String deepDir0 = "d/e/e/p/0"; + final String deepDir1 = "d/e/e/p/1"; + final String deepDir2 = "d/e/e/p/2"; + final String deepDir3 = "d/e/e/p/3"; + mgr.incrementFileCountForPath(deepDir0); + Assert.assertEquals(otherDir, mgr.getRelativePathForLocalization()); + Assert.assertEquals(deepDir0, mgr.getRelativePathForLocalization()); + Assert.assertEquals("total dir count incorrect after increment", + deepDir1, mgr.getRelativePathForLocalization()); + mgr.incrementFileCountForPath(deepDir2); + mgr.incrementFileCountForPath(deepDir1); + mgr.incrementFileCountForPath(deepDir2); + Assert.assertEquals(deepDir3, mgr.getRelativePathForLocalization()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index ef715a256b..23a57d60fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -34,13 +35,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; @@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class TestLocalResourcesTrackerImpl { @@ -92,8 +101,8 @@ public void test() { localrsrc.put(req1, lr1); localrsrc.put(req2, lr2); LocalResourcesTracker tracker = - new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false, - conf); + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + false, conf, new NMNullStateStoreService()); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -176,7 +185,8 @@ public void testConsistency() { ConcurrentMap localrsrc = new ConcurrentHashMap(); localrsrc.put(req1, lr1); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - dispatcher, localrsrc, false, conf); + null, dispatcher, localrsrc, false, conf, + new NMNullStateStoreService()); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -246,7 +256,8 @@ public void testLocalResourceCache() { ConcurrentMap localrsrc = new ConcurrentHashMap(); LocalResourcesTracker tracker = - new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf); + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + true, conf, new NMNullStateStoreService()); LocalResourceRequest lr = createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC); @@ -264,6 +275,7 @@ public void testLocalResourceCache() { // Container-1 requesting local resource. tracker.handle(reqEvent1); + dispatcher.await(); // New localized Resource should have been added to local resource map // and the requesting container will be added to its waiting queue. @@ -280,6 +292,7 @@ public void testLocalResourceCache() { ResourceEvent reqEvent2 = new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2); tracker.handle(reqEvent2); + dispatcher.await(); // Container 2 should have been added to the waiting queue of the local // resource @@ -295,6 +308,7 @@ public void testLocalResourceCache() { LocalizedResource localizedResource = localrsrc.get(lr); tracker.handle(resourceFailedEvent); + dispatcher.await(); // After receiving failed resource event; all waiting containers will be // notified with Container Resource Failed Event. @@ -308,6 +322,7 @@ public void testLocalResourceCache() { // exception. ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1); tracker.handle(relEvent1); + dispatcher.await(); // Container-3 now requests for the same resource. This request call // is coming prior to Container-2's release call. @@ -316,6 +331,7 @@ public void testLocalResourceCache() { ResourceEvent reqEvent3 = new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3); tracker.handle(reqEvent3); + dispatcher.await(); // Local resource cache now should have the requested resource and the // number of waiting containers should be 1. @@ -327,6 +343,7 @@ public void testLocalResourceCache() { // Container-2 Releases the resource ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2); tracker.handle(relEvent2); + dispatcher.await(); // Making sure that there is no change in the cache after the release. Assert.assertEquals(1, localrsrc.size()); @@ -340,6 +357,7 @@ public void testLocalResourceCache() { ResourceLocalizedEvent localizedEvent = new ResourceLocalizedEvent(lr, localizedPath, 123L); tracker.handle(localizedEvent); + dispatcher.await(); // Verifying ContainerResourceLocalizedEvent . verify(containerEventHandler, times(1)).handle( @@ -351,6 +369,7 @@ public void testLocalResourceCache() { // Container-3 releasing the resource. ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3); tracker.handle(relEvent3); + dispatcher.await(); Assert.assertEquals(0, localrsrc.get(lr).getRefCount()); @@ -384,7 +403,8 @@ public void testHierarchicalLocalCacheDirectories() { ConcurrentMap localrsrc = new ConcurrentHashMap(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - dispatcher, localrsrc, true, conf); + null, dispatcher, localrsrc, true, conf, + new NMNullStateStoreService()); // This is a random path. NO File creation will take place at this place. Path localDir = new Path("/tmp"); @@ -401,7 +421,9 @@ public void testHierarchicalLocalCacheDirectories() { tracker.handle(reqEvent1); // Simulate the process of localization of lr1 - Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + // NOTE: Localization path from tracker has resource ID at end + Path hierarchicalPath1 = + tracker.getPathForLocalization(lr1, localDir).getParent(); // Simulate lr1 getting localized ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, @@ -417,7 +439,8 @@ public void testHierarchicalLocalCacheDirectories() { new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1); tracker.handle(reqEvent2); - Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); + Path hierarchicalPath2 = + tracker.getPathForLocalization(lr2, localDir).getParent(); // localization failed. ResourceFailedLocalizationEvent rfe2 = new ResourceFailedLocalizationEvent( @@ -435,7 +458,8 @@ public void testHierarchicalLocalCacheDirectories() { ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3, LocalResourceVisibility.PUBLIC, lc1); tracker.handle(reqEvent3); - Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir); + Path hierarchicalPath3 = + tracker.getPathForLocalization(lr3, localDir).getParent(); // localization successful ResourceLocalizedEvent rle3 = new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri() @@ -479,6 +503,284 @@ public void testHierarchicalLocalCacheDirectories() { } } + @Test + @SuppressWarnings("unchecked") + public void testStateStoreSuccessfulLocalization() throws Exception { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + // This is a random path. NO File creation will take place at this place. + final Path localDir = new Path("/tmp"); + Configuration conf = new YarnConfiguration(); + DrainDispatcher dispatcher = null; + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + DeletionService mockDelService = mock(DeletionService.class); + NMStateStoreService stateStore = mock(NMStateStoreService.class); + + try { + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, conf, stateStore); + // Container 1 needs lr1 resource + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.APPLICATION); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + + // Container 1 requests lr1 to be localized + ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, + LocalResourceVisibility.APPLICATION, lc1); + tracker.handle(reqEvent1); + dispatcher.await(); + + // Simulate the process of localization of lr1 + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + + ArgumentCaptor localResourceCaptor = + ArgumentCaptor.forClass(LocalResourceProto.class); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + verify(stateStore).startResourceLocalization(eq(user), eq(appId), + localResourceCaptor.capture(), pathCaptor.capture()); + LocalResourceProto lrProto = localResourceCaptor.getValue(); + Path localizedPath1 = pathCaptor.getValue(); + Assert.assertEquals(lr1, + new LocalResourceRequest(new LocalResourcePBImpl(lrProto))); + Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); + + // Simulate lr1 getting localized + ResourceLocalizedEvent rle1 = + new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120); + tracker.handle(rle1); + dispatcher.await(); + + ArgumentCaptor localizedProtoCaptor = + ArgumentCaptor.forClass(LocalizedResourceProto.class); + verify(stateStore).finishResourceLocalization(eq(user), eq(appId), + localizedProtoCaptor.capture()); + LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue(); + Assert.assertEquals(lr1, new LocalResourceRequest( + new LocalResourcePBImpl(localizedProto.getResource()))); + Assert.assertEquals(localizedPath1.toString(), + localizedProto.getLocalPath()); + LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1); + Assert.assertNotNull(localizedRsrc1); + + // simulate release and retention processing + tracker.handle(new ResourceReleaseEvent(lr1, cId1)); + dispatcher.await(); + boolean removeResult = tracker.remove(localizedRsrc1, mockDelService); + + Assert.assertTrue(removeResult); + verify(stateStore).removeLocalizedResource(eq(user), eq(appId), + eq(localizedPath1)); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + + @Test + @SuppressWarnings("unchecked") + public void testStateStoreFailedLocalization() throws Exception { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + // This is a random path. NO File creation will take place at this place. + final Path localDir = new Path("/tmp"); + Configuration conf = new YarnConfiguration(); + DrainDispatcher dispatcher = null; + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + NMStateStoreService stateStore = mock(NMStateStoreService.class); + + try { + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, conf, stateStore); + // Container 1 needs lr1 resource + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.APPLICATION); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + + // Container 1 requests lr1 to be localized + ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, + LocalResourceVisibility.APPLICATION, lc1); + tracker.handle(reqEvent1); + dispatcher.await(); + + // Simulate the process of localization of lr1 + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + + ArgumentCaptor localResourceCaptor = + ArgumentCaptor.forClass(LocalResourceProto.class); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + verify(stateStore).startResourceLocalization(eq(user), eq(appId), + localResourceCaptor.capture(), pathCaptor.capture()); + LocalResourceProto lrProto = localResourceCaptor.getValue(); + Path localizedPath1 = pathCaptor.getValue(); + Assert.assertEquals(lr1, + new LocalResourceRequest(new LocalResourcePBImpl(lrProto))); + Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); + + ResourceFailedLocalizationEvent rfe1 = + new ResourceFailedLocalizationEvent( + lr1, new Exception("Test").toString()); + tracker.handle(rfe1); + dispatcher.await(); + verify(stateStore).removeLocalizedResource(eq(user), eq(appId), + eq(localizedPath1)); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + + @Test + @SuppressWarnings("unchecked") + public void testRecoveredResource() throws Exception { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + // This is a random path. NO File creation will take place at this place. + final Path localDir = new Path("/tmp/localdir"); + Configuration conf = new YarnConfiguration(); + DrainDispatcher dispatcher = null; + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + NMStateStoreService stateStore = mock(NMStateStoreService.class); + + try { + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, conf, stateStore); + // Container 1 needs lr1 resource + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.APPLICATION); + Assert.assertNull(tracker.getLocalizedResource(lr1)); + final long localizedId1 = 52; + Path hierarchicalPath1 = new Path(localDir, + Long.toString(localizedId1)); + Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar"); + tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120)); + dispatcher.await(); + Assert.assertNotNull(tracker.getLocalizedResource(lr1)); + + // verify new paths reflect recovery of previous resources + LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2, + LocalResourceVisibility.APPLICATION); + LocalizerContext lc2 = new LocalizerContext(user, cId1, null); + ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2, + LocalResourceVisibility.APPLICATION, lc2); + tracker.handle(reqEvent2); + dispatcher.await(); + Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); + long localizedId2 = Long.parseLong(hierarchicalPath2.getName()); + Assert.assertEquals(localizedId1 + 1, localizedId2); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + + @Test + @SuppressWarnings("unchecked") + public void testRecoveredResourceWithDirCacheMgr() throws Exception { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + // This is a random path. NO File creation will take place at this place. + final Path localDirRoot = new Path("/tmp/localdir"); + Configuration conf = new YarnConfiguration(); + DrainDispatcher dispatcher = null; + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + NMStateStoreService stateStore = mock(NMStateStoreService.class); + + try { + LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, true, conf, stateStore); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + Assert.assertNull(tracker.getLocalizedResource(lr1)); + final long localizedId1 = 52; + Path hierarchicalPath1 = new Path(localDirRoot + "/4/2", + Long.toString(localizedId1)); + Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar"); + tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120)); + dispatcher.await(); + Assert.assertNotNull(tracker.getLocalizedResource(lr1)); + LocalCacheDirectoryManager dirMgrRoot = + tracker.getDirectoryManager(localDirRoot); + Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); + Assert.assertEquals(1, dirMgrRoot.getDirectory("4/2").getCount()); + + LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2, + LocalResourceVisibility.PUBLIC); + Assert.assertNull(tracker.getLocalizedResource(lr2)); + final long localizedId2 = localizedId1 + 1; + Path hierarchicalPath2 = new Path(localDirRoot + "/4/2", + Long.toString(localizedId2)); + Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar"); + tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120)); + dispatcher.await(); + Assert.assertNotNull(tracker.getLocalizedResource(lr2)); + Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); + Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount()); + + LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3, + LocalResourceVisibility.PUBLIC); + Assert.assertNull(tracker.getLocalizedResource(lr3)); + final long localizedId3 = 128; + Path hierarchicalPath3 = new Path(localDirRoot + "/4/3", + Long.toString(localizedId3)); + Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar"); + tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120)); + dispatcher.await(); + Assert.assertNotNull(tracker.getLocalizedResource(lr3)); + Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); + Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount()); + Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount()); + + LocalResourceRequest lr4 = createLocalResourceRequest(user, 4, 4, + LocalResourceVisibility.PUBLIC); + Assert.assertNull(tracker.getLocalizedResource(lr4)); + final long localizedId4 = 256; + Path hierarchicalPath4 = new Path(localDirRoot + "/4", + Long.toString(localizedId4)); + Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar"); + tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120)); + dispatcher.await(); + Assert.assertNotNull(tracker.getLocalizedResource(lr4)); + Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); + Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount()); + Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount()); + Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount()); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index a19eef240e..ed59ddd841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; @@ -188,7 +194,8 @@ public void testLocalizationInit() throws Exception { ResourceLocalizationService locService = spy(new ResourceLocalizationService(dispatcher, exec, delService, - diskhandler)); + diskhandler, + new NMNullStateStoreService())); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); try { @@ -253,7 +260,8 @@ public void testResourceRelease() throws Exception { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler); + dirsHandler, + new NMNullStateStoreService()); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( @@ -287,7 +295,7 @@ public void testResourceRelease() throws Exception { user, appId); // init container. - final Container c = getMockContainer(appId, 42); + final Container c = getMockContainer(appId, 42, user); // init resources Random r = new Random(); @@ -402,6 +410,233 @@ public void testResourceRelease() throws Exception { } } + @Test + @SuppressWarnings("unchecked") // mocked generics + public void testRecovery() throws Exception { + final String user1 = "user1"; + final String user2 = "user2"; + final ApplicationId appId1 = ApplicationId.newInstance(1, 1); + final ApplicationId appId2 = ApplicationId.newInstance(1, 2); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + //Ignore actual localization + EventHandler localizerBus = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerBus); + + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + ResourceLocalizationService spyService = + createSpyService(dispatcher, dirsHandler, stateStore); + try { + spyService.init(conf); + spyService.start(); + + final Application app1 = mock(Application.class); + when(app1.getUser()).thenReturn(user1); + when(app1.getAppId()).thenReturn(appId1); + final Application app2 = mock(Application.class); + when(app2.getUser()).thenReturn(user2); + when(app2.getAppId()).thenReturn(appId2); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app1)); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app2)); + dispatcher.await(); + + //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES + LocalResourcesTracker appTracker1 = + spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user1, appId1); + LocalResourcesTracker privTracker1 = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user1, null); + LocalResourcesTracker appTracker2 = + spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user2, appId2); + LocalResourcesTracker pubTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + null, null); + + // init containers + final Container c1 = getMockContainer(appId1, 1, user1); + final Container c2 = getMockContainer(appId2, 2, user2); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // Send localization requests of each type. + final LocalResource privResource1 = getPrivateMockedResource(r); + final LocalResourceRequest privReq1 = + new LocalResourceRequest(privResource1); + final LocalResource privResource2 = getPrivateMockedResource(r); + final LocalResourceRequest privReq2 = + new LocalResourceRequest(privResource2); + + final LocalResource pubResource1 = getPublicMockedResource(r); + final LocalResourceRequest pubReq1 = + new LocalResourceRequest(pubResource1); + final LocalResource pubResource2 = getPublicMockedResource(r); + final LocalResourceRequest pubReq2 = + new LocalResourceRequest(pubResource2); + + final LocalResource appResource1 = getAppMockedResource(r); + final LocalResourceRequest appReq1 = + new LocalResourceRequest(appResource1); + final LocalResource appResource2 = getAppMockedResource(r); + final LocalResourceRequest appReq2 = + new LocalResourceRequest(appResource2); + final LocalResource appResource3 = getAppMockedResource(r); + final LocalResourceRequest appReq3 = + new LocalResourceRequest(appResource3); + + Map> req1 = + new HashMap>(); + req1.put(LocalResourceVisibility.PRIVATE, + Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 })); + req1.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq1)); + req1.put(LocalResourceVisibility.APPLICATION, + Collections.singletonList(appReq1)); + + Map> req2 = + new HashMap>(); + req2.put(LocalResourceVisibility.APPLICATION, + Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 })); + req2.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq2)); + + // Send Request event + spyService.handle(new ContainerLocalizationRequestEvent(c1, req1)); + spyService.handle(new ContainerLocalizationRequestEvent(c2, req2)); + dispatcher.await(); + + // Simulate start of localization for all resources + privTracker1.getPathForLocalization(privReq1, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.USERCACHE + user1)); + privTracker1.getPathForLocalization(privReq2, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.USERCACHE + user1)); + LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1); + LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2); + appTracker1.getPathForLocalization(appReq1, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.APPCACHE + appId1)); + LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1); + appTracker2.getPathForLocalization(appReq2, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.APPCACHE + appId2)); + LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2); + appTracker2.getPathForLocalization(appReq3, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.APPCACHE + appId2)); + LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3); + pubTracker.getPathForLocalization(pubReq1, + dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); + LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1); + pubTracker.getPathForLocalization(pubReq2, + dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); + LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2); + + // Simulate completion of localization for most resources with + // possibly different sizes than in the request + assertNotNull("Localization not started", privLr1.getLocalPath()); + privTracker1.handle(new ResourceLocalizedEvent(privReq1, + privLr1.getLocalPath(), privLr1.getSize() + 5)); + assertNotNull("Localization not started", privLr2.getLocalPath()); + privTracker1.handle(new ResourceLocalizedEvent(privReq2, + privLr2.getLocalPath(), privLr2.getSize() + 10)); + assertNotNull("Localization not started", appLr1.getLocalPath()); + appTracker1.handle(new ResourceLocalizedEvent(appReq1, + appLr1.getLocalPath(), appLr1.getSize())); + assertNotNull("Localization not started", appLr3.getLocalPath()); + appTracker2.handle(new ResourceLocalizedEvent(appReq3, + appLr3.getLocalPath(), appLr3.getSize() + 7)); + assertNotNull("Localization not started", pubLr1.getLocalPath()); + pubTracker.handle(new ResourceLocalizedEvent(pubReq1, + pubLr1.getLocalPath(), pubLr1.getSize() + 1000)); + assertNotNull("Localization not started", pubLr2.getLocalPath()); + pubTracker.handle(new ResourceLocalizedEvent(pubReq2, + pubLr2.getLocalPath(), pubLr2.getSize() + 99999)); + + dispatcher.await(); + assertEquals(ResourceState.LOCALIZED, privLr1.getState()); + assertEquals(ResourceState.LOCALIZED, privLr2.getState()); + assertEquals(ResourceState.LOCALIZED, appLr1.getState()); + assertEquals(ResourceState.DOWNLOADING, appLr2.getState()); + assertEquals(ResourceState.LOCALIZED, appLr3.getState()); + assertEquals(ResourceState.LOCALIZED, pubLr1.getState()); + assertEquals(ResourceState.LOCALIZED, pubLr2.getState()); + + // restart and recover + spyService = createSpyService(dispatcher, dirsHandler, stateStore); + spyService.init(conf); + spyService.recoverLocalizedResources( + stateStore.loadLocalizationState()); + dispatcher.await(); + + appTracker1 = spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user1, appId1); + privTracker1 = spyService.getLocalResourcesTracker( + LocalResourceVisibility.PRIVATE, user1, null); + appTracker2 = spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user2, appId2); + pubTracker = spyService.getLocalResourcesTracker( + LocalResourceVisibility.PUBLIC, null, null); + + LocalizedResource recoveredRsrc = + privTracker1.getLocalizedResource(privReq1); + assertEquals(privReq1, recoveredRsrc.getRequest()); + assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath()); + assertEquals(privLr1.getSize(), recoveredRsrc.getSize()); + assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState()); + recoveredRsrc = privTracker1.getLocalizedResource(privReq2); + assertEquals(privReq2, recoveredRsrc.getRequest()); + assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath()); + assertEquals(privLr2.getSize(), recoveredRsrc.getSize()); + assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState()); + recoveredRsrc = appTracker1.getLocalizedResource(appReq1); + assertEquals(appReq1, recoveredRsrc.getRequest()); + assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath()); + assertEquals(appLr1.getSize(), recoveredRsrc.getSize()); + assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState()); + recoveredRsrc = appTracker2.getLocalizedResource(appReq2); + assertNull("in-progress resource should not be present", recoveredRsrc); + recoveredRsrc = appTracker2.getLocalizedResource(appReq3); + assertEquals(appReq3, recoveredRsrc.getRequest()); + assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath()); + assertEquals(appLr3.getSize(), recoveredRsrc.getSize()); + assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState()); + } finally { + dispatcher.stop(); + stateStore.close(); + } + } + @Test( timeout = 10000) @SuppressWarnings("unchecked") // mocked generics public void testLocalizationHeartbeat() throws Exception { @@ -436,7 +671,8 @@ public void testLocalizationHeartbeat() throws Exception { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler); + dirsHandler, + new NMNullStateStoreService()); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); @@ -469,7 +705,7 @@ public boolean matches(Object o) { long seed = r.nextLong(); System.out.println("SEED: " + seed); r.setSeed(seed); - final Container c = getMockContainer(appId, 42); + final Container c = getMockContainer(appId, 42, "user0"); FSDataOutputStream out = new FSDataOutputStream(new DataOutputBuffer(), null); doReturn(out).when(spylfs).createInternal(isA(Path.class), @@ -616,7 +852,8 @@ public void testFailedPublicResource() throws Exception { try { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler); + dirsHandler, + new NMNullStateStoreService()); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext( @@ -637,7 +874,7 @@ public void testFailedPublicResource() throws Exception { dispatcher.await(); // init container. - final Container c = getMockContainer(appId, 42); + final Container c = getMockContainer(appId, 42, user); // init resources Random r = new Random(); @@ -725,7 +962,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { try { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandlerSpy); + dirsHandlerSpy, new NMNullStateStoreService()); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext( @@ -758,7 +995,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq)); // init container. - final Container c = getMockContainer(appId, 42); + final Container c = getMockContainer(appId, 42, user); // first test ioexception Mockito @@ -838,7 +1075,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception { ResourceLocalizationService rls = new ResourceLocalizationService(dispatcher1, exec, delService, - localDirHandler); + localDirHandler, new NMNullStateStoreService()); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -991,7 +1228,7 @@ public void testLocalResourcePath() throws Exception { ResourceLocalizationService rls = new ResourceLocalizationService(dispatcher1, exec, delService, - localDirHandler); + localDirHandler, new NMNullStateStoreService()); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1157,7 +1394,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception { // it as otherwise it will remove requests from pending queue. ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher1, exec, delService, - dirsHandler); + dirsHandler, new NMNullStateStoreService()); ResourceLocalizationService spyService = spy(rawService); dispatcher1.register(LocalizationEventType.class, spyService); spyService.init(conf); @@ -1424,12 +1661,13 @@ private static LocalResource getPrivateMockedResource(Random r) { return getMockedResource(r, LocalResourceVisibility.PRIVATE); } - private static Container getMockContainer(ApplicationId appId, int id) { + private static Container getMockContainer(ApplicationId appId, int id, + String user) { Container c = mock(Container.class); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id); - when(c.getUser()).thenReturn("user0"); + when(c.getUser()).thenReturn(user); when(c.getContainerId()).thenReturn(cId); Credentials creds = new Credentials(); creds.addToken(new Text("tok" + id), getToken(id)); @@ -1438,6 +1676,24 @@ private static Container getMockContainer(ApplicationId appId, int id) { return c; } + private ResourceLocalizationService createSpyService( + DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler, + NMStateStoreService stateStore) { + ContainerExecutor exec = mock(ContainerExecutor.class); + LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class); + DeletionService delService = mock(DeletionService.class); + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandler, stateStore); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker( + isA(Configuration.class)); + doReturn(lfs).when(spyService) + .getLocalFileContext(isA(Configuration.class)); + return spyService; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) static Token getToken(int id) { return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java index f3f7cc5067..0e3bf86bfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java @@ -26,11 +26,13 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; - +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.junit.Test; + import static org.junit.Assert.*; import org.mockito.ArgumentCaptor; + import static org.mockito.Mockito.*; public class TestResourceRetention { @@ -81,7 +83,7 @@ LocalResourcesTracker createMockTracker(String user, final long rsrcSize, ConcurrentMap trackerResources = new ConcurrentHashMap(); LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null, - trackerResources, false, conf)); + null, trackerResources, false, conf, new NMNullStateStoreService())); for (int i = 0; i < nRsrcs; ++i) { final LocalResourceRequest req = new LocalResourceRequest( new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java new file mode 100644 index 0000000000..a146e7b4c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; + +public class NMMemoryStateStoreService extends NMStateStoreService { + private Map trackerStates; + + public NMMemoryStateStoreService() { + super(NMMemoryStateStoreService.class.getName()); + } + + private LocalResourceTrackerState loadTrackerState(TrackerState ts) { + LocalResourceTrackerState result = new LocalResourceTrackerState(); + result.localizedResources.addAll(ts.localizedResources.values()); + for (Map.Entry entry : + ts.inProgressMap.entrySet()) { + result.inProgressResources.put(entry.getValue(), entry.getKey()); + } + return result; + } + + private TrackerState getTrackerState(TrackerKey key) { + TrackerState ts = trackerStates.get(key); + if (ts == null) { + ts = new TrackerState(); + trackerStates.put(key, ts); + } + return ts; + } + + @Override + public synchronized RecoveredLocalizationState loadLocalizationState() { + RecoveredLocalizationState result = new RecoveredLocalizationState(); + for (Map.Entry e : trackerStates.entrySet()) { + TrackerKey tk = e.getKey(); + TrackerState ts = e.getValue(); + // check what kind of tracker state we have and recover appropriately + // public trackers have user == null + // private trackers have a valid user but appId == null + // app-specific trackers have a valid user and valid appId + if (tk.user == null) { + result.publicTrackerState = loadTrackerState(ts); + } else { + RecoveredUserResources rur = result.userResources.get(tk.user); + if (rur == null) { + rur = new RecoveredUserResources(); + result.userResources.put(tk.user, rur); + } + if (tk.appId == null) { + rur.privateTrackerState = loadTrackerState(ts); + } else { + rur.appTrackerStates.put(tk.appId, loadTrackerState(ts)); + } + } + } + return result; + } + + @Override + public synchronized void startResourceLocalization(String user, + ApplicationId appId, LocalResourceProto proto, Path localPath) { + TrackerState ts = getTrackerState(new TrackerKey(user, appId)); + ts.inProgressMap.put(localPath, proto); + } + + @Override + public synchronized void finishResourceLocalization(String user, + ApplicationId appId, LocalizedResourceProto proto) { + TrackerState ts = getTrackerState(new TrackerKey(user, appId)); + Path localPath = new Path(proto.getLocalPath()); + ts.inProgressMap.remove(localPath); + ts.localizedResources.put(localPath, proto); + } + + @Override + public synchronized void removeLocalizedResource(String user, + ApplicationId appId, Path localPath) { + TrackerState ts = trackerStates.get(new TrackerKey(user, appId)); + if (ts != null) { + ts.inProgressMap.remove(localPath); + ts.localizedResources.remove(localPath); + } + } + + @Override + protected void initStorage(Configuration conf) { + trackerStates = new HashMap(); + } + + @Override + protected void startStorage() { + } + + @Override + protected void closeStorage() { + } + + + private static class TrackerState { + Map inProgressMap = + new HashMap(); + Map localizedResources = + new HashMap(); + } + + private static class TrackerKey { + String user; + ApplicationId appId; + + public TrackerKey(String user, ApplicationId appId) { + this.user = user; + this.appId = appId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((appId == null) ? 0 : appId.hashCode()); + result = prime * result + ((user == null) ? 0 : user.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof TrackerKey)) + return false; + TrackerKey other = (TrackerKey) obj; + if (appId == null) { + if (other.appId != null) + return false; + } else if (!appId.equals(other.appId)) + return false; + if (user == null) { + if (other.user != null) + return false; + } else if (!user.equals(other.user)) + return false; + return true; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java new file mode 100644 index 0000000000..c970c1c3d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNMLeveldbStateStoreService { + private static final File TMP_DIR = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestNMLeveldbStateStoreService.class.getName()); + + YarnConfiguration conf; + NMLeveldbStateStoreService stateStore; + + @Before + public void setup() throws IOException { + FileUtil.fullyDelete(TMP_DIR); + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.NM_RECOVERY_DIR, TMP_DIR.toString()); + restartStateStore(); + } + + @After + public void cleanup() throws IOException { + if (stateStore != null) { + stateStore.close(); + } + FileUtil.fullyDelete(TMP_DIR); + } + + private void restartStateStore() throws IOException { + // need to close so leveldb releases database lock + if (stateStore != null) { + stateStore.close(); + } + stateStore = new NMLeveldbStateStoreService(); + stateStore.init(conf); + stateStore.start(); + } + + private void verifyEmptyState() throws IOException { + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + assertNotNull(state); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + assertNotNull(pubts); + assertTrue(pubts.getLocalizedResources().isEmpty()); + assertTrue(pubts.getInProgressResources().isEmpty()); + assertTrue(state.getUserResources().isEmpty()); + } + + @Test + public void testEmptyState() throws IOException { + assertTrue(stateStore.canRecover()); + verifyEmptyState(); + } + + @Test + public void testStartResourceLocalization() throws IOException { + String user = "somebody"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + + // start a local resource for an application + Path appRsrcPath = new Path("hdfs://some/app/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(appRsrcPath), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto = rsrcPb.getProto(); + Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc"); + stateStore.startResourceLocalization(user, appId, appRsrcProto, + appRsrcLocalPath); + + // restart and verify only app resource is marked in-progress + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + assertTrue(pubts.getLocalizedResources().isEmpty()); + assertTrue(pubts.getInProgressResources().isEmpty()); + Map userResources = + state.getUserResources(); + assertEquals(1, userResources.size()); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); + assertNotNull(privts); + assertTrue(privts.getLocalizedResources().isEmpty()); + assertTrue(privts.getInProgressResources().isEmpty()); + assertEquals(1, rur.getAppTrackerStates().size()); + LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); + assertNotNull(appts); + assertTrue(appts.getLocalizedResources().isEmpty()); + assertEquals(1, appts.getInProgressResources().size()); + assertEquals(appRsrcLocalPath, + appts.getInProgressResources().get(appRsrcProto)); + + // start some public and private resources + Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath1), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto1 = rsrcPb.getProto(); + Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1"); + stateStore.startResourceLocalization(null, null, pubRsrcProto1, + pubRsrcLocalPath1); + Path pubRsrcPath2 = new Path("hdfs://some/public/resource2"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath2), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto2 = rsrcPb.getProto(); + Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); + stateStore.startResourceLocalization(null, null, pubRsrcProto2, + pubRsrcLocalPath2); + Path privRsrcPath = new Path("hdfs://some/private/resource"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + + // restart and verify resources are marked in-progress + restartStateStore(); + state = stateStore.loadLocalizationState(); + pubts = state.getPublicTrackerState(); + assertTrue(pubts.getLocalizedResources().isEmpty()); + assertEquals(2, pubts.getInProgressResources().size()); + assertEquals(pubRsrcLocalPath1, + pubts.getInProgressResources().get(pubRsrcProto1)); + assertEquals(pubRsrcLocalPath2, + pubts.getInProgressResources().get(pubRsrcProto2)); + userResources = state.getUserResources(); + assertEquals(1, userResources.size()); + rur = userResources.get(user); + privts = rur.getPrivateTrackerState(); + assertNotNull(privts); + assertTrue(privts.getLocalizedResources().isEmpty()); + assertEquals(1, privts.getInProgressResources().size()); + assertEquals(privRsrcLocalPath, + privts.getInProgressResources().get(privRsrcProto)); + assertEquals(1, rur.getAppTrackerStates().size()); + appts = rur.getAppTrackerStates().get(appId); + assertNotNull(appts); + assertTrue(appts.getLocalizedResources().isEmpty()); + assertEquals(1, appts.getInProgressResources().size()); + assertEquals(appRsrcLocalPath, + appts.getInProgressResources().get(appRsrcProto)); + } + + @Test + public void testFinishResourceLocalization() throws IOException { + String user = "somebody"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + + // start and finish a local resource for an application + Path appRsrcPath = new Path("hdfs://some/app/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(appRsrcPath), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto = rsrcPb.getProto(); + Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc"); + stateStore.startResourceLocalization(user, appId, appRsrcProto, + appRsrcLocalPath); + LocalizedResourceProto appLocalizedProto = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto) + .setLocalPath(appRsrcLocalPath.toString()) + .setSize(1234567L) + .build(); + stateStore.finishResourceLocalization(user, appId, appLocalizedProto); + + // restart and verify only app resource is completed + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + assertTrue(pubts.getLocalizedResources().isEmpty()); + assertTrue(pubts.getInProgressResources().isEmpty()); + Map userResources = + state.getUserResources(); + assertEquals(1, userResources.size()); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); + assertNotNull(privts); + assertTrue(privts.getLocalizedResources().isEmpty()); + assertTrue(privts.getInProgressResources().isEmpty()); + assertEquals(1, rur.getAppTrackerStates().size()); + LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); + assertNotNull(appts); + assertTrue(appts.getInProgressResources().isEmpty()); + assertEquals(1, appts.getLocalizedResources().size()); + assertEquals(appLocalizedProto, + appts.getLocalizedResources().iterator().next()); + + // start some public and private resources + Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath1), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto1 = rsrcPb.getProto(); + Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1"); + stateStore.startResourceLocalization(null, null, pubRsrcProto1, + pubRsrcLocalPath1); + Path pubRsrcPath2 = new Path("hdfs://some/public/resource2"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath2), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto2 = rsrcPb.getProto(); + Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); + stateStore.startResourceLocalization(null, null, pubRsrcProto2, + pubRsrcLocalPath2); + Path privRsrcPath = new Path("hdfs://some/private/resource"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + + // finish some of the resources + LocalizedResourceProto pubLocalizedProto1 = + LocalizedResourceProto.newBuilder() + .setResource(pubRsrcProto1) + .setLocalPath(pubRsrcLocalPath1.toString()) + .setSize(pubRsrcProto1.getSize()) + .build(); + stateStore.finishResourceLocalization(null, null, pubLocalizedProto1); + LocalizedResourceProto privLocalizedProto = + LocalizedResourceProto.newBuilder() + .setResource(privRsrcProto) + .setLocalPath(privRsrcLocalPath.toString()) + .setSize(privRsrcProto.getSize()) + .build(); + stateStore.finishResourceLocalization(user, null, privLocalizedProto); + + // restart and verify state + restartStateStore(); + state = stateStore.loadLocalizationState(); + pubts = state.getPublicTrackerState(); + assertEquals(1, pubts.getLocalizedResources().size()); + assertEquals(pubLocalizedProto1, + pubts.getLocalizedResources().iterator().next()); + assertEquals(1, pubts.getInProgressResources().size()); + assertEquals(pubRsrcLocalPath2, + pubts.getInProgressResources().get(pubRsrcProto2)); + userResources = state.getUserResources(); + assertEquals(1, userResources.size()); + rur = userResources.get(user); + privts = rur.getPrivateTrackerState(); + assertNotNull(privts); + assertEquals(1, privts.getLocalizedResources().size()); + assertEquals(privLocalizedProto, + privts.getLocalizedResources().iterator().next()); + assertTrue(privts.getInProgressResources().isEmpty()); + assertEquals(1, rur.getAppTrackerStates().size()); + appts = rur.getAppTrackerStates().get(appId); + assertNotNull(appts); + assertTrue(appts.getInProgressResources().isEmpty()); + assertEquals(1, appts.getLocalizedResources().size()); + assertEquals(appLocalizedProto, + appts.getLocalizedResources().iterator().next()); + } + + @Test + public void testRemoveLocalizedResource() throws IOException { + String user = "somebody"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + + // go through the complete lifecycle for an application local resource + Path appRsrcPath = new Path("hdfs://some/app/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(appRsrcPath), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto = rsrcPb.getProto(); + Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc"); + stateStore.startResourceLocalization(user, appId, appRsrcProto, + appRsrcLocalPath); + LocalizedResourceProto appLocalizedProto = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto) + .setLocalPath(appRsrcLocalPath.toString()) + .setSize(1234567L) + .build(); + stateStore.finishResourceLocalization(user, appId, appLocalizedProto); + stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath); + + restartStateStore(); + verifyEmptyState(); + + // remove an app resource that didn't finish + stateStore.startResourceLocalization(user, appId, appRsrcProto, + appRsrcLocalPath); + stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath); + + restartStateStore(); + verifyEmptyState(); + + // add public and private resources and remove some + Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath1), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto1 = rsrcPb.getProto(); + Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1"); + stateStore.startResourceLocalization(null, null, pubRsrcProto1, + pubRsrcLocalPath1); + LocalizedResourceProto pubLocalizedProto1 = + LocalizedResourceProto.newBuilder() + .setResource(pubRsrcProto1) + .setLocalPath(pubRsrcLocalPath1.toString()) + .setSize(789L) + .build(); + stateStore.finishResourceLocalization(null, null, pubLocalizedProto1); + Path pubRsrcPath2 = new Path("hdfs://some/public/resource2"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(pubRsrcPath2), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, + 789L, 135L); + LocalResourceProto pubRsrcProto2 = rsrcPb.getProto(); + Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); + stateStore.startResourceLocalization(null, null, pubRsrcProto2, + pubRsrcLocalPath2); + LocalizedResourceProto pubLocalizedProto2 = + LocalizedResourceProto.newBuilder() + .setResource(pubRsrcProto2) + .setLocalPath(pubRsrcLocalPath2.toString()) + .setSize(7654321L) + .build(); + stateStore.finishResourceLocalization(null, null, pubLocalizedProto2); + stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2); + Path privRsrcPath = new Path("hdfs://some/private/resource"); + rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + stateStore.removeLocalizedResource(user, null, privRsrcLocalPath); + + // restart and verify state + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + assertTrue(pubts.getInProgressResources().isEmpty()); + assertEquals(1, pubts.getLocalizedResources().size()); + assertEquals(pubLocalizedProto1, + pubts.getLocalizedResources().iterator().next()); + Map userResources = + state.getUserResources(); + assertTrue(userResources.isEmpty()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java index 447ea8c8bb..9305b8459e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -77,7 +78,8 @@ public void testContainerLogDirs() throws IOException, YarnException { NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); healthChecker.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); - NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf)); + NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, + new ApplicationACLsManager(conf), new NMNullStateStoreService()); // Add an application and the corresponding containers RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf); String user = "nobody"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index eecf039750..13a77afa9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -77,7 +79,8 @@ public void tearDown() { } private int startNMWebAppServer(String webAddr) { - Context nmContext = new NodeManager.NMContext(null, null, null, null); + Context nmContext = new NodeManager.NMContext(null, null, null, null, + null); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -135,7 +138,8 @@ public void testNMWebAppWithEphemeralPort() throws IOException { @Test public void testNMWebApp() throws IOException, YarnException { - Context nmContext = new NodeManager.NMContext(null, null, null, null); + Context nmContext = new NodeManager.NMContext(null, null, null, null, + null); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -185,6 +189,7 @@ public boolean isPmemCheckEnabled() { ContainerId container2 = BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1); NodeManagerMetrics metrics = mock(NodeManagerMetrics.class); + NMStateStoreService stateStore = new NMNullStateStoreService(); for (ContainerId containerId : new ContainerId[] { container1, container2}) { // TODO: Use builder utils diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 2f08bb331a..e90fe7e4fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -107,7 +107,8 @@ protected void configureServlets() { healthChecker.init(conf); dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); - nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager); + nmContext = new NodeManager.NMContext(null, null, dirsHandler, + aclsManager, null); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java index 72c1f6f2c5..a5ceaa4c84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java @@ -99,7 +99,8 @@ protected void configureServlets() { healthChecker.init(conf); dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); - nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager); + nmContext = new NodeManager.NMContext(null, null, dirsHandler, + aclsManager, null); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java index 29c92534d4..91400a6096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java @@ -122,7 +122,8 @@ public boolean isPmemCheckEnabled() { healthChecker.init(conf); dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); - nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) { + nmContext = new NodeManager.NMContext(null, null, dirsHandler, + aclsManager, null) { public NodeId getNodeId() { return NodeId.newInstance("testhost.foo.com", 8042); };