From 604b2489a9e168a0fd702343f6f8844df9e86d17 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 4 Feb 2019 12:48:28 +0530 Subject: [PATCH] YARN-9206. RMServerUtils does not count SHUTDOWN as an accepted state. Contributed by Kuhu Shukla. --- .../hadoop/yarn/api/records/NodeState.java | 12 ++++++ .../server/resourcemanager/RMServerUtils.java | 22 +++++++---- .../resourcemanager/webapp/RMWebServices.java | 4 +- .../resourcemanager/TestRMServerUtils.java | 38 +++++++++++++++++++ 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java index d0344fb269..2700cf296b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java @@ -55,4 +55,16 @@ public boolean isUnusable() { return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST || this == SHUTDOWN); } + + public boolean isInactiveState() { + return this == NodeState.DECOMMISSIONED || + this == NodeState.LOST || this == NodeState.REBOOTED || + this == NodeState.SHUTDOWN; + } + + public boolean isActiveState() { + return this == NodeState.NEW || + this == NodeState.RUNNING || this == NodeState.UNHEALTHY || + this == NodeState.DECOMMISSIONING; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b18b12e845..bad890fbfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -109,10 +109,20 @@ public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. ArrayList results = new ArrayList(); - if (acceptedStates.contains(NodeState.NEW) || - acceptedStates.contains(NodeState.RUNNING) || - acceptedStates.contains(NodeState.DECOMMISSIONING) || - acceptedStates.contains(NodeState.UNHEALTHY)) { + boolean hasActive = false; + boolean hasInactive = false; + for (NodeState nodeState : acceptedStates) { + if (!hasInactive && nodeState.isInactiveState()) { + hasInactive = true; + } + if (!hasActive && nodeState.isActiveState()) { + hasActive = true; + } + if (hasActive && hasInactive) { + break; + } + } + if (hasActive) { for (RMNode rmNode : context.getRMNodes().values()) { if (acceptedStates.contains(rmNode.getState())) { results.add(rmNode); @@ -121,9 +131,7 @@ public static List queryRMNodes(RMContext context, } // inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED - if (acceptedStates.contains(NodeState.DECOMMISSIONED) || - acceptedStates.contains(NodeState.LOST) || - acceptedStates.contains(NodeState.REBOOTED)) { + if (hasInactive) { for (RMNode rmNode : context.getInactiveRMNodes().values()) { if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) { results.add(rmNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 01173768c1..33858ea6c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -441,9 +441,7 @@ public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { NodesInfo nodesInfo = new NodesInfo(); for (RMNode rmNode : rmNodes) { NodeInfo nodeInfo = new NodeInfo(rmNode, sched); - if (EnumSet - .of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED) - .contains(rmNode.getState())) { + if (rmNode.getState().isInactiveState()) { nodeInfo.setNodeHTTPAddress(RMWSConsts.EMPTY); } nodesInfo.add(nodeInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java index c80469bfe9..9c233b9cf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java @@ -23,16 +23,20 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -43,11 +47,14 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestRMServerUtils { @@ -120,6 +127,37 @@ public void testValidateAndSplitUpdateResourceRequests() { Assert.assertEquals(containerIdOk, increaseRequest.getContainerId()); } + @Test + public void testQueryRMNodes() throws Exception { + RMContext rmContext = mock(RMContext.class); + NodeId node1 = NodeId.newInstance("node1", 1234); + RMNode rmNode1 = mock(RMNode.class); + ConcurrentMap inactiveList = + new ConcurrentHashMap(); + when(rmNode1.getState()).thenReturn(NodeState.SHUTDOWN); + inactiveList.put(node1, rmNode1); + when(rmContext.getInactiveRMNodes()).thenReturn(inactiveList); + List result = RMServerUtils.queryRMNodes(rmContext, + EnumSet.of(NodeState.SHUTDOWN)); + Assert.assertTrue(result.size() != 0); + Assert.assertEquals(result.get(0), rmNode1); + when(rmNode1.getState()).thenReturn(NodeState.DECOMMISSIONED); + result = RMServerUtils.queryRMNodes(rmContext, + EnumSet.of(NodeState.DECOMMISSIONED)); + Assert.assertTrue(result.size() != 0); + Assert.assertEquals(result.get(0), rmNode1); + when(rmNode1.getState()).thenReturn(NodeState.LOST); + result = RMServerUtils.queryRMNodes(rmContext, + EnumSet.of(NodeState.LOST)); + Assert.assertTrue(result.size() != 0); + Assert.assertEquals(result.get(0), rmNode1); + when(rmNode1.getState()).thenReturn(NodeState.REBOOTED); + result = RMServerUtils.queryRMNodes(rmContext, + EnumSet.of(NodeState.REBOOTED)); + Assert.assertTrue(result.size() != 0); + Assert.assertEquals(result.get(0), rmNode1); + } + @Test public void testGetApplicableNodeCountForAMLocality() throws Exception { List rack1Nodes = new ArrayList<>();