diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml index 5bb5a415d9..208cbdf6c1 100644 --- a/hadoop-tools/hadoop-sls/pom.xml +++ b/hadoop-tools/hadoop-sls/pom.xml @@ -73,6 +73,11 @@ com.fasterxml.jackson.core jackson-databind + + org.mockito + mockito-core + test + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 1330e4d2f2..5315eaa1a3 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -25,7 +25,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -118,6 +121,8 @@ public abstract class AMSimulator extends TaskRunner.Task { private Map appIdToAMSim; + private Set ranNodes = new ConcurrentSkipListSet(); + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } @@ -236,6 +241,11 @@ public void lastStep() throws Exception { LOG.info("AM container is null"); } + // Clear runningApps for ranNodes of this app + for (NodeId nodeId : ranNodes) { + se.getNmMap().get(nodeId).finishApplication(getApplicationId()); + } + if (null == appAttemptId) { // If appAttemptId == null, AM is not launched from RM's perspective, so // it's unnecessary to finish am as well @@ -497,4 +507,8 @@ public ApplicationId getApplicationId() { public ApplicationAttemptId getApplicationAttemptId() { return appAttemptId; } + + public Set getRanNodes() { + return this.ranNodes; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java index f886a69e02..c67544ee38 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java @@ -189,7 +189,8 @@ protected void processResponseQueue() throws Exception { appId, container.getId()); assignedContainers.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 586c671afe..184fdca2e5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -231,14 +231,16 @@ protected void processResponseQueue() throws Exception { appId, container.getId()); assignedMaps.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } else if (! this.scheduledReduces.isEmpty()) { ContainerSimulator cs = scheduledReduces.remove(); LOG.debug("Application {} starts to launch a reducer ({}).", appId, container.getId()); assignedReduces.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index 46bc90a337..7e3545191f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -172,7 +172,8 @@ protected void processResponseQueue() throws Exception { container.getId()); assignedStreams.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()).addNewContainer(container, - cs.getLifeTime()); + cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 2ec39762b8..54311d5538 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -250,7 +250,8 @@ public RMNode getNode() { /** * launch a new container with the given life time */ - public void addNewContainer(Container container, long lifeTimeMS) { + public void addNewContainer(Container container, long lifeTimeMS, + ApplicationId applicationId) { LOG.debug("NodeManager {} launches a new container ({}).", node.getNodeID(), container.getId()); if (lifeTimeMS != -1) { @@ -267,6 +268,15 @@ public void addNewContainer(Container container, long lifeTimeMS) { amContainerList.add(container.getId()); } } + + // update runningApplications on the node + if (applicationId != null + && !getNode().getRunningApps().contains(applicationId)) { + getNode().getRunningApps().add(applicationId); + } + LOG.debug("Adding running app: {} on node: {}. " + + "Updated runningApps on this node are: {}", + applicationId, getNode().getNodeID(), getNode().getRunningApps()); } /** @@ -296,4 +306,13 @@ List getAMContainers() { List getCompletedContainers() { return completedContainerList; } + + public void finishApplication(ApplicationId applicationId) { + if (getNode().getRunningApps().contains(applicationId)) { + getNode().getRunningApps().remove(applicationId); + LOG.debug("Removed running app: {} from node: {}. " + + "Updated runningApps on this node are: {}", + applicationId, getNode().getNodeID(), getNode().getRunningApps()); + } + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 37bf96afa0..d28407669c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -104,7 +104,8 @@ public void handle(AMLauncherEvent event) { LOG.info("Notify AM launcher launched:" + amContainer.getId()); se.getNmMap().get(amContainer.getNodeId()) - .addNewContainer(amContainer, -1); + .addNewContainer(amContainer, -1, appId); + ams.getRanNodes().add(amContainer.getNodeId()); return; } catch (Exception e) { throw new YarnRuntimeException(e); diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index ec7c81d63b..50ac700d9c 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -32,7 +33,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.scheduler.*; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -41,6 +44,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import java.io.IOException; import java.nio.file.Files; @@ -50,8 +54,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; +import static org.mockito.Mockito.when; + @RunWith(Parameterized.class) public class TestAMSimulator { private ResourceManager rm; @@ -288,6 +295,28 @@ public void testPackageRequests() { Assert.assertEquals(3, nodeRequestCount); } + @Test + public void testAMSimulatorRanNodesCleared() throws Exception { + NMSimulator nm = new NMSimulator(); + nm.init("/rack1/testNode1", Resources.createResource(1024 * 10, 10), 0, 1000, + rm, -1f); + + Map nmMap = new HashMap<>(); + nmMap.put(nm.getNode().getNodeID(), nm); + + MockAMSimulator app = new MockAMSimulator(); + app.appId = ApplicationId.newInstance(0l, 1); + SLSRunner slsRunner = Mockito.mock(SLSRunner.class); + app.se = slsRunner; + when(slsRunner.getNmMap()).thenReturn(nmMap); + app.getRanNodes().add(nm.getNode().getNodeID()); + nm.getNode().getRunningApps().add(app.appId); + Assert.assertTrue(nm.getNode().getRunningApps().contains(app.appId)); + + app.lastStep(); + Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId)); + Assert.assertTrue(nm.getNode().getRunningApps().isEmpty()); + } @After public void tearDown() { diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java index 003417e6d2..f82ce916e1 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java @@ -19,6 +19,8 @@ import java.util.function.Supplier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -109,7 +111,7 @@ public void testNMSimulator() throws Exception { ContainerId cId1 = newContainerId(1, 1, 1); Container container1 = Container.newInstance(cId1, null, null, Resources.createResource(GB, 1), null, null); - node1.addNewContainer(container1, 100000l); + node1.addNewContainer(container1, 100000l, null); Assert.assertTrue("Node1 should have one running container.", node1.getRunningContainers().containsKey(cId1)); @@ -117,7 +119,7 @@ public void testNMSimulator() throws Exception { ContainerId cId2 = newContainerId(2, 1, 1); Container container2 = Container.newInstance(cId2, null, null, Resources.createResource(GB, 1), null, null); - node1.addNewContainer(container2, -1l); + node1.addNewContainer(container2, -1l, null); Assert.assertTrue("Node1 should have one running AM container", node1.getAMContainers().contains(cId2)); @@ -137,6 +139,100 @@ private ContainerId newContainerId(int appId, int appAttemptId, int cId) { appAttemptId), cId); } + @Test + public void testNMSimAppAddedAndRemoved() throws Exception { + // Register one node + NMSimulator node = new NMSimulator(); + node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000, + rm, -1f); + node.middleStep(); + + int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + int cumulativeSleepTime = 0; + int sleepInterval = 100; + + while (numClusterNodes != 1 && cumulativeSleepTime < 5000) { + Thread.sleep(sleepInterval); + cumulativeSleepTime = cumulativeSleepTime + sleepInterval; + numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + } + + GenericTestUtils.waitFor(new com.google.common.base.Supplier() { + @Override + public Boolean get() { + return rm.getResourceScheduler().getRootQueueMetrics() + .getAvailableMB() > 0; + } + }, 500, 10000); + + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + + // Allocate one app container on node + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1); + Container container = Container.newInstance(cId, null, null, + Resources.createResource(GB, 1), null, null); + node.addNewContainer(container, 100000l, appId); + Assert.assertTrue("Node should have app: " + + appId + " in runningApps list.", + node.getNode().getRunningApps().contains(appId)); + + // Finish the app on the node. + node.finishApplication(appId); + Assert.assertFalse("Node should not have app: " + + appId + " in runningApps list.", + node.getNode().getRunningApps().contains(appId)); + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + } + + @Test + public void testNMSimNullAppAddedAndRemoved() throws Exception { + // Register one node + NMSimulator node = new NMSimulator(); + node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000, + rm, -1f); + node.middleStep(); + + int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + int cumulativeSleepTime = 0; + int sleepInterval = 100; + + while (numClusterNodes != 1 && cumulativeSleepTime < 5000) { + Thread.sleep(sleepInterval); + cumulativeSleepTime = cumulativeSleepTime + sleepInterval; + numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + } + + GenericTestUtils.waitFor(new com.google.common.base.Supplier() { + @Override + public Boolean get() { + return rm.getResourceScheduler().getRootQueueMetrics() + .getAvailableMB() > 0; + } + }, 500, 10000); + + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + + // Allocate null app container on node + ContainerId cId = newContainerId(1, 1, 1); + Container container = Container.newInstance(cId, null, null, + Resources.createResource(GB, 1), null, null); + node.addNewContainer(container, 100000l, null); + Assert.assertEquals("Node should have no runningApps if appId is null.", + node.getNode().getRunningApps().size(), 0); + + // Finish non-existent app on the node. + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + node.finishApplication(appId); + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + } + @After public void tearDown() throws Exception { rm.stop();