diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 1ce3356748..a1e3bdb20a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -160,6 +160,7 @@ protected void stopContainerInternal(ContainerId containerID) containerTokenId.getExecutionType()); if (foundInQueue) { + LOG.info("Removing queued container with ID " + containerID); this.context.getQueuingContext().getKilledQueuedContainers().put( containerTokenId, "Queued container request removed by ApplicationMaster."); @@ -502,6 +503,16 @@ public int getNumAllocatedOpportunisticContainers() { return allocatedOpportunisticContainers.size(); } + @VisibleForTesting + public int getNumQueuedGuaranteedContainers() { + return queuedGuaranteedContainers.size(); + } + + @VisibleForTesting + public int getNumQueuedOpportunisticContainers() { + return queuedOpportunisticContainers.size(); + } + class QueuingApplicationEventDispatcher implements EventHandler { private EventHandler applicationEventDispatcher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index ab6028838d..4f0e5c3f58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -40,10 +40,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -51,6 +58,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -71,6 +79,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -354,4 +363,59 @@ public static void waitForNMContainerState(ContainerManagerImpl Assert.assertEquals("ContainerState is not correct (timedout)", finalState, currentState); } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager) + throws IOException { + return createContainerToken(cId, rmIdentifier, nodeId, user, + containerTokenSecretManager, null); + } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext) + throws IOException { + Resource r = BuilderUtils.newResource(1024, 1); + return createContainerToken(cId, rmIdentifier, nodeId, user, r, + containerTokenSecretManager, logAggregationContext); + } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null); + return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext, ExecutionType executionType) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null, + ContainerType.TASK, executionType); + return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + + public static ContainerId createContainerId(int id) { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); + return containerId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 702198e9a9..10b9155dd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -38,10 +43,10 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -58,13 +63,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.LogAggregationContext; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; @@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -90,8 +90,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; -import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import org.junit.Before; @@ -99,11 +97,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; - public class TestContainerManager extends BaseContainerManagerTest { public TestContainerManager() throws UnsupportedFileSystemException { @@ -119,14 +112,6 @@ public TestContainerManager() throws UnsupportedFileSystemException { public void setup() throws IOException { super.setup(); } - - public static ContainerId createContainerId(int id) { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); - return containerId; - } @Override protected ContainerManagerImpl @@ -1153,53 +1138,6 @@ public void testChangeContainerResource() throws Exception { assertEquals(targetResource, containerStatus.getCapability()); } - public static Token createContainerToken(ContainerId cId, long rmIdentifier, - NodeId nodeId, String user, - NMContainerTokenSecretManager containerTokenSecretManager) - throws IOException { - return createContainerToken(cId, rmIdentifier, nodeId, user, - containerTokenSecretManager, null); - } - - public static Token createContainerToken(ContainerId cId, long rmIdentifier, - NodeId nodeId, String user, - NMContainerTokenSecretManager containerTokenSecretManager, - LogAggregationContext logAggregationContext) - throws IOException { - Resource r = BuilderUtils.newResource(1024, 1); - return createContainerToken(cId, rmIdentifier, nodeId, user, r, - containerTokenSecretManager, logAggregationContext); - } - - public static Token createContainerToken(ContainerId cId, long rmIdentifier, - NodeId nodeId, String user, Resource resource, - NMContainerTokenSecretManager containerTokenSecretManager, - LogAggregationContext logAggregationContext) - throws IOException { - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, - System.currentTimeMillis() + 100000L, 123, rmIdentifier, - Priority.newInstance(0), 0, logAggregationContext, null); - return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager - .retrievePassword(containerTokenIdentifier), - containerTokenIdentifier); - } - - public static Token createContainerToken(ContainerId cId, long rmIdentifier, - NodeId nodeId, String user, Resource resource, - NMContainerTokenSecretManager containerTokenSecretManager, - LogAggregationContext logAggregationContext, ExecutionType executionType) - throws IOException { - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, - System.currentTimeMillis() + 100000L, 123, rmIdentifier, - Priority.newInstance(0), 0, logAggregationContext, null, - ContainerType.TASK, executionType); - return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager - .retrievePassword(containerTokenIdentifier), - containerTokenIdentifier); - } - @Test public void testOutputThreadDumpSignal() throws IOException, InterruptedException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index 8a0d155fd3..4d44d8d85d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -32,42 +37,27 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; - +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; - -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor - .ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class TestQueuingContainerManager extends TestContainerManager { +/** + * Class for testing the {@link QueuingContainerManagerImpl}. + */ +public class TestQueuingContainerManager extends BaseContainerManagerTest { interface HasResources { boolean decide(Context context, ContainerId cId); @@ -119,15 +109,6 @@ protected UserGroupInformation getRemoteUgi() throws YarnException { return ugi; } - @Override - protected void authorizeGetAndStopContainerRequest( - ContainerId containerId, Container container, boolean stopRequest, - NMTokenIdentifier identifier) throws YarnException { - if (container == null || container.getUser().equals("Fail")) { - throw new YarnException("Reject this container"); - } - } - @Override protected ContainersMonitor createContainersMonitor( ContainerExecutor exec) { @@ -148,7 +129,7 @@ public long getVmemAllocatedForContainers() { @Override public long getVCoresAllocatedForContainers() { - return 2; + return 4; } }; } @@ -186,54 +167,17 @@ public void setup() throws IOException { } /** - * Test to verify that an OPPORTUNISTIC container is killed when - * a GUARANTEED container arrives and all the Node Resources are used up - * - * For this specific test case, 4 containers are requested (last one being - * guaranteed). Assumptions : - * 1) The first OPPORTUNISTIC Container will start running - * 2) The second and third OPP containers will be queued - * 3) When the GUARANTEED container comes in, the running OPP container - * will be killed to make room - * 4) After the GUARANTEED container finishes, the remaining 2 OPP - * containers will be dequeued and run. - * 5) Only the first OPP container will be killed. - * + * Starting one GUARANTEED and one OPPORTUNISTIC container. * @throws Exception */ @Test - public void testSimpleOpportunisticContainer() throws Exception { + public void testStartMultipleContainers() throws Exception { shouldDeleteWait = true; containerManager.start(); - // ////// Create the resources for the container - File dir = new File(tmpDir, "dir"); - dir.mkdirs(); - File file = new File(dir, "file"); - PrintWriter fileWriter = new PrintWriter(file); - fileWriter.write("Hello World!"); - fileWriter.close(); - - // ////// Construct the container-spec. ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(file.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(file.lastModified()); - String destinationFile = "dest_file"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - // Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container List list = new ArrayList<>(); list.add(StartContainerRequest.newInstance( containerLaunchContext, @@ -241,6 +185,122 @@ public void testSimpleOpportunisticContainer() throws Exception { context.getNodeId(), user, BuilderUtils.newResource(1024, 1), context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure all containers are running. + List statList = new ArrayList(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + } + + /** + * Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which + * requires more resources than available at the node, and make sure they + * are both queued. + * @throws Exception + */ + @Test + public void testQueueMultipleContainers() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(3072, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(3072, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(5000); + + // Ensure both containers are queued. + List statList = new ArrayList(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, + status.getState()); + } + + // Ensure both containers are properly queued. + Assert.assertEquals(2, containerManager.getContext().getQueuingContext() + .getQueuedContainers().size()); + Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager) + .getNumQueuedGuaranteedContainers()); + Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager) + .getNumQueuedOpportunisticContainers()); + } + + /** + * Starts one OPPORTUNISTIC container that takes up the whole node's + * resources, and submit two more that will be queued. + * @throws Exception + */ + @Test + public void testStartAndQueueMultipleContainers() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, ExecutionType.OPPORTUNISTIC))); list.add(StartContainerRequest.newInstance( containerLaunchContext, @@ -256,21 +316,185 @@ public void testSimpleOpportunisticContainer() throws Exception { user, BuilderUtils.newResource(1024, 1), context.getContainerTokenSecretManager(), null, ExecutionType.OPPORTUNISTIC))); - // GUARANTEED + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(5000); + + // Ensure first container is running and others are queued. + List statList = new ArrayList(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(Arrays.asList(createContainerId(0))); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, + status.getState()); + } + } + + // Ensure two containers are properly queued. + Assert.assertEquals(2, containerManager.getContext().getQueuingContext() + .getQueuedContainers().size()); + Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager) + .getNumQueuedGuaranteedContainers()); + Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager) + .getNumQueuedOpportunisticContainers()); + } + + /** + * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources + * requests by each container as such that only one can run in parallel. + * Thus, the OPPORTUNISTIC container that started running, will be + * killed for the GUARANTEED container to start. + * Once the GUARANTEED container finishes its execution, the remaining + * OPPORTUNISTIC container will be executed. + * @throws Exception + */ + @Test + public void testKillOpportunisticForGuaranteedContainer() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.DONE, 30); + Thread.sleep(5000); + + // Get container statuses. Container 0 should be killed, container 1 + // should be queued and container 2 should be running. + List statList = new ArrayList(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertTrue(status.getDiagnostics() + .contains("Container killed by the ApplicationMaster")); + } else if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, + status.getState()); + } else if (status.getContainerId().equals(createContainerId(2))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + // Make sure the remaining OPPORTUNISTIC container starts its execution. + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(2), ContainerState.DONE, 30); + Thread.sleep(5000); + statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList( + createContainerId(1))); + ContainerStatus contStatus1 = containerManager.getContainerStatuses( + statRequest).getContainerStatuses().get(0); + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + contStatus1.getState()); + } + + /** + * Submit three OPPORTUNISTIC containers that can run concurrently, and one + * GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run. + * @throws Exception + */ + @Test + public void testKillMultipleOpportunisticContainers() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); list.add(StartContainerRequest.newInstance( containerLaunchContext, createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER, context.getNodeId(), - user, BuilderUtils.newResource(1024, 1), + user, BuilderUtils.newResource(1500, 1), context.getContainerTokenSecretManager(), null, ExecutionType.GUARANTEED))); + StartContainersRequest allRequests = StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - Thread.sleep(10000); + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.DONE, 30); + Thread.sleep(5000); + // Get container statuses. Container 0 should be killed, container 1 + // should be queued and container 2 should be running. + int killedContainers = 0; + int runningContainers = 0; List statList = new ArrayList(); for (int i = 0; i < 4; i++) { statList.add(createContainerId(i)); @@ -280,12 +504,18 @@ public void testSimpleOpportunisticContainer() throws Exception { List containerStatuses = containerManager .getContainerStatuses(statRequest).getContainerStatuses(); for (ContainerStatus status : containerStatuses) { - // Ensure that the first opportunistic container is killed - if (status.getContainerId().equals(createContainerId(0))) { - Assert.assertTrue(status.getDiagnostics() - .contains("Container killed by the ApplicationMaster")); + if (status.getDiagnostics().contains( + "Container killed by the ApplicationMaster")) { + killedContainers++; + } + if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { + runningContainers++; } System.out.println("\nStatus : [" + status + "]\n"); } + + Assert.assertEquals(2, killedContainers); + Assert.assertEquals(2, runningContainers); } }