diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
new file mode 100644
index 0000000000..f92a92bb17
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Represents a node in the cluster from the NodeQueueLoadMonitor's
+ * perspective.
+ */
+public class ClusterNode {
+  private final AtomicInteger queueLength = new AtomicInteger(0);
+  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  private long timestamp;
+  final NodeId nodeId;
+  private int queueCapacity = 0;
+  private final HashSet<String> labels;
+
+  public ClusterNode(NodeId nodeId) {
+    this.nodeId = nodeId;
+    this.labels = new HashSet<>();
+    updateTimestamp();
+  }
+
+  public ClusterNode setQueueLength(int qLength) {
+    this.queueLength.set(qLength);
+    return this;
+  }
+
+  public ClusterNode setQueueWaitTime(int wTime) {
+    this.queueWaitTime.set(wTime);
+    return this;
+  }
+
+  public ClusterNode updateTimestamp() {
+    this.timestamp = System.currentTimeMillis();
+    return this;
+  }
+
+  public ClusterNode setQueueCapacity(int capacity) {
+    this.queueCapacity = capacity;
+    return this;
+  }
+
+  public ClusterNode setNodeLabels(Collection<String> labelsToAdd) {
+    labels.clear();
+    labels.addAll(labelsToAdd);
+    return this;
+  }
+
+  public boolean hasLabel(String label) {
+    return this.labels.contains(label);
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public AtomicInteger getQueueLength() {
+    return this.queueLength;
+  }
+
+  public AtomicInteger getQueueWaitTime() {
+    return this.queueWaitTime;
+  }
+
+  public int getQueueCapacity() {
+    return this.queueCapacity;
+  }
+
+  public boolean isQueueFull() {
+    return this.queueCapacity > 0 &&
+        this.queueLength.get() >= this.queueCapacity;
+  }
+}
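The extracted ClusterNode is now a public class with a fluent, builder-style API. As a rough usage sketch (illustration only, not part of the patch; the host, port, and label values below are invented):

// Hypothetical usage of the new public ClusterNode (not part of the patch).
ClusterNode node = new ClusterNode(NodeId.newInstance("nm-host1", 8041))
    .setQueueWaitTime(150)   // estimated queue wait time reported by the NM
    .setQueueLength(3)       // opportunistic containers currently queued
    .setQueueCapacity(10)    // advertised queue limit; 0 means no limit set
    .setNodeLabels(java.util.Collections.singletonList("label-x"));

node.isQueueFull();       // false: capacity is set (10 > 0) and 3 < 10
node.hasLabel("label-x"); // true

Note that isQueueFull() reports full only when a positive capacity has been set, so nodes that never report a capacity are never excluded as full.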
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index fa2ba3060e..bad98c54b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -40,8 +40,8 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
@@ -53,10 +53,10 @@
  */
 public class NodeQueueLoadMonitor implements ClusterMonitor {
 
-  private final static Logger LOG = LoggerFactory.
+  protected final static Logger LOG = LoggerFactory.
       getLogger(NodeQueueLoadMonitor.class);
 
-  private int numNodesForAnyAllocation =
+  protected int numNodesForAnyAllocation =
       DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
 
   /**
@@ -70,14 +70,14 @@ public enum LoadComparator implements Comparator<ClusterNode> {
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
       if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.timestamp - o1.timestamp);
+        return (int)(o2.getTimestamp() - o1.getTimestamp());
       }
       return getMetric(o1) - getMetric(o2);
     }
 
     public int getMetric(ClusterNode c) {
       return (this == QUEUE_LENGTH) ?
-          c.queueLength.get() : c.queueWaitTime.get();
+          c.getQueueLength().get() : c.getQueueWaitTime().get();
     }
 
     /**
@@ -88,11 +88,11 @@ public int getMetric(ClusterNode c) {
      */
     public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
       if(this == QUEUE_LENGTH) {
-        int ret = c.queueLength.addAndGet(incrementSize);
-        if (ret <= c.queueCapacity) {
+        int ret = c.getQueueLength().addAndGet(incrementSize);
+        if (ret <= c.getQueueCapacity()) {
           return true;
         }
-        c.queueLength.addAndGet(-incrementSize);
+        c.getQueueLength().addAndGet(-incrementSize);
         return false;
       }
       // for queue wait time, we don't have any threshold.
@@ -100,57 +100,19 @@ public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
     }
   }
 
-  static class ClusterNode {
-    private AtomicInteger queueLength = new AtomicInteger(0);
-    private AtomicInteger queueWaitTime = new AtomicInteger(-1);
-    private long timestamp;
-    final NodeId nodeId;
-    private int queueCapacity = 0;
-
-    public ClusterNode(NodeId nodeId) {
-      this.nodeId = nodeId;
-      updateTimestamp();
-    }
-
-    public ClusterNode setQueueLength(int qLength) {
-      this.queueLength.set(qLength);
-      return this;
-    }
-
-    public ClusterNode setQueueWaitTime(int wTime) {
-      this.queueWaitTime.set(wTime);
-      return this;
-    }
-
-    public ClusterNode updateTimestamp() {
-      this.timestamp = System.currentTimeMillis();
-      return this;
-    }
-
-    public ClusterNode setQueueCapacity(int capacity) {
-      this.queueCapacity = capacity;
-      return this;
-    }
-
-    public boolean isQueueFull() {
-      return this.queueCapacity > 0 &&
-          this.queueLength.get() >= this.queueCapacity;
-    }
-  }
-
   private final ScheduledExecutorService scheduledExecutor;
 
-  private final List<NodeId> sortedNodes;
-  private final Map<NodeId, ClusterNode> clusterNodes =
+  protected final List<NodeId> sortedNodes;
+  protected final Map<NodeId, ClusterNode> clusterNodes =
       new ConcurrentHashMap<>();
-  private final Map<String, RMNode> nodeByHostName =
+  protected final Map<String, RMNode> nodeByHostName =
      new ConcurrentHashMap<>();
-  private final Map<String, Set<NodeId>> nodeIdsByRack =
+  protected final Map<String, Set<NodeId>> nodeIdsByRack =
       new ConcurrentHashMap<>();
-  private final LoadComparator comparator;
-  private QueueLimitCalculator thresholdCalculator;
-  private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
-  private ReentrantReadWriteLock clusterNodesLock =
+  protected final LoadComparator comparator;
+  protected QueueLimitCalculator thresholdCalculator;
+  protected ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
+  protected ReentrantReadWriteLock clusterNodesLock =
       new ReentrantReadWriteLock();
 
   Runnable computeTask = new Runnable() {
@@ -160,9 +122,7 @@ public void run() {
       writeLock.lock();
       try {
         try {
-          List<NodeId> nodeIds = sortNodes();
-          sortedNodes.clear();
-          sortedNodes.addAll(nodeIds);
+          updateSortedNodes();
         } catch (Exception ex) {
           LOG.warn("Got Exception while sorting nodes..", ex);
         }
@@ -193,6 +153,14 @@ public NodeQueueLoadMonitor(long nodeComputationInterval,
     numNodesForAnyAllocation = numNodes;
   }
 
+  protected void updateSortedNodes() {
+    List<NodeId> nodeIds = sortNodes(true).stream()
+        .map(n -> n.nodeId)
+        .collect(Collectors.toList());
+    sortedNodes.clear();
+    sortedNodes.addAll(nodeIds);
+  }
+
   List<NodeId> getSortedNodes() {
     return sortedNodes;
   }
@@ -239,6 +207,7 @@ public void removeNode(RMNode removedRMNode) {
     ClusterNode node;
     try {
       node = this.clusterNodes.remove(removedRMNode.getNodeID());
+      onNodeRemoved(node);
     } finally {
       writeLock.unlock();
     }
@@ -251,6 +220,13 @@
     }
   }
 
+  /**
+   * Provide an integration point for extended classes.
+   * @param node the node removed
+   */
+  protected void onNodeRemoved(ClusterNode node) {
+  }
+
   @Override
   public void updateNode(RMNode rmNode) {
     LOG.debug("Node update event from: {}", rmNode.getNodeID());
@@ -260,58 +236,83 @@ public void updateNode(RMNode rmNode) {
       opportunisticContainersStatus =
           OpportunisticContainersStatus.newInstance();
     }
-    int opportQueueCapacity =
-        opportunisticContainersStatus.getOpportQueueCapacity();
-    int estimatedQueueWaitTime =
-        opportunisticContainersStatus.getEstimatedQueueWaitTime();
-    int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
+
+    // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
+    // UNLESS comparator is based on queue length.
     ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
     writeLock.lock();
     try {
-      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
-      if (currentNode == null) {
-        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
-            (estimatedQueueWaitTime != -1 ||
-                comparator == LoadComparator.QUEUE_LENGTH)) {
-          this.clusterNodes.put(rmNode.getNodeID(),
-              new ClusterNode(rmNode.getNodeID())
-                  .setQueueWaitTime(estimatedQueueWaitTime)
-                  .setQueueLength(waitQueueLength)
-                  .setQueueCapacity(opportQueueCapacity));
-          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        } else {
-          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        }
+      ClusterNode clusterNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (clusterNode == null) {
+        onNewNodeAdded(rmNode, opportunisticContainersStatus);
       } else {
-        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
-            (estimatedQueueWaitTime != -1 ||
-                comparator == LoadComparator.QUEUE_LENGTH)) {
-          currentNode
-              .setQueueWaitTime(estimatedQueueWaitTime)
-              .setQueueLength(waitQueueLength)
-              .updateTimestamp();
-          LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
-              + " wait queue length [{}]", rmNode.getNodeID(),
-              estimatedQueueWaitTime, waitQueueLength);
-
-        } else {
-          this.clusterNodes.remove(rmNode.getNodeID());
-          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + currentNode.queueWaitTime + "] and " +
-              "wait queue length [" + currentNode.queueLength + "]");
-        }
+        onExistingNodeUpdated(rmNode, clusterNode,
+            opportunisticContainersStatus);
       }
     } finally {
       writeLock.unlock();
     }
   }
 
+  protected void onNewNodeAdded(
+      RMNode rmNode, OpportunisticContainersStatus status) {
+    int opportQueueCapacity = status.getOpportQueueCapacity();
+    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
+    int waitQueueLength = status.getWaitQueueLength();
+
+    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+        (estimatedQueueWaitTime != -1 ||
+            comparator == LoadComparator.QUEUE_LENGTH)) {
+      this.clusterNodes.put(rmNode.getNodeID(),
+          new ClusterNode(rmNode.getNodeID())
+              .setQueueWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
+              .setNodeLabels(rmNode.getNodeLabels())
+              .setQueueCapacity(opportQueueCapacity));
+      LOG.info(
+          "Inserting ClusterNode [{}] with queue wait time [{}] and " +
+              "wait queue length [{}]",
+          rmNode.getNode(),
+          estimatedQueueWaitTime,
+          waitQueueLength
+      );
+    } else {
+      LOG.warn(
+          "IGNORING ClusterNode [{}] with queue wait time [{}] and " +
+              "wait queue length [{}]",
+          rmNode.getNode(),
+          estimatedQueueWaitTime,
+          waitQueueLength
+      );
+    }
+  }
+
+  protected void onExistingNodeUpdated(
+      RMNode rmNode, ClusterNode clusterNode,
+      OpportunisticContainersStatus status) {
+
+    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
+    int waitQueueLength = status.getWaitQueueLength();
+
+    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+        (estimatedQueueWaitTime != -1 ||
+            comparator == LoadComparator.QUEUE_LENGTH)) {
+      clusterNode
+          .setQueueWaitTime(estimatedQueueWaitTime)
+          .setQueueLength(waitQueueLength)
+          .setNodeLabels(rmNode.getNodeLabels())
+          .updateTimestamp();
+      LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
+          + " wait queue length [{}]", rmNode.getNodeID(),
+          estimatedQueueWaitTime, waitQueueLength);
+
+    } else {
+      this.clusterNodes.remove(rmNode.getNodeID());
+      LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
+          "with queue wait time [" + clusterNode.getQueueWaitTime() + "] and " +
+          "wait queue length [" + clusterNode.getQueueLength() + "]");
+    }
+  }
+
   @Override
   public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     LOG.debug("Node resource update event from: {}", rmNode.getNodeID());
@@ -374,7 +375,7 @@ public RMNode selectRackLocalNode(String rackName, Set<String> blacklist) {
   }
 
   public RMNode selectAnyNode(Set<String> blacklist) {
-    List<NodeId> nodeIds = selectLeastLoadedNodes(numNodesForAnyAllocation);
+    List<NodeId> nodeIds = getCandidatesForSelectAnyNode();
     int size = nodeIds.size();
     if (size <= 0) {
       return null;
@@ -395,22 +396,26 @@ public RMNode selectAnyNode(Set<String> blacklist) {
     return null;
   }
 
-  private void removeFromNodeIdsByRack(RMNode removedNode) {
+  protected List<NodeId> getCandidatesForSelectAnyNode() {
+    return selectLeastLoadedNodes(numNodesForAnyAllocation);
+  }
+
+  protected void removeFromNodeIdsByRack(RMNode removedNode) {
     nodeIdsByRack.computeIfPresent(removedNode.getRackName(),
         (k, v) -> v).remove(removedNode.getNodeID());
   }
 
-  private void addIntoNodeIdsByRack(RMNode addedNode) {
+  protected void addIntoNodeIdsByRack(RMNode addedNode) {
     nodeIdsByRack.compute(addedNode.getRackName(), (k, v) -> v == null ?
         ConcurrentHashMap.newKeySet() : v).add(addedNode.getNodeID());
   }
 
-  private List<NodeId> sortNodes() {
+  protected List<ClusterNode> sortNodes(boolean excludeFullNodes) {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
     try {
-      ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
-      List<NodeId> retList = new ArrayList<>();
+      ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
+      List<ClusterNode> retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
       // Collections.sort would do something similar by calling Arrays.sort
       // internally but would finally iterate through the input list (aList)
@@ -420,9 +425,11 @@ private List<NodeId> sortNodes() {
       Arrays.sort(nodes, (Comparator)comparator);
       for (int j=0; j < nodes.length; j++) {
         ClusterNode cNode = (ClusterNode)nodes[j];
-        // Exclude nodes whose queue is already full.
-        if (!cNode.isQueueFull()) {
-          retList.add(cNode.nodeId);
+        // Only add node to the result list when either condition is met:
+        // 1. we don't exclude full nodes
+        // 2. we do exclude full nodes, but the current node is not full
+        if (!excludeFullNodes || !cNode.isQueueFull()) {
+          retList.add(cNode);
         }
       }
       return retList;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
index ab3a577d9f..8bd2f24d8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
@@ -20,7 +20,6 @@
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator;
 
 import java.util.List;
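Taken together, the protected visibility changes and the new onNewNodeAdded / onExistingNodeUpdated / onNodeRemoved / getCandidatesForSelectAnyNode hooks let a subclass customize how nodes are tracked and selected without copying the monitor. A minimal sketch of such an extension, assuming the same package and the (long, LoadComparator, int) constructor implied by the hunk at @@ -193; the class name and the label-based policy are invented for illustration:

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.yarn.api.records.NodeId;

// Hypothetical subclass: when the scheduler asks for "any node" candidates,
// only offer nodes that carry a required label.
public class LabelAwareNodeQueueLoadMonitor extends NodeQueueLoadMonitor {
  private final String requiredLabel;

  public LabelAwareNodeQueueLoadMonitor(long nodeComputationInterval,
      LoadComparator comparator, int numNodes, String requiredLabel) {
    super(nodeComputationInterval, comparator, numNodes);
    this.requiredLabel = requiredLabel;
  }

  @Override
  protected List<NodeId> getCandidatesForSelectAnyNode() {
    // sortNodes(true) returns nodes least-loaded first and skips nodes
    // whose opportunistic queue is already full; on top of that, keep
    // only nodes carrying the required label.
    return sortNodes(true).stream()
        .filter(n -> n.hasLabel(requiredLabel))
        .map(n -> n.nodeId)
        .limit(numNodesForAnyAllocation)
        .collect(Collectors.toList());
  }

  @Override
  protected void onNodeRemoved(ClusterNode node) {
    // Integration point added by this patch; a real subclass could clean
    // up per-label bookkeeping here. No-op in this sketch.
  }
}

Because getCandidatesForSelectAnyNode() and sortNodes(boolean) are now protected, the base class's locking and periodic re-sorting still apply to the specialized selection path.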