diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ad63720d9f..89bef8f062 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -496,7 +496,7 @@ protected void recoverActiveContainer(Application app, Container container = new ContainerImpl(getConfig(), dispatcher, launchContext, credentials, metrics, token, context, rcs); context.getContainers().put(token.getContainerID(), container); - containerScheduler.recoverActiveContainer(container, rcs.getStatus()); + containerScheduler.recoverActiveContainer(container, rcs); app.handle(new ApplicationContainerInitEvent(container)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 5cdcf414b3..a61b9d15cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService + .RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,11 +262,11 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { * @param rcs Recovered Container status */ public void recoverActiveContainer(Container container, - RecoveredContainerStatus rcs) { + RecoveredContainerState rcs) { ExecutionType execType = container.getContainerTokenIdentifier().getExecutionType(); - if (rcs == RecoveredContainerStatus.QUEUED - || rcs == RecoveredContainerStatus.PAUSED) { + if (rcs.getStatus() == RecoveredContainerStatus.QUEUED + || rcs.getStatus() == RecoveredContainerStatus.PAUSED) { if (execType == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.put(container.getContainerId(), container); } else if (execType == ExecutionType.OPPORTUNISTIC) { @@ -274,10 +277,15 @@ public void recoverActiveContainer(Container container, "UnKnown execution type received " + container.getContainerId() + ", execType " + execType); } - } else if (rcs == RecoveredContainerStatus.LAUNCHED) { + } else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); utilizationTracker.addContainerResources(container); } + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED + && rcs.getCapability() != null) { + metrics.launchedContainer(); + metrics.allocateContainer(rcs.getCapability()); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6f643b04d5..44f5e18ab7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -237,7 +238,7 @@ public List loadContainersState() iter.seek(bytes(CONTAINERS_KEY_PREFIX)); while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry entry = iter.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { break; @@ -299,6 +300,10 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { rcs.startRequest = new StartContainerRequestPBImpl( StartContainerRequestProto.parseFrom(entry.getValue())); + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(rcs.startRequest.getContainerToken()); + rcs.capability = new ResourcePBImpl( + containerTokenIdentifier.getProto().getResource()); } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) { rcs.version = Integer.parseInt(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) { @@ -382,24 +387,25 @@ public void storeContainer(ContainerId containerId, int containerVersion, LOG.debug("storeContainer: containerId= " + idStr + ", startRequest= " + startRequest); } - String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); - String keyVersion = getContainerVersionKey(idStr); - String keyStartTime = + final String keyVersion = getContainerVersionKey(idStr); + final String keyRequest = + getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); + final StartContainerRequestProto startContainerRequest = + ((StartContainerRequestPBImpl) startRequest).getProto(); + + final String keyStartTime = getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX); + final String startTimeValue = Long.toString(startTime); + try { - WriteBatch batch = db.createWriteBatch(); - try { - batch.put(bytes(keyRequest), - ((StartContainerRequestPBImpl) startRequest).getProto(). - toByteArray()); - batch.put(bytes(keyStartTime), bytes(Long.toString(startTime))); + try (WriteBatch batch = db.createWriteBatch()) { + batch.put(bytes(keyRequest), startContainerRequest.toByteArray()); + batch.put(bytes(keyStartTime), bytes(startTimeValue)); if (containerVersion != 0) { batch.put(bytes(keyVersion), - bytes(Integer.toString(containerVersion))); + bytes(Integer.toString(containerVersion))); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index f217f2f860..dfad9cfee3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -73,7 +73,7 @@ public List loadContainersState() @Override public void storeContainer(ContainerId containerId, int version, - long startTime, StartContainerRequest startRequest) throws IOException { + long startTime, StartContainerRequest startRequest) { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 0ea0ef3b86..70decdba74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -416,7 +416,8 @@ public abstract List loadContainersState() * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - int containerVersion, long startTime, StartContainerRequest startRequest) + int containerVersion, long startTime, + StartContainerRequest startRequest) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index b31601c8c9..493aa4ca76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -107,7 +107,7 @@ public abstract class BaseContainerManagerTest { protected static File remoteLogDir; protected static File tmpDir; - protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); + protected NodeManagerMetrics metrics = NodeManagerMetrics.create(); public BaseContainerManagerTest() throws UnsupportedFileSystemException { localFS = FileContext.getLocalFSFileContext(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 0a834af84a..a144adf47a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -106,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -400,6 +402,61 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure() cm.stop(); } + @Test + public void testNodeManagerMetricsRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + metrics.addResource(Resource.newInstance(10240, 8)); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = Collections.emptyMap(); + Map serviceData = Collections.emptyMap(); + Map localResources = Collections.emptyMap(); + List commands = Arrays.asList("sleep 60s".split(" ")); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + null, null); + StartContainersResponse startResponse = startContainer(context, cm, cid, + clc, null, ContainerType.TASK); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + + // restart and verify metrics could be recovered + cm.stop(); + DefaultMetricsSystem.shutdown(); + metrics = NodeManagerMetrics.create(); + metrics.addResource(Resource.newInstance(10240, 8)); + TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8); + context = createContext(conf, stateStore); + cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + cm.stop(); + } + @Test public void testContainerResizeRecovery() throws Exception { conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java index 2ae8b978b3..6b3ac671d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doNothing; @@ -31,6 +32,8 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService + .RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.junit.After; import org.junit.Before; @@ -71,6 +74,13 @@ public class TestContainerSchedulerRecovery { private ContainerScheduler spy; + private RecoveredContainerState createRecoveredContainerState( + RecoveredContainerStatus status) { + RecoveredContainerState mockState = mock(RecoveredContainerState.class); + when(mockState.getStatus()).thenReturn(status); + return mockState; + } + @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); spy = spy(tempContainerScheduler); @@ -94,7 +104,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -113,7 +124,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -132,7 +144,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -151,7 +164,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -170,7 +184,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -189,7 +204,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -208,7 +224,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.REQUESTED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -227,7 +244,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.REQUESTED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -246,7 +264,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.COMPLETED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -265,7 +284,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.COMPLETED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -284,7 +304,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); assertEquals(0, spy.getNumQueuedGuaranteedContainers()); @@ -302,7 +323,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); assertEquals(0, spy.getNumQueuedGuaranteedContainers()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java index d21e7ad770..c5f80ba958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java @@ -113,8 +113,8 @@ public void testReferenceOfSingletonJvmMetrics() { assertGauge("AvailableVCores", 19, rb); } - private void checkMetrics(int launched, int completed, int failed, int killed, - int initing, int running, int allocatedGB, + public static void checkMetrics(int launched, int completed, int failed, + int killed, int initing, int running, int allocatedGB, int allocatedContainers, int availableGB, int allocatedVCores, int availableVCores) { MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index b67d11fceb..c5428d184b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; + +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + public class NMMemoryStateStoreService extends NMStateStoreService { private Map apps; private Map containerStates; @@ -132,11 +136,19 @@ public synchronized List loadContainersState() @Override public synchronized void storeContainer(ContainerId containerId, - int version, long startTime, StartContainerRequest startRequest) - throws IOException { + int version, long startTime, StartContainerRequest startRequest) { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; rcs.version = version; + try { + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(startRequest.getContainerToken()); + rcs.capability = + new ResourcePBImpl(containerTokenIdentifier.getProto().getResource()); + } catch (IOException e) { + throw new RuntimeException(e); + } + rcs.setStartTime(startTime); containerStates.put(containerId, rcs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 265b3e6883..c8c07d1fdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -238,7 +238,9 @@ public void testContainerStorage() throws IOException { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 4); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); - StartContainerRequest containerReq = createContainerRequest(containerId); + Resource containerResource = Resource.newInstance(1024, 2); + StartContainerRequest containerReq = + createContainerRequest(containerId, containerResource); // store a container and verify recovered long containerStartTime = System.currentTimeMillis(); @@ -260,6 +262,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); @@ -279,6 +282,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); @@ -294,6 +298,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + assertEquals(containerResource, rcs.getCapability()); // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); @@ -394,8 +399,18 @@ private void validateRetryAttempts(ContainerId containerId) assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2)); } + private StartContainerRequest createContainerRequest( + ContainerId containerId, Resource res) { + return createContainerRequestInternal(containerId, res); + } + private StartContainerRequest createContainerRequest( ContainerId containerId) { + return createContainerRequestInternal(containerId, null); + } + + private StartContainerRequest createContainerRequestInternal(ContainerId + containerId, Resource res) { LocalResource lrsrc = LocalResource.newInstance( URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, @@ -421,6 +436,10 @@ private StartContainerRequest createContainerRequest( localResources, env, containerCmds, serviceData, containerTokens, acls); Resource containerRsrc = Resource.newInstance(1357, 3); + + if (res != null) { + containerRsrc = res; + } ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),