YARN-9206. RMServerUtils does not count SHUTDOWN as an accepted state. Contributed by Kuhu Shukla.

This commit is contained in:
Sunil G 2019-02-04 12:48:28 +05:30
parent 0f9aa5b4d5
commit 604b2489a9
4 changed files with 66 additions and 10 deletions

View File

@ -55,4 +55,16 @@ public boolean isUnusable() {
return (this == UNHEALTHY || this == DECOMMISSIONED return (this == UNHEALTHY || this == DECOMMISSIONED
|| this == LOST || this == SHUTDOWN); || this == LOST || this == SHUTDOWN);
} }
public boolean isInactiveState() {
return this == NodeState.DECOMMISSIONED ||
this == NodeState.LOST || this == NodeState.REBOOTED ||
this == NodeState.SHUTDOWN;
}
public boolean isActiveState() {
return this == NodeState.NEW ||
this == NodeState.RUNNING || this == NodeState.UNHEALTHY ||
this == NodeState.DECOMMISSIONING;
}
} }

View File

@ -109,10 +109,20 @@ public static List<RMNode> queryRMNodes(RMContext context,
EnumSet<NodeState> acceptedStates) { EnumSet<NodeState> acceptedStates) {
// nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
ArrayList<RMNode> results = new ArrayList<RMNode>(); ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) || boolean hasActive = false;
acceptedStates.contains(NodeState.RUNNING) || boolean hasInactive = false;
acceptedStates.contains(NodeState.DECOMMISSIONING) || for (NodeState nodeState : acceptedStates) {
acceptedStates.contains(NodeState.UNHEALTHY)) { if (!hasInactive && nodeState.isInactiveState()) {
hasInactive = true;
}
if (!hasActive && nodeState.isActiveState()) {
hasActive = true;
}
if (hasActive && hasInactive) {
break;
}
}
if (hasActive) {
for (RMNode rmNode : context.getRMNodes().values()) { for (RMNode rmNode : context.getRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) { if (acceptedStates.contains(rmNode.getState())) {
results.add(rmNode); results.add(rmNode);
@ -121,9 +131,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
} }
// inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED // inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
if (acceptedStates.contains(NodeState.DECOMMISSIONED) || if (hasInactive) {
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) { for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) { if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode); results.add(rmNode);

View File

@ -441,9 +441,7 @@ public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
NodesInfo nodesInfo = new NodesInfo(); NodesInfo nodesInfo = new NodesInfo();
for (RMNode rmNode : rmNodes) { for (RMNode rmNode : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(rmNode, sched); NodeInfo nodeInfo = new NodeInfo(rmNode, sched);
if (EnumSet if (rmNode.getState().isInactiveState()) {
.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED)
.contains(rmNode.getState())) {
nodeInfo.setNodeHTTPAddress(RMWSConsts.EMPTY); nodeInfo.setNodeHTTPAddress(RMWSConsts.EMPTY);
} }
nodesInfo.add(nodeInfo); nodesInfo.add(nodeInfo);

View File

@ -23,16 +23,20 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -43,11 +47,14 @@
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestRMServerUtils { public class TestRMServerUtils {
@ -120,6 +127,37 @@ public void testValidateAndSplitUpdateResourceRequests() {
Assert.assertEquals(containerIdOk, increaseRequest.getContainerId()); Assert.assertEquals(containerIdOk, increaseRequest.getContainerId());
} }
@Test
public void testQueryRMNodes() throws Exception {
RMContext rmContext = mock(RMContext.class);
NodeId node1 = NodeId.newInstance("node1", 1234);
RMNode rmNode1 = mock(RMNode.class);
ConcurrentMap<NodeId, RMNode> inactiveList =
new ConcurrentHashMap<NodeId, RMNode>();
when(rmNode1.getState()).thenReturn(NodeState.SHUTDOWN);
inactiveList.put(node1, rmNode1);
when(rmContext.getInactiveRMNodes()).thenReturn(inactiveList);
List<RMNode> result = RMServerUtils.queryRMNodes(rmContext,
EnumSet.of(NodeState.SHUTDOWN));
Assert.assertTrue(result.size() != 0);
Assert.assertEquals(result.get(0), rmNode1);
when(rmNode1.getState()).thenReturn(NodeState.DECOMMISSIONED);
result = RMServerUtils.queryRMNodes(rmContext,
EnumSet.of(NodeState.DECOMMISSIONED));
Assert.assertTrue(result.size() != 0);
Assert.assertEquals(result.get(0), rmNode1);
when(rmNode1.getState()).thenReturn(NodeState.LOST);
result = RMServerUtils.queryRMNodes(rmContext,
EnumSet.of(NodeState.LOST));
Assert.assertTrue(result.size() != 0);
Assert.assertEquals(result.get(0), rmNode1);
when(rmNode1.getState()).thenReturn(NodeState.REBOOTED);
result = RMServerUtils.queryRMNodes(rmContext,
EnumSet.of(NodeState.REBOOTED));
Assert.assertTrue(result.size() != 0);
Assert.assertEquals(result.get(0), rmNode1);
}
@Test @Test
public void testGetApplicableNodeCountForAMLocality() throws Exception { public void testGetApplicableNodeCountForAMLocality() throws Exception {
List<NodeId> rack1Nodes = new ArrayList<>(); List<NodeId> rack1Nodes = new ArrayList<>();