From 548997d6c9c5a1b9734ee00d065ce48a189458e6 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Thu, 21 Mar 2019 12:04:05 -0700 Subject: [PATCH] YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi. --- .../distributed/NodeQueueLoadMonitor.java | 11 ++-- .../distributed/TestNodeQueueLoadMonitor.java | 50 +++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index ca358862ec..e093b2d997 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.NodeState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.NodeId; @@ -230,8 +231,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { try { ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); if (currentNode == null) { - if (estimatedQueueWaitTime != -1 - || comparator == LoadComparator.QUEUE_LENGTH) { + if (rmNode.getState() != NodeState.DECOMMISSIONING && + (estimatedQueueWaitTime != -1 || + comparator == LoadComparator.QUEUE_LENGTH)) { this.clusterNodes.put(rmNode.getNodeID(), new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) @@ -246,8 +248,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { "wait queue length [" + waitQueueLength + "]"); } } else { - if (estimatedQueueWaitTime != -1 - || comparator == LoadComparator.QUEUE_LENGTH) { + if (rmNode.getState() != NodeState.DECOMMISSIONING && + (estimatedQueueWaitTime != -1 || + comparator == LoadComparator.QUEUE_LENGTH)) { currentNode .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index 85eddaa2ad..bbc0086c37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -99,6 +100,23 @@ public class TestNodeQueueLoadMonitor { Assert.assertEquals("h3:3", nodeIds.get(0).toString()); Assert.assertEquals("h2:2", nodeIds.get(1).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now update node 2 to DECOMMISSIONING state + selector + .updateNode(createRMNode("h2", 2, 1, 10, NodeState.DECOMMISSIONING)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(2, nodeIds.size()); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + + // Now update node 2 back to RUNNING state + selector.updateNode(createRMNode("h2", 2, 1, 10, NodeState.RUNNING)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); } @Test @@ -145,6 +163,25 @@ public class TestNodeQueueLoadMonitor { Assert.assertEquals("h2:2", nodeIds.get(0).toString()); Assert.assertEquals("h1:1", nodeIds.get(1).toString()); Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + + // Now update h2 to Decommissioning state + selector.updateNode(createRMNode("h2", 2, -1, + 5, NodeState.DECOMMISSIONING)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(2, nodeIds.size()); + Assert.assertEquals("h1:1", nodeIds.get(0).toString()); + Assert.assertEquals("h4:4", nodeIds.get(1).toString()); + + // Now update h2 back to Running state + selector.updateNode(createRMNode("h2", 2, -1, + 5, NodeState.RUNNING)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(3, nodeIds.size()); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); } @Test @@ -197,11 +234,24 @@ public class TestNodeQueueLoadMonitor { DEFAULT_MAX_QUEUE_LENGTH); } + private RMNode createRMNode(String host, int port, + int waitTime, int queueLength, NodeState state) { + return createRMNode(host, port, waitTime, queueLength, + DEFAULT_MAX_QUEUE_LENGTH, state); + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength, int queueCapacity) { + return createRMNode(host, port, waitTime, queueLength, queueCapacity, + NodeState.RUNNING); + } + + private RMNode createRMNode(String host, int port, + int waitTime, int queueLength, int queueCapacity, NodeState state) { RMNode node1 = Mockito.mock(RMNode.class); NodeId nID1 = new FakeNodeId(host, port); Mockito.when(node1.getNodeID()).thenReturn(nID1); + Mockito.when(node1.getState()).thenReturn(state); OpportunisticContainersStatus status1 = Mockito.mock(OpportunisticContainersStatus.class); Mockito.when(status1.getEstimatedQueueWaitTime())