diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 413bbdf7db..e487f698d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.util.resource.Resources; @@ -50,7 +52,8 @@ public class ClusterNodeTracker { private Lock writeLock = readWriteLock.writeLock(); private HashMap nodes = new HashMap<>(); - private Map nodesPerRack = new HashMap<>(); + private Map nodeNameToNodeMap = new HashMap<>(); + private Map> nodesPerRack = new HashMap<>(); private Resource clusterCapacity = Resources.clone(Resources.none()); private Resource staleClusterCapacity = null; @@ -66,14 +69,16 @@ public class ClusterNodeTracker { writeLock.lock(); try { nodes.put(node.getNodeID(), node); + nodeNameToNodeMap.put(node.getNodeName(), node); // Update nodes per rack as well String rackName = node.getRackName(); - Integer numNodes = nodesPerRack.get(rackName); - if (numNodes == null) { - numNodes = 0; + List nodesList = nodesPerRack.get(rackName); + if (nodesList == null) { + nodesList = new ArrayList<>(); + nodesPerRack.put(rackName, nodesList); } - nodesPerRack.put(rackName, ++numNodes); + nodesList.add(node); // Update cluster capacity Resources.addTo(clusterCapacity, node.getTotalResource()); @@ -126,8 +131,8 @@ public class ClusterNodeTracker { readLock.lock(); String rName = rackName == null ? "NULL" : rackName; try { - Integer nodeCount = nodesPerRack.get(rName); - return nodeCount == null ? 0 : nodeCount; + List nodesList = nodesPerRack.get(rName); + return nodesList == null ? 0 : nodesList.size(); } finally { readLock.unlock(); } @@ -154,14 +159,18 @@ public class ClusterNodeTracker { LOG.warn("Attempting to remove a non-existent node " + nodeId); return null; } + nodeNameToNodeMap.remove(node.getNodeName()); // Update nodes per rack as well String rackName = node.getRackName(); - Integer numNodes = nodesPerRack.get(rackName); - if (numNodes > 0) { - nodesPerRack.put(rackName, --numNodes); - } else { + List nodesList = nodesPerRack.get(rackName); + if (nodesList == null) { LOG.error("Attempting to remove node from an empty rack " + rackName); + } else { + nodesList.remove(node); + if (nodesList.isEmpty()) { + nodesPerRack.remove(rackName); + } } // Update cluster capacity @@ -297,4 +306,29 @@ public class ClusterNodeTracker { Collections.sort(sortedList, comparator); return sortedList; } + + /** + * Convenience method to return list of nodes corresponding to resourceName + * passed in the {@link ResourceRequest}. + * + * @param resourceName Host/rack name of the resource, or + * {@link ResourceRequest#ANY} + * @return list of nodes that match the resourceName + */ + public List getNodesByResourceName(final String resourceName) { + Preconditions.checkArgument( + resourceName != null && !resourceName.isEmpty()); + List retNodes = new ArrayList<>(); + if (ResourceRequest.ANY.equals(resourceName)) { + return getAllNodes(); + } else if (nodeNameToNodeMap.containsKey(resourceName)) { + retNodes.add(nodeNameToNodeMap.get(resourceName)); + } else if (nodesPerRack.containsKey(resourceName)) { + return nodesPerRack.get(resourceName); + } else { + LOG.info( + "Could not find a node matching given resourceName " + resourceName); + } + return retNodes; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java new file mode 100644 index 0000000000..7f527f1b43 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Test class to verify ClusterNodeTracker. Using FSSchedulerNode without + * loss of generality. + */ +public class TestClusterNodeTracker { + private ClusterNodeTracker nodeTracker = + new ClusterNodeTracker<>(); + + @Before + public void setup() { + List rmNodes = + MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + nodeTracker.addNode(new FSSchedulerNode(rmNode, false)); + } + } + + @Test + public void testGetNodeCount() { + assertEquals("Incorrect number of nodes in the cluster", + 8, nodeTracker.nodeCount()); + + assertEquals("Incorrect number of nodes in each rack", + 4, nodeTracker.nodeCount("rack0")); + } + + @Test + public void testGetNodesForResourceName() throws Exception { + assertEquals("Incorrect number of nodes matching ANY", + 8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size()); + + assertEquals("Incorrect number of nodes matching rack", + 4, nodeTracker.getNodesByResourceName("rack0").size()); + + assertEquals("Incorrect number of nodes matching node", + 1, nodeTracker.getNodesByResourceName("host0").size()); + } +} \ No newline at end of file