YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.

This commit is contained in:
Giovanni Matteo Fumarola 2019-03-21 12:04:05 -07:00
parent a99eb80659
commit 548997d6c9
2 changed files with 57 additions and 4 deletions
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src
main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed
test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -230,8 +231,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
try { try {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) { if (currentNode == null) {
if (estimatedQueueWaitTime != -1 if (rmNode.getState() != NodeState.DECOMMISSIONING &&
|| comparator == LoadComparator.QUEUE_LENGTH) { (estimatedQueueWaitTime != -1 ||
comparator == LoadComparator.QUEUE_LENGTH)) {
this.clusterNodes.put(rmNode.getNodeID(), this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID()) new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime) .setQueueWaitTime(estimatedQueueWaitTime)
@ -246,8 +248,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
"wait queue length [" + waitQueueLength + "]"); "wait queue length [" + waitQueueLength + "]");
} }
} else { } else {
if (estimatedQueueWaitTime != -1 if (rmNode.getState() != NodeState.DECOMMISSIONING &&
|| comparator == LoadComparator.QUEUE_LENGTH) { (estimatedQueueWaitTime != -1 ||
comparator == LoadComparator.QUEUE_LENGTH)) {
currentNode currentNode
.setQueueWaitTime(estimatedQueueWaitTime) .setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength) .setQueueLength(waitQueueLength)

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -99,6 +100,23 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h3:3", nodeIds.get(0).toString()); Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString()); Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node 2 to DECOMMISSIONING state
selector
.updateNode(createRMNode("h2", 2, 1, 10, NodeState.DECOMMISSIONING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
Assert.assertEquals(2, nodeIds.size());
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h1:1", nodeIds.get(1).toString());
// Now update node 2 back to RUNNING state
selector.updateNode(createRMNode("h2", 2, 1, 10, NodeState.RUNNING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h3:3", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
} }
@Test @Test
@ -145,6 +163,25 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h2:2", nodeIds.get(0).toString()); Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h1:1", nodeIds.get(1).toString()); Assert.assertEquals("h1:1", nodeIds.get(1).toString());
Assert.assertEquals("h4:4", nodeIds.get(2).toString()); Assert.assertEquals("h4:4", nodeIds.get(2).toString());
// Now update h2 to Decommissioning state
selector.updateNode(createRMNode("h2", 2, -1,
5, NodeState.DECOMMISSIONING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
Assert.assertEquals(2, nodeIds.size());
Assert.assertEquals("h1:1", nodeIds.get(0).toString());
Assert.assertEquals("h4:4", nodeIds.get(1).toString());
// Now update h2 back to Running state
selector.updateNode(createRMNode("h2", 2, -1,
5, NodeState.RUNNING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
Assert.assertEquals(3, nodeIds.size());
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h1:1", nodeIds.get(1).toString());
Assert.assertEquals("h4:4", nodeIds.get(2).toString());
} }
@Test @Test
@ -197,11 +234,24 @@ public class TestNodeQueueLoadMonitor {
DEFAULT_MAX_QUEUE_LENGTH); DEFAULT_MAX_QUEUE_LENGTH);
} }
/**
 * Convenience overload that builds a mocked RMNode reporting the given
 * node state. Delegates to the six-argument overload, passing
 * DEFAULT_MAX_QUEUE_LENGTH as the queue capacity.
 *
 * @param host host name used for the node id
 * @param port port used for the node id
 * @param waitTime wait time forwarded to the six-argument overload
 * @param queueLength queue length forwarded to the six-argument overload
 * @param state node state the mock should report
 * @return the RMNode produced by the six-argument overload
 */
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, NodeState state) {
return createRMNode(host, port, waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH, state);
}
/**
 * Convenience overload for callers that do not care about node state.
 * Delegates to the six-argument overload with NodeState.RUNNING, so
 * call sites that omit a state get a node reported as RUNNING.
 *
 * @param host host name used for the node id
 * @param port port used for the node id
 * @param waitTime wait time forwarded to the six-argument overload
 * @param queueLength queue length forwarded to the six-argument overload
 * @param queueCapacity queue capacity forwarded to the six-argument overload
 * @return the RMNode produced by the six-argument overload
 */
private RMNode createRMNode(String host, int port, private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity) { int waitTime, int queueLength, int queueCapacity) {
return createRMNode(host, port, waitTime, queueLength, queueCapacity,
NodeState.RUNNING);
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity, NodeState state) {
RMNode node1 = Mockito.mock(RMNode.class); RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port); NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1); Mockito.when(node1.getNodeID()).thenReturn(nID1);
Mockito.when(node1.getState()).thenReturn(state);
OpportunisticContainersStatus status1 = OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class); Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime()) Mockito.when(status1.getEstimatedQueueWaitTime())