diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java index b0cec5a84c..c476c611b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.Timer; @@ -36,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -58,13 +59,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock; * a DECOMMISSIONING node will be DECOMMISSIONED no later than * DECOMMISSIONING_TIMEOUT regardless of running containers or applications. * - * To be efficient, DecommissioningNodesWatcher skip tracking application - * containers on a particular node before the node is in DECOMMISSIONING state. - * It only tracks containers once the node is in DECOMMISSIONING state. * DecommissioningNodesWatcher basically is no cost when no node is - * DECOMMISSIONING. This sacrifices the possibility that the node once - * host containers of an application that is still running - * (the affected map tasks will be rescheduled). + * DECOMMISSIONING. */ public class DecommissioningNodesWatcher { private static final Logger LOG = @@ -88,8 +84,8 @@ public class DecommissioningNodesWatcher { // number of running containers at the moment. private int numActiveContainers; - // All applications run on the node at or after decommissioningStartTime. - private Set appIds; + // All applications run on the node. + private List appIds; // First moment the node is observed in DECOMMISSIONED state. private long decommissionedTime; @@ -102,7 +98,7 @@ public class DecommissioningNodesWatcher { public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { this.nodeId = nodeId; - this.appIds = new HashSet(); + this.appIds = new ArrayList<>(); this.decommissioningStartTime = mclock.getTime(); this.timeoutMs = 1000L * timeoutSec; } @@ -164,9 +160,7 @@ public class DecommissioningNodesWatcher { context.updateTimeout(rmNode.getDecommissioningTimeout()); context.lastUpdateTime = now; - if (remoteNodeStatus.getKeepAliveApplications() != null) { - context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); - } + context.appIds = rmNode.getRunningApps(); // Count number of active containers. int numActiveContainers = 0; @@ -176,14 +170,7 @@ public class DecommissioningNodesWatcher { newState == ContainerState.NEW) { numActiveContainers++; } - context.numActiveContainers = numActiveContainers; - ApplicationId aid = cs.getContainerId() - .getApplicationAttemptId().getApplicationId(); - if (!context.appIds.contains(aid)) { - context.appIds.add(aid); - } } - context.numActiveContainers = numActiveContainers; // maintain lastContainerFinishTime. @@ -254,7 +241,6 @@ public class DecommissioningNodesWatcher { DecommissioningNodeStatus.TIMEOUT; } - removeCompletedApps(context); if (context.appIds.size() == 0) { return DecommissioningNodeStatus.READY; } else { @@ -336,25 +322,6 @@ public class DecommissioningNodesWatcher { return rmNode; } - private void removeCompletedApps(DecommissioningNodeContext context) { - Iterator it = context.appIds.iterator(); - while (it.hasNext()) { - ApplicationId appId = it.next(); - RMApp rmApp = rmContext.getRMApps().get(appId); - if (rmApp == null) { - LOG.debug("Consider non-existing app {} as completed", appId); - it.remove(); - continue; - } - if (rmApp.getState() == RMAppState.FINISHED || - rmApp.getState() == RMAppState.FAILED || - rmApp.getState() == RMAppState.KILLED) { - LOG.debug("Remove {} app {}", rmApp.getState(), appId); - it.remove(); - } - } - } - // Time in second to be decommissioned. private int getTimeoutInSec(DecommissioningNodeContext context) { if (context.nodeState == NodeState.DECOMMISSIONED) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java index 4371156085..0695689e16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -19,11 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -35,7 +35,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -58,38 +59,106 @@ public class TestDecommissioningNodesWatcher { new DecommissioningNodesWatcher(rm.getRMContext()); MockNM nm1 = rm.registerNode("host1:1234", 10240); - RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNodeImpl node1 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); NodeId id1 = nm1.getNodeId(); rm.waitForState(id1, NodeState.RUNNING); - Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); RMApp app = rm.submitApp(2000); MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + NodeStatus nodeStatus = createNodeStatus(id1, app, 3); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. rm.sendNodeGracefulDecommission(nm1, YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); rm.waitForState(id1, NodeState.DECOMMISSIONING); // Update status with decreasing number of running containers until 0. - watcher.update(node1, createNodeStatus(id1, app, 12)); - watcher.update(node1, createNodeStatus(id1, app, 11)); + nodeStatus = createNodeStatus(id1, app, 3); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + watcher.update(node1, nodeStatus); + + nodeStatus = createNodeStatus(id1, app, 2); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + watcher.update(node1, nodeStatus); Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); - watcher.update(node1, createNodeStatus(id1, app, 1)); + nodeStatus = createNodeStatus(id1, app, 1); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + watcher.update(node1, nodeStatus); Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); - watcher.update(node1, createNodeStatus(id1, app, 0)); + nodeStatus = createNodeStatus(id1, app, 0); + watcher.update(node1, nodeStatus); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); // Set app to be FINISHED and verified DecommissioningNodeStatus is READY. MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + watcher.update(node1, nodeStatus); Assert.assertEquals(DecommissioningNodeStatus.READY, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); + } + + @Test + public void testDecommissioningNodesWatcherWithPreviousRunningApps() + throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = + new DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNodeImpl node1 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.waitForState(id1, NodeState.RUNNING); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + NodeStatus nodeStatus = createNodeStatus(id1, app, 3); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + + Assert.assertEquals(1, node1.getRunningApps().size()); + + // update node with 0 running containers + nodeStatus = createNodeStatus(id1, app, 0); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); + + Assert.assertEquals(1, node1.getRunningApps().size()); + + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now + // there is no container running on the node. + rm.sendNodeGracefulDecommission(nm1, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + + // we should still get WAIT_APP as container for a running app previously + // ran on this node. + watcher.update(node1, nodeStatus); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, + watcher.checkDecommissioningStatus(id1)); + + // Set app to be FINISHED and verified DecommissioningNodeStatus is READY. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(0, node1.getRunningApps().size()); + watcher.update(node1, nodeStatus); + Assert.assertEquals(DecommissioningNodeStatus.READY, + watcher.checkDecommissioningStatus(id1)); } @After @@ -103,7 +172,7 @@ public class TestDecommissioningNodesWatcher { NodeId nodeId, RMApp app, int numRunningContainers) { return NodeStatus.newInstance( nodeId, 0, getContainerStatuses(app, numRunningContainers), - new ArrayList(), + Collections.emptyList(), NodeHealthStatus.newInstance( true, "", System.currentTimeMillis() - 1000), null, null, null); @@ -113,8 +182,8 @@ public class TestDecommissioningNodesWatcher { // where numRunningContainers are RUNNING. private List getContainerStatuses( RMApp app, int numRunningContainers) { - // Total 12 containers - final int total = 12; + // Total 3 containers + final int total = 3; numRunningContainers = Math.min(total, numRunningContainers); List output = new ArrayList(); for (int i = 0; i < total; i++) { @@ -122,8 +191,8 @@ public class TestDecommissioningNodesWatcher { ContainerState.COMPLETE : ContainerState.RUNNING; output.add(ContainerStatus.newInstance( ContainerId.newContainerId( - ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), - cstate, "Dummy", 0)); + ApplicationAttemptId.newInstance(app.getApplicationId(), 0), i), + cstate, "", 0)); } return output; }