diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 1b2bca32d2..09b12f2c48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -36,6 +37,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -142,6 +144,10 @@ private void createClusterAndStartApplication() throws Exception { // set the minimum allocation so that resource decrease can go under 1024 conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster.init(conf); yarnCluster.start(); @@ -924,8 +930,8 @@ public void testAskWithNodeLabels() { // add exp=x to ANY client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, 1), null, null, Priority.UNDEFINED, true, "x")); - Assert.assertEquals(1, client.ask.size()); - Assert.assertEquals("x", client.ask.iterator().next() + assertEquals(1, client.ask.size()); + assertEquals("x", client.ask.iterator().next() .getNodeLabelExpression()); // add exp=x then add exp=a to ANY in same priority, only exp=a should kept @@ -933,8 +939,8 @@ public void testAskWithNodeLabels() { 1), null, null, Priority.UNDEFINED, true, "x")); client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, 1), null, null, Priority.UNDEFINED, true, "a")); - Assert.assertEquals(1, client.ask.size()); - Assert.assertEquals("a", client.ask.iterator().next() + assertEquals(1, client.ask.size()); + assertEquals("a", client.ask.iterator().next() .getNodeLabelExpression()); // add exp=x to ANY, rack and node, only resource request has ANY resource @@ -943,10 +949,10 @@ public void testAskWithNodeLabels() { client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, 1), null, null, Priority.UNDEFINED, true, "y")); - Assert.assertEquals(1, client.ask.size()); + assertEquals(1, client.ask.size()); for (ResourceRequest req : client.ask) { if (ResourceRequest.ANY.equals(req.getResourceName())) { - Assert.assertEquals("y", req.getNodeLabelExpression()); + assertEquals("y", req.getNodeLabelExpression()); } else { Assert.assertNull(req.getNodeLabelExpression()); } @@ -957,7 +963,7 @@ public void testAskWithNodeLabels() { new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y")); for (ResourceRequest req : client.ask) { if (ResourceRequest.ANY.equals(req.getResourceName())) { - Assert.assertEquals("y", req.getNodeLabelExpression()); + assertEquals("y", req.getNodeLabelExpression()); } else { Assert.assertNull(req.getNodeLabelExpression()); } @@ -971,7 +977,7 @@ private void verifyAddRequestFailed(AMRMClient client, } catch (InvalidContainerRequestException e) { return; } - Assert.fail(); + fail(); } @Test(timeout=30000) @@ -1042,7 +1048,8 @@ private List allocateAndStartContainers( // get allocations AllocateResponse allocResponse = amClient.allocate(0.1f); List containers = allocResponse.getAllocatedContainers(); - Assert.assertEquals(num, containers.size()); + assertEquals(num, containers.size()); + // build container launch context Credentials ts = new Credentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -1083,14 +1090,14 @@ private List allocateAndStartContainers( private void doContainerResourceChange( final AMRMClient amClient, List containers) throws YarnException, IOException { - Assert.assertEquals(3, containers.size()); + assertEquals(3, containers.size()); // remember the container IDs Container container1 = containers.get(0); Container container2 = containers.get(1); Container container3 = containers.get(2); AMRMClientImpl amClientImpl = (AMRMClientImpl) amClient; - Assert.assertEquals(0, amClientImpl.change.size()); + assertEquals(0, amClientImpl.change.size()); // verify newer request overwrites older request for the container1 amClientImpl.requestContainerUpdate(container1, UpdateContainerRequest.newInstance(container1.getVersion(), @@ -1100,21 +1107,21 @@ private void doContainerResourceChange( UpdateContainerRequest.newInstance(container1.getVersion(), container1.getId(), ContainerUpdateType.INCREASE_RESOURCE, Resource.newInstance(4096, 1), null)); - Assert.assertEquals(Resource.newInstance(4096, 1), + assertEquals(Resource.newInstance(4096, 1), amClientImpl.change.get(container1.getId()).getValue().getCapability()); // verify new decrease request cancels old increase request for container1 amClientImpl.requestContainerUpdate(container1, UpdateContainerRequest.newInstance(container1.getVersion(), container1.getId(), ContainerUpdateType.DECREASE_RESOURCE, Resource.newInstance(512, 1), null)); - Assert.assertEquals(Resource.newInstance(512, 1), + assertEquals(Resource.newInstance(512, 1), amClientImpl.change.get(container1.getId()).getValue().getCapability()); // request resource increase for container2 amClientImpl.requestContainerUpdate(container2, UpdateContainerRequest.newInstance(container2.getVersion(), container2.getId(), ContainerUpdateType.INCREASE_RESOURCE, Resource.newInstance(2048, 1), null)); - Assert.assertEquals(Resource.newInstance(2048, 1), + assertEquals(Resource.newInstance(2048, 1), amClientImpl.change.get(container2.getId()).getValue().getCapability()); // verify release request will cancel pending change requests for the same // container @@ -1122,27 +1129,357 @@ private void doContainerResourceChange( UpdateContainerRequest.newInstance(container3.getVersion(), container3.getId(), ContainerUpdateType.INCREASE_RESOURCE, Resource.newInstance(2048, 1), null)); - Assert.assertEquals(3, amClientImpl.pendingChange.size()); + assertEquals(3, amClientImpl.pendingChange.size()); amClientImpl.releaseAssignedContainer(container3.getId()); - Assert.assertEquals(2, amClientImpl.pendingChange.size()); + assertEquals(2, amClientImpl.pendingChange.size()); // as of now: container1 asks to decrease to (512, 1) // container2 asks to increase to (2048, 1) // send allocation requests AllocateResponse allocResponse = amClient.allocate(0.1f); - Assert.assertEquals(0, amClientImpl.change.size()); + assertEquals(0, amClientImpl.change.size()); // we should get decrease confirmation right away List updatedContainers = allocResponse.getUpdatedContainers(); - Assert.assertEquals(1, updatedContainers.size()); + assertEquals(1, updatedContainers.size()); // we should get increase allocation after the next NM's heartbeat to RM triggerSchedulingWithNMHeartBeat(); // get allocations allocResponse = amClient.allocate(0.1f); updatedContainers = allocResponse.getUpdatedContainers(); - Assert.assertEquals(1, updatedContainers.size()); + assertEquals(1, updatedContainers.size()); } + @Test(timeout=60000) + public void testAMRMClientWithContainerPromotion() + throws YarnException, IOException { + AMRMClientImpl amClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + //asserting we are not using the singleton instance cache + Assert.assertSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + amClient.init(conf); + amClient.start(); + + // start am nm client + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + + amClient.registerApplicationMaster("Host", 10000, ""); + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // START OPPORTUNISTIC Container, Send allocation request to RM + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(1, oppContainersRequestedAny); + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map allocatedOpportContainers = new HashMap<>(); + int iterationsLeft = 50; + + amClient.getNMTokenCache().clearCache(); + assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + + AllocateResponse allocResponse = null; + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + allocResponse = amClient.allocate(0.1f); + // let NM heartbeat to RM and trigger allocations + //triggerSchedulingWithNMHeartBeat(); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + allocatedOpportContainers.put(container.getId(), container); + } + } + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size()); + + startContainer(allocResponse, nmClient); + + // SEND PROMOTION REQUEST TO RM + try { + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + assertTrue(e.getMessage().contains( + "target should be GUARANTEED and original should be OPPORTUNISTIC")); + } + + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + iterationsLeft = 120; + Map updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + assertEquals(1, updatedContainers.size()); + + for (ContainerId cId : allocatedOpportContainers.keySet()) { + Container orig = allocatedOpportContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.GUARANTEED, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // SEND UPDATE EXECTYPE UPDATE TO NM + updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient); + + amClient.ask.clear(); + } + + @Test(timeout=60000) + public void testAMRMClientWithContainerDemotion() + throws YarnException, IOException { + AMRMClientImpl amClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + //asserting we are not using the singleton instance cache + Assert.assertSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + amClient.init(conf); + amClient.start(); + + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + + amClient.registerApplicationMaster("Host", 10000, ""); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // START OPPORTUNISTIC Container, Send allocation request to RM + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, ExecutionTypeRequest + .newInstance(ExecutionType.GUARANTEED, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + + assertEquals(1, oppContainersRequestedAny); + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map allocatedGuaranteedContainers = new HashMap<>(); + int iterationsLeft = 50; + + amClient.getNMTokenCache().clearCache(); + assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + + AllocateResponse allocResponse = null; + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + allocResponse = amClient.allocate(0.1f); + // let NM heartbeat to RM and trigger allocations + //triggerSchedulingWithNMHeartBeat(); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + allocatedGuaranteedContainers.put(container.getId(), container); + } + } + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + assertEquals(oppContainersRequestedAny, + allocatedGuaranteedContainers.size()); + startContainer(allocResponse, nmClient); + + // SEND DEMOTION REQUEST TO RM + try { + Container c = allocatedGuaranteedContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + assertTrue(e.getMessage().contains( + "target should be OPPORTUNISTIC and original should be GUARANTEED")); + } + + Container c = allocatedGuaranteedContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + iterationsLeft = 120; + Map updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + assertEquals(1, updatedContainers.size()); + + for (ContainerId cId : allocatedGuaranteedContainers.keySet()) { + Container orig = allocatedGuaranteedContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.OPPORTUNISTIC, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC, + nmClient); + amClient.ask.clear(); + } + + private void updateContainerExecType(AllocateResponse allocResponse, + ExecutionType expectedExecType, NMClientImpl nmClient) + throws IOException, YarnException { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + Container container = updatedContainer.getContainer(); + nmClient.increaseContainerResource(container); + // NodeManager may still need some time to get the stable + // container status + while (true) { + ContainerStatus status = nmClient + .getContainerStatus(container.getId(), container.getNodeId()); + if (status.getExecutionType() == expectedExecType) { + break; + } + sleep(10); + } + } + } + + private void startContainer(AllocateResponse allocResponse, + NMClientImpl nmClient) throws IOException, YarnException { + // START THE CONTAINER IN NM + // build container launch context + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + // start a process long enough for increase/decrease action to take effect + ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext( + Collections.emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), securityTokens, + new HashMap()); + // start the containers and make sure they are in RUNNING state + for (Container container : allocResponse.getAllocatedContainers()) { + nmClient.startContainer(container, clc); + // NodeManager may still need some time to get the stable + // container status + while (true) { + ContainerStatus status = nmClient + .getContainerStatus(container.getId(), container.getNodeId()); + if (status.getState() == ContainerState.RUNNING) { + break; + } + sleep(10); + } + } + } + + private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request @@ -1172,7 +1509,7 @@ private void testAllocation(final AMRMClientImpl amClient) Set releases = new TreeSet(); amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); + assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny @@ -1192,7 +1529,7 @@ private void testAllocation(final AMRMClientImpl amClient) for (NMToken token : allocResponse.getNMTokens()) { String nodeID = token.getNodeId().toString(); if (receivedNMTokens.containsKey(nodeID)) { - Assert.fail("Received token again for : " + nodeID); + fail("Received token again for : " + nodeID); } receivedNMTokens.put(nodeID, token.getToken()); } @@ -1204,7 +1541,7 @@ private void testAllocation(final AMRMClientImpl amClient) } // Should receive atleast 1 token - Assert.assertTrue(receivedNMTokens.size() > 0 + assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); assertEquals(allocatedContainerCount, containersRequestedAny); @@ -1444,7 +1781,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, org.apache.hadoop.security.token.Token amrmToken_1 = getAMRMToken(); Assert.assertNotNull(amrmToken_1); - Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(), + assertEquals(amrmToken_1.decodeIdentifier().getKeyId(), amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId()); // Wait for enough time and make sure the roll_over happens @@ -1459,7 +1796,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, org.apache.hadoop.security.token.Token amrmToken_2 = getAMRMToken(); Assert.assertNotNull(amrmToken_2); - Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(), + assertEquals(amrmToken_2.decodeIdentifier().getKeyId(), amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId()); Assert.assertNotEquals(amrmToken_1, amrmToken_2); @@ -1474,7 +1811,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, AMRMTokenIdentifierForTest newVersionTokenIdentifier = new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message"); - Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier", + assertEquals("Message is changed after set to newVersionTokenIdentifier", "message", newVersionTokenIdentifier.getMessage()); org.apache.hadoop.security.token.Token newVersionToken = new org.apache.hadoop.security.token.Token ( @@ -1530,10 +1867,10 @@ public ApplicationMasterProtocol run() { .getBindAddress(), conf); } }).allocate(Records.newRecord(AllocateRequest.class)); - Assert.fail("The old Token should not work"); + fail("The old Token should not work"); } catch (Exception ex) { - Assert.assertTrue(ex instanceof InvalidToken); - Assert.assertTrue(ex.getMessage().contains( + assertTrue(ex instanceof InvalidToken); + assertTrue(ex.getMessage().contains( "Invalid AMRMToken from " + amrmToken_2.decodeIdentifier().getApplicationAttemptId())); } @@ -1560,7 +1897,7 @@ public ApplicationMasterProtocol run() { org.apache.hadoop.security.token.Token token = iter.next(); if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { if (result != null) { - Assert.fail("credentials has more than one AMRM token." + fail("credentials has more than one AMRM token." + " token1: " + result + " token2: " + token); } result = (org.apache.hadoop.security.token.Token) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 6bd0816206..9b79e2d6fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -301,7 +301,6 @@ private void testContainerManagement(NMClientImpl nmClient, assertTrue("The thrown exception is not expected", e.getMessage().contains("is not handled by this NodeManager")); } - // increaseContainerResource shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container try { @@ -475,10 +474,10 @@ private void testIncreaseContainerResource(Container container) try { nmClient.increaseContainerResource(container); } catch (YarnException e) { - // NM container will only be in SCHEDULED state, so expect the increase - // action to fail. + // NM container increase container resource should fail without a version + // increase action to fail. if (!e.getMessage().contains( - "can only be changed when a container is in RUNNING state")) { + container.getId() + " has update version ")) { throw (AssertionError) (new AssertionError("Exception is not expected: " + e) .initCause(e)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 84ed3c1b3c..a1e8ca0bfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -136,13 +137,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -410,8 +412,24 @@ private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); ContainerLaunchContext launchContext = req.getContainerLaunchContext(); - ContainerTokenIdentifier token = - BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + ContainerTokenIdentifier token = null; + if(rcs.getCapability() != null) { + ContainerTokenIdentifier originalToken = + BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + token = new ContainerTokenIdentifier(originalToken.getContainerID(), + originalToken.getVersion(), originalToken.getNmHostAddress(), + originalToken.getApplicationSubmitter(), rcs.getCapability(), + originalToken.getExpiryTimeStamp(), originalToken.getMasterKeyId(), + originalToken.getRMIdentifier(), originalToken.getPriority(), + originalToken.getCreationTime(), + originalToken.getLogAggregationContext(), + originalToken.getNodeLabelExpression(), + originalToken.getContainerType(), originalToken.getExecutionType()); + + } else { + token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + } + ContainerId containerId = token.getContainerID(); ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId(); @@ -1183,9 +1201,7 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest // as container resource increase request will have come with // an updated NMToken. updateNMTokenIdentifier(nmTokenIdentifier); - Resource resource = containerTokenIdentifier.getResource(); - changeContainerResourceInternal(containerId, - containerTokenIdentifier.getVersion(), resource, true); + updateContainerInternal(containerId, containerTokenIdentifier); successfullyUpdatedContainers.add(containerId); } catch (YarnException | InvalidToken e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -1199,9 +1215,9 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest } @SuppressWarnings("unchecked") - private void changeContainerResourceInternal(ContainerId containerId, - int containerVersion, Resource targetResource, boolean increase) - throws YarnException, IOException { + private void updateContainerInternal(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) + throws YarnException, IOException { Container container = context.getContainers().get(containerId); // Check container existence if (container == null) { @@ -1213,64 +1229,77 @@ private void changeContainerResourceInternal(ContainerId containerId, + " is not handled by this NodeManager"); } } + // Check container version. + int currentVersion = container.getContainerTokenIdentifier().getVersion(); + if (containerTokenIdentifier.getVersion() <= currentVersion) { + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " has update version [" + currentVersion + "] >= requested version" + + " [" + containerTokenIdentifier.getVersion() + "]"); + } + // Check container state org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState currentState = container.getContainerState(); if (currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.RUNNING) { + nodemanager.containermanager.container.ContainerState.RUNNING && + currentState != org.apache.hadoop.yarn.server. + nodemanager.containermanager.container.ContainerState.SCHEDULED) { throw RPCUtil.getRemoteException("Container " + containerId.toString() + " is in " + currentState.name() + " state." + " Resource can only be changed when a container is in" - + " RUNNING state"); + + " RUNNING or SCHEDULED state"); } + // Check validity of the target resource. Resource currentResource = container.getResource(); - if (currentResource.equals(targetResource)) { - LOG.warn("Unable to change resource for container " - + containerId.toString() - + ". The target resource " - + targetResource.toString() - + " is the same as the current resource"); - return; + ExecutionType currentExecType = + container.getContainerTokenIdentifier().getExecutionType(); + boolean isResourceChange = false; + boolean isExecTypeUpdate = false; + Resource targetResource = containerTokenIdentifier.getResource(); + ExecutionType targetExecType = containerTokenIdentifier.getExecutionType(); + + // Is true if either the resources has increased or execution type + // updated from opportunistic to guaranteed + boolean isIncrease = false; + if (!currentResource.equals(targetResource)) { + isResourceChange = true; + isIncrease = Resources.fitsIn(currentResource, targetResource) + && !Resources.fitsIn(targetResource, currentResource); + } else if (!currentExecType.equals(targetExecType)) { + isExecTypeUpdate = true; + isIncrease = currentExecType == ExecutionType.OPPORTUNISTIC && + targetExecType == ExecutionType.GUARANTEED; } - if (increase && !Resources.fitsIn(currentResource, targetResource)) { - throw RPCUtil.getRemoteException("Unable to increase resource for " - + "container " + containerId.toString() - + ". The target resource " - + targetResource.toString() - + " is smaller than the current resource " - + currentResource.toString()); - } - if (!increase && - (!Resources.fitsIn(Resources.none(), targetResource) - || !Resources.fitsIn(targetResource, currentResource))) { - throw RPCUtil.getRemoteException("Unable to decrease resource for " - + "container " + containerId.toString() - + ". The target resource " - + targetResource.toString() - + " is not smaller than the current resource " - + currentResource.toString()); - } - if (increase) { - org.apache.hadoop.yarn.api.records.Container increasedContainer = - org.apache.hadoop.yarn.api.records.Container.newInstance( - containerId, null, null, targetResource, null, null); + if (isIncrease) { + org.apache.hadoop.yarn.api.records.Container increasedContainer = null; + if (isResourceChange) { + increasedContainer = + org.apache.hadoop.yarn.api.records.Container.newInstance( + containerId, null, null, targetResource, null, null, + currentExecType); + } else { + increasedContainer = + org.apache.hadoop.yarn.api.records.Container.newInstance( + containerId, null, null, currentResource, null, null, + targetExecType); + } if (context.getIncreasedContainers().putIfAbsent(containerId, increasedContainer) != null){ throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " resource is being increased."); + + " resource is being increased -or- " + + "is undergoing ExecutionType promoted."); } } this.readLock.lock(); try { if (!serviceStopped) { - // Persist container resource change for recovery - this.context.getNMStateStore().storeContainerResourceChanged( - containerId, containerVersion, targetResource); - getContainersMonitor().handle( - new ChangeMonitoringContainerResourceEvent( - containerId, targetResource)); + // Dispatch message to ContainerScheduler to actually + // make the change. + dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent( + container, containerTokenIdentifier, isResourceChange, + isExecTypeUpdate, isIncrease)); } else { throw new YarnException( "Unable to change container resource as the NodeManager is " @@ -1571,8 +1600,11 @@ public void handle(ContainerManagerEvent event) { for (org.apache.hadoop.yarn.api.records.Container container : containersDecreasedEvent.getContainersToDecrease()) { try { - changeContainerResourceInternal(container.getId(), - container.getVersion(), container.getResource(), false); + ContainerTokenIdentifier containerTokenIdentifier = + BuilderUtils.newContainerTokenIdentifier( + container.getContainerToken()); + updateContainerInternal(container.getId(), + containerTokenIdentifier); } catch (YarnException e) { LOG.error("Unable to decrease container resource", e); } catch (IOException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index bd3f06d1fc..f6e567c19e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -39,10 +39,10 @@ public interface Container extends EventHandler { Resource getResource(); - void setResource(Resource targetResource); - ContainerTokenIdentifier getContainerTokenIdentifier(); + void setContainerTokenIdentifier(ContainerTokenIdentifier token); + String getUser(); ContainerState getContainerState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c0aa6b0d63..734a27ba83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -148,9 +148,8 @@ private ReInitializationContext createContextForRollback() { private final Credentials credentials; private final NodeManagerMetrics metrics; private volatile ContainerLaunchContext launchContext; - private final ContainerTokenIdentifier containerTokenIdentifier; + private volatile ContainerTokenIdentifier containerTokenIdentifier; private final ContainerId containerId; - private volatile Resource resource; private final String user; private int version; private int exitCode = ContainerExitStatus.INVALID; @@ -201,7 +200,6 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE); this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); - this.resource = containerTokenIdentifier.getResource(); this.diagnostics = new StringBuilder(); this.credentials = creds; this.metrics = metrics; @@ -269,13 +267,6 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.exitCode = rcs.getExitCode(); this.recoveredAsKilled = rcs.getKilled(); this.diagnostics.append(rcs.getDiagnostics()); - Resource recoveredCapability = rcs.getCapability(); - if (recoveredCapability != null - && !this.resource.equals(recoveredCapability)) { - // resource capability had been updated before NM was down - this.resource = Resource.newInstance(recoveredCapability.getMemorySize(), - recoveredCapability.getVirtualCores()); - } this.version = rcs.getVersion(); this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); this.workDir = rcs.getWorkDir(); @@ -640,14 +631,8 @@ public ContainerId getContainerId() { @Override public Resource getResource() { - return Resources.clone(this.resource); - } - - @Override - public void setResource(Resource targetResource) { - Resource currentResource = getResource(); - this.resource = Resources.clone(targetResource); - this.metrics.changeContainer(currentResource, targetResource); + return Resources.clone( + this.containerTokenIdentifier.getResource()); } @Override @@ -660,6 +645,16 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { } } + @Override + public void setContainerTokenIdentifier(ContainerTokenIdentifier token) { + this.writeLock.lock(); + try { + this.containerTokenIdentifier = token; + } finally { + this.writeLock.unlock(); + } + } + @Override public String getWorkDir() { return workDir; @@ -833,7 +828,8 @@ public ContainerState transition(ContainerImpl container, AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl", container.containerId.getApplicationAttemptId().getApplicationId(), container.containerId); - container.metrics.releaseContainer(container.resource); + container.metrics.releaseContainer( + container.containerTokenIdentifier.getResource()); container.sendFinishedEvents(); return ContainerState.DONE; } @@ -1517,7 +1513,8 @@ static class ContainerDoneTransition implements @Override @SuppressWarnings("unchecked") public void transition(ContainerImpl container, ContainerEvent event) { - container.metrics.releaseContainer(container.resource); + container.metrics.releaseContainer( + container.containerTokenIdentifier.getResource()); if (container.containerMetrics != null) { container.containerMetrics .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 6ee60bd17a..13e74917af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -741,19 +741,6 @@ private String formatUsageString(long currentVmemUsage, long vmemLimit, } } - private void changeContainerResource( - ContainerId containerId, Resource resource) { - Container container = context.getContainers().get(containerId); - // Check container existence - if (container == null) { - LOG.warn("Container " + containerId.toString() + "does not exist"); - return; - } - // YARN-5860: Route this through the ContainerScheduler to - // fix containerAllocation - container.setResource(resource); - } - private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) { if (!containerMetricsEnabled || monitoringEvent == null) { return; @@ -902,8 +889,6 @@ private void onChangeMonitoringContainerResource( int cpuVcores = changeEvent.getResource().getVirtualCores(); processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores); } - - changeContainerResource(containerId, changeEvent.getResource()); } private void onStopMonitoringContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 60d6213d9a..19b450556d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ChangeMonitoringContainerResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -136,6 +139,13 @@ public void handle(ContainerSchedulerEvent event) { case CONTAINER_COMPLETED: onContainerCompleted(event.getContainer()); break; + case UPDATE_CONTAINER: + if (event instanceof UpdateContainerSchedulerEvent) { + onUpdateContainer((UpdateContainerSchedulerEvent) event); + } else { + LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); + } + break; case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; @@ -145,6 +155,69 @@ public void handle(ContainerSchedulerEvent event) { } } + /** + * We assume that the ContainerManager has already figured out what kind + * of update this is. + */ + private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { + ContainerId containerId = updateEvent.getContainer().getContainerId(); + if (updateEvent.isResourceChange()) { + if (runningContainers.containsKey(containerId)) { + this.utilizationTracker.subtractContainerResource( + updateEvent.getContainer()); + updateEvent.getContainer().setContainerTokenIdentifier( + updateEvent.getUpdatedToken()); + this.utilizationTracker.addContainerResources( + updateEvent.getContainer()); + getContainersMonitor().handle( + new ChangeMonitoringContainerResourceEvent(containerId, + updateEvent.getUpdatedToken().getResource())); + } else { + updateEvent.getContainer().setContainerTokenIdentifier( + updateEvent.getUpdatedToken()); + } + try { + // Persist change in the state store. + this.context.getNMStateStore().storeContainerResourceChanged( + containerId, + updateEvent.getUpdatedToken().getVersion(), + updateEvent.getUpdatedToken().getResource()); + } catch (IOException e) { + LOG.warn("Could not store container [" + containerId + "] resource " + + "change..", e); + } + } + + if (updateEvent.isExecTypeUpdate()) { + updateEvent.getContainer().setContainerTokenIdentifier( + updateEvent.getUpdatedToken()); + // If this is a running container.. just change the execution type + // and be done with it. + if (!runningContainers.containsKey(containerId)) { + // Promotion or not (Increase signifies either a promotion + // or container size increase) + if (updateEvent.isIncrease()) { + // Promotion of queued container.. + if (queuedOpportunisticContainers.remove(containerId) != null) { + queuedGuaranteedContainers.put(containerId, + updateEvent.getContainer()); + } + //Kill opportunistic containers if any to make room for + // promotion request + killOpportunisticContainers(updateEvent.getContainer()); + } else { + // Demotion of queued container.. Should not happen too often + // since you should not find too many queued guaranteed + // containers + if (queuedGuaranteedContainers.remove(containerId) != null) { + queuedOpportunisticContainers.put(containerId, + updateEvent.getContainer()); + } + } + } + } + } + /** * Return number of queued containers. * @return Number of queued containers. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 086cb9bd5a..917eda09af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -24,6 +24,7 @@ public enum ContainerSchedulerEventType { SCHEDULE_CONTAINER, CONTAINER_COMPLETED, + UPDATE_CONTAINER, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java new file mode 100644 index 0000000000..5384b7e8db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .Container; +/** + * Update Event consumed by the {@link ContainerScheduler}. + */ +public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent { + + private ContainerTokenIdentifier updatedToken; + private boolean isResourceChange; + private boolean isExecTypeUpdate; + private boolean isIncrease; + + /** + * Create instance of Event. + * + * @param originalContainer Original Container. + * @param updatedToken Updated Container Token. + * @param isResourceChange is this a Resource Change. + * @param isExecTypeUpdate is this an ExecTypeUpdate. + * @param isIncrease is this a Container Increase. + */ + public UpdateContainerSchedulerEvent(Container originalContainer, + ContainerTokenIdentifier updatedToken, boolean isResourceChange, + boolean isExecTypeUpdate, boolean isIncrease) { + super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER); + this.updatedToken = updatedToken; + this.isResourceChange = isResourceChange; + this.isExecTypeUpdate = isExecTypeUpdate; + this.isIncrease = isIncrease; + } + + /** + * Update Container Token. + * + * @return Container Token. + */ + public ContainerTokenIdentifier getUpdatedToken() { + return updatedToken; + } + + /** + * isResourceChange. + * @return isResourceChange. + */ + public boolean isResourceChange() { + return isResourceChange; + } + + /** + * isExecTypeUpdate. + * @return isExecTypeUpdate. + */ + public boolean isExecTypeUpdate() { + return isExecTypeUpdate; + } + + /** + * isIncrease. + * @return isIncrease. + */ + public boolean isIncrease() { + return isIncrease; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 0c025ac97d..b8cd7ddf9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -682,7 +682,7 @@ public void run() { try{ try { updateBarrier.await(); - increaseTokens.add(getContainerToken(targetResource)); + increaseTokens.add(getContainerToken(targetResource, 1)); ContainerUpdateRequest updateRequest = ContainerUpdateRequest.newInstance(increaseTokens); ContainerUpdateResponse updateResponse = @@ -710,6 +710,15 @@ private Token getContainerToken(Resource resource) throws IOException { getNMContext().getNodeId(), user, resource, getNMContext().getContainerTokenSecretManager(), null); } + + private Token getContainerToken(Resource resource, int version) + throws IOException { + ContainerId cId = TestContainerManager.createContainerId(0); + return TestContainerManager.createContainerToken( + cId, version, DUMMY_RM_IDENTIFIER, + getNMContext().getNodeId(), user, resource, + getNMContext().getContainerTokenSecretManager(), null); + } } public static NMContainerStatus createNMContainerStatus(int id, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index d266ac16e2..6c96a47587 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -421,6 +421,20 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, containerTokenIdentifier); } + public static Token createContainerToken(ContainerId cId, int version, + long rmIdentifier, NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext) throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, version, nodeId.toString(), user, + resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null, + ContainerType.TASK, ExecutionType.GUARANTEED); + return BuilderUtils.newContainerToken(nodeId, + containerTokenSecretManager.retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, Resource resource, NMContainerTokenSecretManager containerTokenSecretManager, @@ -431,8 +445,23 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, System.currentTimeMillis() + 100000L, 123, rmIdentifier, Priority.newInstance(0), 0, logAggregationContext, null, ContainerType.TASK, executionType); - return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager - .retrievePassword(containerTokenIdentifier), + return BuilderUtils.newContainerToken(nodeId, + containerTokenSecretManager.retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + + public static Token createContainerToken(ContainerId cId, int version, + long rmIdentifier, NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext, ExecutionType executionType) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, version, nodeId.toString(), user, + resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null, + ContainerType.TASK, executionType); + return BuilderUtils.newContainerToken(nodeId, + containerTokenSecretManager.retrievePassword(containerTokenIdentifier), containerTokenIdentifier); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 24d46b6df8..9844225ff8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -80,14 +82,15 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.InvalidContainerException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; -import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -100,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -116,10 +120,34 @@ public TestContainerManager() throws UnsupportedFileSystemException { static { LOG = LogFactory.getLog(TestContainerManager.class); } - + + private boolean delayContainers = false; + + @Override + protected ContainerExecutor createContainerExecutor() { + DefaultContainerExecutor exec = new DefaultContainerExecutor() { + @Override + public int launchContainer(ContainerStartContext ctx) + throws IOException, ConfigurationException { + if (delayContainers) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // Nothing.. + } + } + return super.launchContainer(ctx); + } + }; + exec.setConf(conf); + return spy(exec); + } + @Override @Before public void setup() throws IOException { + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); super.setup(); } @@ -1468,7 +1496,7 @@ public void testNullTokens() throws Exception { Assert.assertEquals(strExceptionMsg, ContainerManagerImpl.INVALID_NMTOKEN_MSG); - ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl); + ContainerManagerImpl spyContainerMgr = spy(cMgrImpl); UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a"); Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo); Mockito.when(spyContainerMgr. @@ -1543,7 +1571,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception // container will have exited, and won't be in RUNNING state ContainerId cId0 = createContainerId(0); Token containerToken = - createContainerToken(cId0, DUMMY_RM_IDENTIFIER, + createContainerToken(cId0, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, Resource.newInstance(1234, 3), context.getContainerTokenSecretManager(), null); @@ -1572,7 +1600,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception if (cId0.equals(entry.getKey())) { Assert.assertTrue(entry.getValue().getMessage() .contains("Resource can only be changed when a " - + "container is in RUNNING state")); + + "container is in RUNNING or SCHEDULED state")); } else if (cId7.equals(entry.getKey())) { Assert.assertTrue(entry.getValue().getMessage() .contains("Container " + cId7.toString() @@ -1584,89 +1612,6 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception } } - @Test - public void testIncreaseContainerResourceWithInvalidResource() throws Exception { - containerManager.start(); - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); - PrintWriter fileWriter = new PrintWriter(scriptFile); - // Construct the Container-id - ContainerId cId = createContainerId(0); - if (Shell.WINDOWS) { - fileWriter.println("@ping -n 100 127.0.0.1 >nul"); - } else { - fileWriter.write("\numask 0"); - fileWriter.write("\nexec sleep 100"); - } - fileWriter.close(); - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - URL resource_alpha = - URL.fromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - List commands = - Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), - user, context.getContainerTokenSecretManager())); - List list = new ArrayList<>(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - // Make sure the container reaches RUNNING state - BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING); - // Construct container resource increase request, - List increaseTokens = new ArrayList<>(); - // Add increase request. The increase request should fail - // as the current resource does not fit in the target resource - Token containerToken = - createContainerToken(cId, DUMMY_RM_IDENTIFIER, - context.getNodeId(), user, - Resource.newInstance(512, 1), - context.getContainerTokenSecretManager(), null); - increaseTokens.add(containerToken); - ContainerUpdateRequest updateRequest = - ContainerUpdateRequest.newInstance(increaseTokens); - ContainerUpdateResponse updateResponse = - containerManager.updateContainer(updateRequest); - // Check response - Assert.assertEquals( - 0, updateResponse.getSuccessfullyUpdatedContainers().size()); - Assert.assertEquals(1, updateResponse.getFailedRequests().size()); - for (Map.Entry entry : updateResponse - .getFailedRequests().entrySet()) { - if (cId.equals(entry.getKey())) { - Assert.assertNotNull("Failed message", entry.getValue().getMessage()); - Assert.assertTrue(entry.getValue().getMessage() - .contains("The target resource " - + Resource.newInstance(512, 1).toString() - + " is smaller than the current resource " - + Resource.newInstance(1024, 1))); - } else { - throw new YarnException("Received failed request from wrong" - + " container: " + entry.getKey().toString()); - } - } - } - @Test public void testChangeContainerResource() throws Exception { containerManager.start(); @@ -1720,7 +1665,7 @@ public void testChangeContainerResource() throws Exception { List increaseTokens = new ArrayList<>(); // Add increase request. Resource targetResource = Resource.newInstance(4096, 2); - Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER, + Token containerToken = createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, targetResource, context.getContainerTokenSecretManager(), null); increaseTokens.add(containerToken); @@ -1741,15 +1686,19 @@ public void testChangeContainerResource() throws Exception { // Check status immediately as resource increase is blocking assertEquals(targetResource, containerStatus.getCapability()); // Simulate a decrease request - List containersToDecrease - = new ArrayList<>(); + List decreaseTokens = new ArrayList<>(); targetResource = Resource.newInstance(2048, 2); - org.apache.hadoop.yarn.api.records.Container decreasedContainer = - org.apache.hadoop.yarn.api.records.Container - .newInstance(cId, null, null, targetResource, null, null); - containersToDecrease.add(decreasedContainer); - containerManager.handle( - new CMgrDecreaseContainersResourceEvent(containersToDecrease)); + Token token = createContainerToken(cId, 2, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, targetResource, + context.getContainerTokenSecretManager(), null); + decreaseTokens.add(token); + updateRequest = ContainerUpdateRequest.newInstance(decreaseTokens); + updateResponse = containerManager.updateContainer(updateRequest); + + Assert.assertEquals( + 1, updateResponse.getSuccessfullyUpdatedContainers().size()); + Assert.assertTrue(updateResponse.getFailedRequests().isEmpty()); + // Check status with retry containerStatus = containerManager .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); @@ -1879,7 +1828,7 @@ public void testStartContainerFailureWithInvalidLocalResource() ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = - Mockito.spy(containerLaunchContext); + spy(containerLaunchContext); Mockito.when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); @@ -1924,7 +1873,7 @@ public void testStartContainerFailureWithNullTypeLocalResource() ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = - Mockito.spy(containerLaunchContext); + spy(containerLaunchContext); Mockito.when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); @@ -1969,7 +1918,7 @@ public void testStartContainerFailureWithNullVisibilityLocalResource() ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = - Mockito.spy(containerLaunchContext); + spy(containerLaunchContext); Mockito.when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); @@ -1996,4 +1945,122 @@ public void testStartContainerFailureWithNullVisibilityLocalResource() Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() .contains("Null resource visibility for local resource")); } + + @Test + public void testContainerUpdateExecTypeOpportunisticToGuaranteed() + throws IOException, YarnException, InterruptedException { + delayContainers = true; + containerManager.start(); + // Construct the Container-id + ContainerId cId = createContainerId(0); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC)); + List list = new ArrayList<>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + // Make sure the container reaches RUNNING state + BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + // Construct container resource increase request, + List updateTokens = new ArrayList<>(); + Token containerToken = + createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED); + updateTokens.add(containerToken); + ContainerUpdateRequest updateRequest = + ContainerUpdateRequest.newInstance(updateTokens); + ContainerUpdateResponse updateResponse = + containerManager.updateContainer(updateRequest); + + Assert.assertEquals( + 1, updateResponse.getSuccessfullyUpdatedContainers().size()); + Assert.assertTrue(updateResponse.getFailedRequests().isEmpty()); + + //Make sure the container is running + List statList = new ArrayList(); + statList.add(cId); + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + Assert.assertEquals(1, containerStatuses.size()); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + Assert.assertEquals(ExecutionType.GUARANTEED, status.getExecutionType()); + } + } + + @Test + public void testContainerUpdateExecTypeGuaranteedToOpportunistic() + throws IOException, YarnException, InterruptedException { + delayContainers = true; + containerManager.start(); + // Construct the Container-id + ContainerId cId = createContainerId(0); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null)); + List list = new ArrayList<>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + // Make sure the container reaches RUNNING state + BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + // Construct container resource increase request, + List updateTokens = new ArrayList<>(); + Token containerToken = + createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC); + updateTokens.add(containerToken); + ContainerUpdateRequest updateRequest = + ContainerUpdateRequest.newInstance(updateTokens); + ContainerUpdateResponse updateResponse = + containerManager.updateContainer(updateRequest); + + Assert.assertEquals( + 1, updateResponse.getSuccessfullyUpdatedContainers().size()); + Assert.assertTrue(updateResponse.getFailedRequests().isEmpty()); + + //Make sure the container is running + List statList = new ArrayList(); + statList.add(cId); + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + Assert.assertEquals(1, containerStatuses.size()); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + Assert + .assertEquals(ExecutionType.OPPORTUNISTIC, status.getExecutionType()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index d2bd79ced2..224e99cf9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -652,7 +652,7 @@ private ContainerUpdateResponse updateContainers( final List increaseTokens = new ArrayList(); // add increase request Token containerToken = TestContainerManager.createContainerToken( - cid, 0, context.getNodeId(), user.getShortUserName(), + cid, 1, 0, context.getNodeId(), user.getShortUserName(), capability, context.getContainerTokenSecretManager(), null); increaseTokens.add(containerToken); final ContainerUpdateRequest updateRequest = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index aeba399ca0..a1c247bf57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -951,4 +954,97 @@ public void testStopQueuedContainer() throws Exception { map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) .getContainerId()); } + + /** + * Starts one OPPORTUNISTIC container that takes up the whole node's + * resources, and submit one more that will be queued. Now promote the + * queued OPPORTUNISTIC container, which should kill the current running + * OPPORTUNISTIC container to make room for the promoted request. + * @throws Exception + */ + @Test + public void testPromotionOfOpportunisticContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(5000); + + // Ensure first container is running and others are queued. + List statList = new ArrayList(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(Arrays.asList(createContainerId(0))); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED, + status.getState()); + } + } + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + // Ensure two containers are properly queued. + Assert.assertEquals(1, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(1, + containerScheduler.getNumQueuedOpportunisticContainers()); + + // Promote Queued Opportunistic Container + Token updateToken = + createContainerToken(createContainerId(1), 1, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED); + List updateTokens = new ArrayList(); + updateTokens.add(updateToken); + ContainerUpdateRequest updateRequest = + ContainerUpdateRequest.newInstance(updateTokens); + ContainerUpdateResponse updateResponse = + containerManager.updateContainer(updateRequest); + + Assert.assertEquals(1, + updateResponse.getSuccessfullyUpdatedContainers().size()); + Assert.assertEquals(0, updateResponse.getFailedRequests().size()); + + waitForContainerState(containerManager, createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE); + + waitForContainerState(containerManager, createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure no containers are queued. + Assert.assertEquals(0, containerScheduler.getNumQueuedContainers()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 022baeac49..4561e85c87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -140,7 +140,7 @@ public Resource getResource() { } @Override - public void setResource(Resource targetResource) { + public void setContainerTokenIdentifier(ContainerTokenIdentifier token) { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 8b2f9db927..397d507b04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -655,7 +655,7 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, container.getNodeId(), getUser(), container.getResource(), container.getPriority(), rmContainer.getCreationTime(), this.logAggregationContext, rmContainer.getNodeLabelExpression(), - containerType)); + containerType, container.getExecutionType())); updateNMToken(container); } catch (IllegalArgumentException e) { // DNS might be down, skip returning this container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 8c422551f6..677aa14d29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -186,6 +186,31 @@ public Token createContainerToken(ContainerId containerId, null, null, ContainerType.TASK); } + /** + * Helper function for creating ContainerTokens. + * + * @param containerId containerId. + * @param containerVersion containerVersion. + * @param nodeId nodeId. + * @param appSubmitter appSubmitter. + * @param capability capability. + * @param priority priority. + * @param createTime createTime. + * @param logAggregationContext logAggregationContext. + * @param nodeLabelExpression nodeLabelExpression. + * @param containerType containerType. + * @return the container-token. + */ + public Token createContainerToken(ContainerId containerId, + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + ContainerType containerType) { + return createContainerToken(containerId, containerVersion, nodeId, + appSubmitter, capability, priority, createTime, null, null, + ContainerType.TASK, ExecutionType.GUARANTEED); + } + /** * Helper function for creating ContainerTokens * @@ -199,13 +224,14 @@ public Token createContainerToken(ContainerId containerId, * @param logAggregationContext Log Aggregation Context * @param nodeLabelExpression Node Label Expression * @param containerType Container Type + * @param execType Execution Type * @return the container-token */ public Token createContainerToken(ContainerId containerId, int containerVersion, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, LogAggregationContext logAggregationContext, String nodeLabelExpression, - ContainerType containerType) { + ContainerType containerType, ExecutionType execType) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -220,7 +246,7 @@ public Token createContainerToken(ContainerId containerId, this.currentMasterKey.getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp(), priority, createTime, logAggregationContext, nodeLabelExpression, containerType, - ExecutionType.GUARANTEED); + execType); password = this.createPassword(tokenIdentifier); } finally {