From e9c72d04beddfe0252d2e81123a9fe66bdf04078 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 29 Jan 2018 20:43:08 +0530 Subject: [PATCH] YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan. --- .../scheduler/AbstractYarnScheduler.java | 51 +++--- .../scheduler/SchedulerNode.java | 16 ++ .../scheduler/capacity/CapacityScheduler.java | 49 +++++- .../TestRMHAForAsyncScheduler.java | 38 ++++- .../TestCapacitySchedulerAsyncScheduling.java | 159 +++++++++++++++++- 5 files changed, 276 insertions(+), 37 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c94c379596..4b7632741c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -980,11 +980,11 @@ public SchedulerNode getNode(NodeId nodeId) { /** * Get lists of new containers from NodeManager and process them. 
* @param nm The RMNode corresponding to the NodeManager + * @param schedulerNode schedulerNode * @return list of completed containers */ - protected List updateNewContainerInfo(RMNode nm) { - SchedulerNode node = getNode(nm.getNodeID()); - + private List updateNewContainerInfo(RMNode nm, + SchedulerNode schedulerNode) { List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList<>(); @@ -999,14 +999,15 @@ protected List updateNewContainerInfo(RMNode nm) { // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); + containerLaunchedOnNode(launchedContainer.getContainerId(), + schedulerNode); } // Processing the newly increased containers List newlyIncreasedContainers = nm.pullNewlyIncreasedContainers(); for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); + containerIncreasedOnNode(container.getId(), schedulerNode, container); } return completedContainers; @@ -1017,12 +1018,12 @@ protected List updateNewContainerInfo(RMNode nm) { * @param completedContainers Extracted list of completed containers * @param releasedResources Reference resource object for completed containers * @param nodeId NodeId corresponding to the NodeManager + * @param schedulerNode schedulerNode * @return The total number of released containers */ - protected int updateCompletedContainers(List - completedContainers, Resource releasedResources, NodeId nodeId) { + private int updateCompletedContainers(List completedContainers, + Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) { int releasedContainers = 0; - SchedulerNode node = getNode(nodeId); List untrackedContainerIdList = new ArrayList(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); @@ -1030,8 +1031,8 @@ protected 
int updateCompletedContainers(List RMContainer container = getRMContainer(containerId); completedContainer(container, completedContainer, RMContainerEventType.FINISHED); - if (node != null) { - node.releaseContainer(containerId, true); + if (schedulerNode != null) { + schedulerNode.releaseContainer(containerId, true); } if (container != null) { @@ -1076,14 +1077,14 @@ protected void updateSchedulerHealthInformation(Resource releasedResources, /** * Update container and utilization information on the NodeManager. * @param nm The NodeManager to update + * @param schedulerNode schedulerNode */ - protected void updateNodeResourceUtilization(RMNode nm) { - SchedulerNode node = getNode(nm.getNodeID()); + protected void updateNodeResourceUtilization(RMNode nm, + SchedulerNode schedulerNode) { // Updating node resource utilization - node.setAggregatedContainersUtilization( + schedulerNode.setAggregatedContainersUtilization( nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - + schedulerNode.setNodeUtilization(nm.getNodeUtilization()); } /** @@ -1097,12 +1098,17 @@ protected void nodeUpdate(RMNode nm) { } // Process new container information - List completedContainers = updateNewContainerInfo(nm); + SchedulerNode schedulerNode = getNode(nm.getNodeID()); + List completedContainers = updateNewContainerInfo(nm, + schedulerNode); + + // Notify Scheduler Node updated. 
+ schedulerNode.notifyNodeUpdate(); // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); int releasedContainers = updateCompletedContainers(completedContainers, - releasedResources, nm.getNodeID()); + releasedResources, nm.getNodeID(), schedulerNode); // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to @@ -1115,18 +1121,17 @@ protected void nodeUpdate(RMNode nm) { .getEventHandler() .handle( new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); + .newInstance(schedulerNode.getAllocatedResource(), 0))); } updateSchedulerHealthInformation(releasedResources, releasedContainers); - updateNodeResourceUtilization(nm); + updateNodeResourceUtilization(nm, schedulerNode); // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { - SchedulerNode node = getNode(nm.getNodeID()); - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + LOG.debug( + "Node being looked for scheduling " + nm + " availableResource: " + + schedulerNode.getUnallocatedResource()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 05dbf1e51a..89f748d418 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -76,6 +77,9 @@ public abstract class SchedulerNode { private volatile Set labels = null; + // Last updated time + private volatile long lastHeartbeatMonotonicTime; + public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; @@ -87,6 +91,7 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName, nodeName = rmNode.getHostName(); } this.labels = ImmutableSet.copyOf(labels); + this.lastHeartbeatMonotonicTime = Time.monotonicNow(); } public SchedulerNode(RMNode node, boolean usePortForNodeName) { @@ -453,6 +458,17 @@ public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + public long getLastHeartbeatMonotonicTime() { + return lastHeartbeatMonotonicTime; + } + + /** + * This will be called for each node heartbeat. 
+ */ + public void notifyNodeUpdate() { + this.lastHeartbeatMonotonicTime = Time.monotonicNow(); + } + private static class ContainerInfo { private final RMContainer container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 99f4456b5b..03ca507bd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -181,8 +180,6 @@ public class CapacityScheduler extends private CSConfigurationProvider csConfProvider; - protected Clock monotonicClock; - @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -243,6 +240,8 @@ public Configuration getConf() { private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private static boolean printedVerboseLoggingForAsyncScheduling = false; + /** * EXPERT */ @@ -471,6 +470,22 @@ long getAsyncScheduleInterval() { private final 
static Random random = new Random(System.currentTimeMillis()); + private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, + CapacityScheduler cs, boolean printVerboseLog) { + // Skip a node that missed 2 heartbeats since the node might be dead and + // we should not continue allocating containers on it. + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip scheduling on node because it haven't heartbeated for " + + timeElapsedFromLastHeartbeat / 1000.0f + " secs"); + } + return true; + } + return false; + } + /** * Schedule on all nodes by starting at a random point. * @param cs @@ -481,16 +496,42 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ Collection nodes = cs.nodeTracker.getAllNodes(); int start = random.nextInt(nodes.size()); + // To avoid overly verbose DEBUG logging, print the debug log only once + // every 10 secs. 
+ boolean printSkipedNodeLogging = false; + if (Time.monotonicNow() / 1000 % 10 == 0) { + printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling); + } else { + printedVerboseLoggingForAsyncScheduling = false; + } + + // Allocate containers of node [start, end) for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { + if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + continue; + } cs.allocateContainersToNode(node.getNodeID(), false); } } - // Now, just get everyone to be safe + + current = 0; + + // Allocate containers of node [0, start) for (FiCaSchedulerNode node : nodes) { + if (current++ > start) { + break; + } + if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + continue; + } cs.allocateContainersToNode(node.getNodeID(), false); } + if (printSkipedNodeLogging) { + printedVerboseLoggingForAsyncScheduling = true; + } + Thread.sleep(cs.getAsyncScheduleInterval()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java index 46d5cda2b4..36f1762328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java @@ -28,13 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + public class TestRMHAForAsyncScheduler extends RMHATestBase { + private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread + nmHeartbeatThread = null; @Before @Override @@ -57,26 +63,49 @@ public void setup() throws Exception { CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); } + private void keepNMHeartbeat(List mockNMs, int interval) { + if (nmHeartbeatThread != null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + nmHeartbeatThread = + new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs, + interval); + nmHeartbeatThread.start(); + } + + private void pauseNMHeartbeat() { + if (nmHeartbeatThread != null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + } + @Test(timeout = 60000) public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception { // start two RMs, and transit rm1 to active, rm2 to standby startRMs(); // register NM - rm1.registerNode("h1:1234", 8192, 8); + MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8); // submit app1 and check RMApp app1 = submitAppAndCheckLaunched(rm1); + keepNMHeartbeat(Arrays.asList(nm), 1000); // failover RM1 to RM2 explicitFailover(); checkAsyncSchedulerThreads(Thread.currentThread()); + pauseNMHeartbeat(); // register NM, kill app1 - rm2.registerNode("h1:1234", 8192, 8); + nm = rm2.registerNode("192.1.1.1:1234", 8192, 8); + keepNMHeartbeat(Arrays.asList(nm), 1000); + rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm2.killApp(app1.getApplicationId()); // submit app3 and check RMApp app2 = 
submitAppAndCheckLaunched(rm2); + pauseNMHeartbeat(); // failover RM2 to RM1 HAServiceProtocol.StateChangeRequestInfo requestInfo = @@ -92,12 +121,15 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception { checkAsyncSchedulerThreads(Thread.currentThread()); // register NM, kill app2 - rm1.registerNode("h1:1234", 8192, 8); + nm = rm1.registerNode("192.1.1.1:1234", 8192, 8); + keepNMHeartbeat(Arrays.asList(nm), 1000); + rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm1.killApp(app2.getApplicationId()); // submit app3 and check submitAppAndCheckLaunched(rm1); + pauseNMHeartbeat(); rm1.stop(); rm2.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 77596e25be..548b909d23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; 
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; @@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling { RMNodeLabelsManager mgr; + private NMHeartbeatThread nmHeartbeatThread = null; + @Before public void setUp() throws Exception { conf = new YarnConfiguration(); @@ -122,9 +125,11 @@ public RMNodeLabelsManager createNodeLabelManager() { List nms = new ArrayList<>(); // Add 10 nodes to the cluster, in the cluster we have 200 GB resource for (int i = 0; i < 10; i++) { - nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB)); + nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB)); } + keepNMHeartbeat(nms, 1000); + List ams = new ArrayList<>(); // Add 3 applications to the cluster, one app in one queue // the i-th app ask (20 * i) containers. So in total we will have @@ -185,8 +190,8 @@ public void testCommitProposalForFailedAppAttempt() // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB); List nmLst = new ArrayList<>(); nmLst.add(nm1); nmLst.add(nm2); @@ -277,8 +282,8 @@ public void testCommitOutdatedReservedProposal() throws Exception { // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB); // init scheduler nodes int waitTime = 1000; @@ -416,8 +421,8 @@ public void testNodeResourceOverAllocated() // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM 
nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB); + final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB); List nmLst = new ArrayList<>(); nmLst.add(nm1); nmLst.add(nm2); @@ -476,6 +481,146 @@ public void testNodeResourceOverAllocated() rm.stop(); } + /** + * Make sure the scheduler skips NMs that haven't sent a heartbeat for a while. + * @throws Exception + */ + @Test + public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception { + int heartbeatInterval = 100; + conf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 100); + // Heartbeat interval is 100 ms. + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List nms = new ArrayList<>(); + // Add 10 nodes to the cluster, in the cluster we have 200 GB resource + for (int i = 0; i < 10; i++) { + nms.add(rm.registerNode("127.0.0." 
+ i + ":1234", 20 * GB)); + } + + List ams = new ArrayList<>(); + + keepNMHeartbeat(nms, heartbeatInterval); + + for (int i = 0; i < 3; i++) { + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + Character.toString((char) (i % 34 + 97)), 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + ams.add(am); + } + + pauseNMHeartbeat(); + + Thread.sleep(heartbeatInterval * 3); + + // Applications request containers. + for (int i = 0; i < 3; i++) { + ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>()); + } + + for (int i = 0; i < 5; i++) { + // Do heartbeat for NM 0-4 + nms.get(i).nodeHeartbeat(true); + } + + // Wait for 2000 ms. + Thread.sleep(2000); + + // Make sure that NM5-9 don't have non-AM containers. + for (int i = 0; i < 9; i++) { + if (i < 5) { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0); + } else { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0); + } + } + + rm.close(); + } + + public static class NMHeartbeatThread extends Thread { + private List mockNMS; + private int interval; + private volatile boolean shouldStop = false; + + public NMHeartbeatThread(List mockNMs, int interval) { + this.mockNMS = mockNMs; + this.interval = interval; + } + + public void run() { + while (true) { + if (shouldStop) { + break; + } + for (MockNM nm : mockNMS) { + try { + nm.nodeHeartbeat(true); + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void setShouldStop() { + shouldStop = true; + } + } + + private void keepNMHeartbeat(List mockNMs, int interval) { + if (nmHeartbeatThread != null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval); + nmHeartbeatThread.start(); + } + + private void pauseNMHeartbeat() { + if (nmHeartbeatThread != 
null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + } + + private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) { + SchedulerNode node = cs.getNode(nm.getNodeId()); + int nonAMContainer = 0; + for (RMContainer c : node.getCopiedListOfRunningContainers()) { + if (!c.isAMContainer()) { + nonAMContainer++; + } + } + return nonAMContainer; + } + private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority, int startContainerId) throws Exception {