From d464f4d1c4dec483852fc8c0496787cba0af8f57 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 11 May 2016 19:10:17 -0700 Subject: [PATCH] YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh) --- .../ContainerManagerImpl.java | 26 ++++++++---- .../queuing/QueuingContainerManagerImpl.java | 41 +++++++++++++++++++ .../recovery/NMLeveldbStateStoreService.java | 26 +++++++++++- .../recovery/NMNullStateStoreService.java | 4 ++ .../recovery/NMStateStoreService.java | 9 ++++ .../recovery/NMMemoryStateStoreService.java | 6 +++ .../TestNMLeveldbStateStoreService.java | 12 ++++++ 7 files changed, 113 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 516ef907e0..4383d2bfc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -335,7 +335,6 @@ private void recoverApplication(ContainerManagerApplicationProto p) app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } - @SuppressWarnings("unchecked") private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); @@ -350,14 +349,7 @@ private void recoverContainer(RecoveredContainerState rcs) + " with exit code " + rcs.getExitCode()); if (context.getApplications().containsKey(appId)) { - Credentials credentials = - YarnServerSecurityUtils.parseCredentials(launchContext); - Container container = new ContainerImpl(getConfig(), dispatcher, - req.getContainerLaunchContext(), - credentials, metrics, token, context, rcs); - context.getContainers().put(containerId, container); - dispatcher.getEventHandler().handle( - new ApplicationContainerInitEvent(container)); + recoverActiveContainer(launchContext, token, rcs); } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); @@ -367,6 +359,22 @@ private void recoverContainer(RecoveredContainerState rcs) } } + /** + * Recover a running container. + */ + @SuppressWarnings("unchecked") + protected void recoverActiveContainer( + ContainerLaunchContext launchContext, ContainerTokenIdentifier token, + RecoveredContainerState rcs) throws IOException { + Credentials credentials = YarnServerSecurityUtils.parseCredentials( + launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + launchContext, credentials, metrics, token, context, rcs); + context.getContainers().put(token.getContainerID(), container); + dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent( + container)); + } + private void waitForRecoveredContainers() throws InterruptedException { final int sleepMsec = 100; int waitIterations = 100; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 663dd3bde2..94d3172302 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,6 +124,11 @@ protected void startContainerInternal( hasResourcesAvailable(allocatedContInfo.getPti())) { startAllocatedContainer(allocatedContInfo); } else { + this.context.getNMStateStore().storeContainer(containerTokenIdentifier + .getContainerID(), request); + this.context.getNMStateStore().storeContainerQueued( + containerTokenIdentifier.getContainerID()); + if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.add(allocatedContInfo); // Kill running opportunistic containers to make space for @@ -150,6 +158,7 @@ protected void stopContainerInternal(ContainerId containerID) this.context.getQueuingContext().getKilledQueuedContainers().put( containerTokenId, "Queued container request removed by ApplicationMaster."); + this.context.getNMStateStore().storeContainerKilled(containerID); } else { // The container started execution in the meanwhile. try { @@ -446,6 +455,38 @@ protected ContainerStatus getContainerStatusInternal(ContainerId containerID, return super.getContainerStatusInternal(containerID, nmTokenIdentifier); } + /** + * Recover running or queued container. + */ + @Override + protected void recoverActiveContainer( + ContainerLaunchContext launchContext, ContainerTokenIdentifier token, + RecoveredContainerState rcs) throws IOException { + if (rcs.getStatus() == + RecoveredContainerStatus.QUEUED && !rcs.getKilled()) { + LOG.info(token.getContainerID() + + "will be added to the queued containers."); + + AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( + token, rcs.getStartRequest(), token.getExecutionType(), + token.getResource(), getConfig()); + + this.context.getQueuingContext().getQueuedContainers().put( + token.getContainerID(), token); + + if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { + queuedGuaranteedContainers.add(allocatedContInfo); + // Kill running opportunistic containers to make space for + // guaranteed container. + killOpportunisticContainers(allocatedContInfo); + } else { + queuedOpportunisticContainers.add(allocatedContInfo); + } + } else { + super.recoverActiveContainer(launchContext, token, rcs); + } + } + @VisibleForTesting public int getNumAllocatedGuaranteedContainers() { return allocatedGuaranteedContainers.size(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6e9efe123c..8bd20402b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -80,7 +80,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; private static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 0); + .newInstance(2, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -106,6 +106,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; @@ -239,8 +240,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, StartContainerRequestProto.parseFrom(entry.getValue())); } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { rcs.diagnostics = asString(entry.getValue()); - } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) { if (rcs.status == RecoveredContainerStatus.REQUESTED) { + rcs.status = RecoveredContainerStatus.QUEUED; + } + } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + if ((rcs.status == RecoveredContainerStatus.REQUESTED) + || (rcs.status == RecoveredContainerStatus.QUEUED)) { rcs.status = RecoveredContainerStatus.LAUNCHED; } } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { @@ -283,6 +289,21 @@ public void storeContainer(ContainerId containerId, } } + @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeContainerQueued: containerId=" + containerId); + } + + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_QUEUED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { @@ -417,6 +438,7 @@ public void removeContainer(ContainerId containerId) batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); db.write(batch); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 08b80e961a..112095e1be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -74,6 +74,10 @@ public void storeContainer(ContainerId containerId, StartContainerRequest startRequest) throws IOException { } + @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + } + @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index ccf1e709d9..57f35a47b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -62,6 +62,7 @@ public List getApplications() { public enum RecoveredContainerStatus { REQUESTED, + QUEUED, LAUNCHED, COMPLETED } @@ -311,6 +312,14 @@ public abstract List loadContainersState() public abstract void storeContainer(ContainerId containerId, StartContainerRequest startRequest) throws IOException; + /** + * Record that a container has been queued at the NM + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerQueued(ContainerId containerId) + throws IOException; + /** * Record that a container has been launched * @param containerId the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 46522453ff..3c5edc0679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -131,6 +131,12 @@ public synchronized void storeContainer(ContainerId containerId, containerStates.put(containerId, rcs); } + @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.status = RecoveredContainerStatus.QUEUED; + } + @Override public synchronized void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index ccc9254afc..2f409c8246 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -280,6 +280,18 @@ public void testContainerStorage() throws IOException { // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); + // queue the container, and verify recovered + stateStore.storeContainerQueued(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); stateStore.storeContainerLaunched(containerId);