diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 135f3486ce..8fe9c7931a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -404,18 +404,51 @@ public boolean isAllInvalidDivisor(Resource r) { @Override public float ratio(Resource a, Resource b) { - float ratio = 0.0f; + return ratio(a, b, true); + } + + /** + * Computes the ratio of resource a over resource b, + * where the boolean flag {@literal isDominantShare} allows + * specification of whether the max- or min-share should be computed. + * @param a the numerator resource. + * @param b the denominator resource. + * @param isDominantShare whether the dominant (max) share should be computed, + * computes the min-share if false. + * @return the max- or min-share ratio of the resources. + */ + private float ratio(Resource a, Resource b, boolean isDominantShare) { + float ratio = isDominantShare ? 0.0f : 1.0f; int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation aResourceInformation = a.getResourceInformation(i); ResourceInformation bResourceInformation = b.getResourceInformation(i); final float tmp = divideSafelyAsFloat(aResourceInformation.getValue(), bResourceInformation.getValue()); - ratio = ratio > tmp ? ratio : tmp; + if (isDominantShare) { + ratio = Math.max(ratio, tmp); + } else { + ratio = Math.min(ratio, tmp); + } } return ratio; } + /** + * Computes the ratio of resource a over resource b. + * However, different from ratio(Resource, Resource), + * this returns the min-share of the resources. + * For example, ratio(Resource(10, 50), Resource(100, 100)) would return 0.5, + * whereas minRatio(Resource(10, 50), Resource(100, 100)) would return 0.1. + * @param a the numerator resource. + * @param b the denominator resource. + * @return the min-share ratio of the resources. + */ + @Unstable + public float minRatio(Resource a, Resource b) { + return ratio(a, b, false); + } + @Override public Resource divideAndCeil(Resource numerator, int denominator) { return divideAndCeil(numerator, (long) denominator); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 01c9b93417..15b5c7f765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -257,7 +257,9 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, int limitMin, limitMax; - if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH || + comparator == + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) { limitMin = rmContext.getYarnConfiguration() .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, YarnConfiguration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java index 223e1a1bf3..482612de25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java @@ -251,14 +251,15 @@ private List allocateNodeLocal( String userName, Map> allocations) throws YarnException { List allocatedContainers = new ArrayList<>(); + final ResourceRequest resourceRequest = enrichedAsk.getRequest(); while (toAllocate > 0) { RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation, - blacklist); + blacklist, resourceRequest.getCapability()); if (node != null) { toAllocate--; Container container = createContainer(rmIdentifier, appParams, idCounter, id, userName, allocations, nodeLocation, - enrichedAsk.getRequest(), convertToRemoteNode(node)); + resourceRequest, convertToRemoteNode(node)); allocatedContainers.add(container); LOG.info("Allocated [{}] as opportunistic at location [{}]", container.getId(), nodeLocation); @@ -280,14 +281,15 @@ private List allocateRackLocal(EnrichedResourceRequest enrichedAsk, String userName, Map> allocations) throws YarnException { List allocatedContainers = new ArrayList<>(); + final ResourceRequest resourceRequest = enrichedAsk.getRequest(); while (toAllocate > 0) { RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation, - blacklist); + blacklist, resourceRequest.getCapability()); if (node != null) { toAllocate--; Container container = createContainer(rmIdentifier, appParams, idCounter, id, userName, allocations, rackLocation, - enrichedAsk.getRequest(), convertToRemoteNode(node)); + resourceRequest, convertToRemoteNode(node)); allocatedContainers.add(container); metrics.incrRackLocalOppContainers(); LOG.info("Allocated [{}] as opportunistic at location [{}]", @@ -309,13 +311,15 @@ private List allocateAny(EnrichedResourceRequest enrichedAsk, String userName, Map> allocations) throws YarnException { List allocatedContainers = new ArrayList<>(); + final ResourceRequest resourceRequest = enrichedAsk.getRequest(); while (toAllocate > 0) { - RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist); + RMNode node = nodeQueueLoadMonitor.selectAnyNode( + blacklist, resourceRequest.getCapability()); if (node != null) { toAllocate--; Container container = createContainer(rmIdentifier, appParams, idCounter, id, userName, allocations, ResourceRequest.ANY, - enrichedAsk.getRequest(), convertToRemoteNode(node)); + resourceRequest, convertToRemoteNode(node)); allocatedContainers.add(container); metrics.incrOffSwitchOppContainers(); LOG.info("Allocated [{}] as opportunistic at location [{}]", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java index f92a92bb17..065f97c4ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java @@ -20,74 +20,243 @@ import java.util.Collection; import java.util.HashSet; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective */ public class ClusterNode { - private final AtomicInteger queueLength = new AtomicInteger(0); - private final AtomicInteger queueWaitTime = new AtomicInteger(-1); + /** + * Properties class used to initialize/change fields in ClusterNode. + */ + public static final class Properties { + private int queueLength = 0; + private int queueWaitTime = -1; + private long timestamp; + private int queueCapacity = 0; + private boolean queueCapacityIsSet = false; + private final HashSet labels; + private Resource capability = null; + private Resource allocatedResource = null; + + public static Properties newInstance() { + return new Properties(); + } + + Properties setQueueLength(int qLength) { + this.queueLength = qLength; + return this; + } + + Properties setQueueWaitTime(int wTime) { + this.queueWaitTime = wTime; + return this; + } + + Properties updateTimestamp() { + this.timestamp = System.currentTimeMillis(); + return this; + } + + Properties setQueueCapacity(int capacity) { + this.queueCapacity = capacity; + this.queueCapacityIsSet = true; + return this; + } + + Properties setNodeLabels(Collection labelsToAdd) { + labels.clear(); + labels.addAll(labelsToAdd); + return this; + } + + Properties setCapability(Resource nodeCapability) { + this.capability = nodeCapability; + return this; + } + + Properties setAllocatedResource(Resource allocResource) { + this.allocatedResource = allocResource; + return this; + } + + private Properties() { + labels = new HashSet<>(); + } + } + + private int queueLength = 0; + private int queueWaitTime = -1; private long timestamp; final NodeId nodeId; private int queueCapacity = 0; private final HashSet labels; + private Resource capability = Resources.none(); + private Resource allocatedResource = Resources.none(); + private final ReentrantReadWriteLock.WriteLock writeLock; + private final ReentrantReadWriteLock.ReadLock readLock; public ClusterNode(NodeId nodeId) { this.nodeId = nodeId; this.labels = new HashSet<>(); - updateTimestamp(); - } - - public ClusterNode setQueueLength(int qLength) { - this.queueLength.set(qLength); - return this; - } - - public ClusterNode setQueueWaitTime(int wTime) { - this.queueWaitTime.set(wTime); - return this; - } - - public ClusterNode updateTimestamp() { + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.writeLock = lock.writeLock(); + this.readLock = lock.readLock(); this.timestamp = System.currentTimeMillis(); - return this; } - public ClusterNode setQueueCapacity(int capacity) { - this.queueCapacity = capacity; - return this; + public ClusterNode setProperties(final Properties properties) { + writeLock.lock(); + try { + if (properties.capability == null) { + this.capability = Resources.none(); + } else { + this.capability = properties.capability; + } + + if (properties.allocatedResource == null) { + this.allocatedResource = Resources.none(); + } else { + this.allocatedResource = properties.allocatedResource; + } + + this.queueLength = properties.queueLength; + this.queueWaitTime = properties.queueWaitTime; + this.timestamp = properties.timestamp; + if (properties.queueCapacityIsSet) { + // queue capacity is only set on node add, not on node updates + this.queueCapacity = properties.queueCapacity; + } + this.labels.clear(); + this.labels.addAll(properties.labels); + return this; + } finally { + writeLock.unlock(); + } } - public ClusterNode setNodeLabels(Collection labelsToAdd) { - labels.clear(); - labels.addAll(labelsToAdd); - return this; + public Resource getAllocatedResource() { + readLock.lock(); + try { + return this.allocatedResource; + } finally { + readLock.unlock(); + } + } + + public Resource getAvailableResource() { + readLock.lock(); + try { + return Resources.subtractNonNegative(capability, allocatedResource); + } finally { + readLock.unlock(); + } + } + + public Resource getCapability() { + readLock.lock(); + try { + return this.capability; + } finally { + readLock.unlock(); + } } public boolean hasLabel(String label) { - return this.labels.contains(label); + readLock.lock(); + try { + return this.labels.contains(label); + } finally { + readLock.unlock(); + } } public long getTimestamp() { - return this.timestamp; + readLock.lock(); + try { + return this.timestamp; + } finally { + readLock.unlock(); + } } - public AtomicInteger getQueueLength() { - return this.queueLength; + public int getQueueLength() { + readLock.lock(); + try { + return this.queueLength; + } finally { + readLock.unlock(); + } } - public AtomicInteger getQueueWaitTime() { - return this.queueWaitTime; + public int getQueueWaitTime() { + readLock.lock(); + try { + return this.queueWaitTime; + } finally { + readLock.unlock(); + } } public int getQueueCapacity() { - return this.queueCapacity; + readLock.lock(); + try { + return this.queueCapacity; + } finally { + readLock.unlock(); + } + } + + public boolean compareAndIncrementAllocation( + final int incrementQLen, + final ResourceCalculator resourceCalculator, + final Resource requested) { + writeLock.lock(); + try { + final Resource currAvailable = Resources.subtractNonNegative( + capability, allocatedResource); + if (resourceCalculator.fitsIn(requested, currAvailable)) { + allocatedResource = Resources.add(allocatedResource, requested); + return true; + } + + if (!resourceCalculator.fitsIn(requested, capability)) { + // If does not fit at all, do not allocate + return false; + } + + return compareAndIncrementAllocation(incrementQLen); + } finally { + writeLock.unlock(); + } + } + + public boolean compareAndIncrementAllocation(final int incrementQLen) { + writeLock.lock(); + try { + final int added = queueLength + incrementQLen; + if (added <= queueCapacity) { + queueLength = added; + return true; + } + return false; + } finally { + writeLock.unlock(); + } } public boolean isQueueFull() { - return this.queueCapacity > 0 && - this.queueLength.get() >= this.queueCapacity; + readLock.lock(); + try { + return this.queueCapacity > 0 && + this.queueLength >= this.queueCapacity; + } finally { + readLock.unlock(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 1e2b7ef6da..74dc2ce7f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; +import org.apache.commons.math3.util.Precision; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.NodeId; @@ -64,39 +69,165 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { * of two Nodes are compared. */ public enum LoadComparator implements Comparator { + /** + * This policy only considers queue length. + * When allocating, increments queue length without looking at resources + * available on the node, and when sorting, also only sorts by queue length. + */ QUEUE_LENGTH, - QUEUE_WAIT_TIME; + /** + * This policy only considers the wait time of containers in the queue. + * Neither looks at resources nor at queue length. + */ + QUEUE_WAIT_TIME, + /** + * This policy considers both queue length and resources. + * When allocating, first decrements resources available on a node. + * If resources are available, does not place OContainers on the node queue. + * When sorting, it first sorts by queue length, + * then by available resources. + */ + QUEUE_LENGTH_THEN_RESOURCES; + + private Resource clusterResource = Resources.none(); + private final DominantResourceCalculator resourceCalculator = + new DominantResourceCalculator(); + + private boolean shouldPerformMinRatioComputation() { + if (clusterResource == null) { + return false; + } + + return !resourceCalculator.isAnyMajorResourceZeroOrNegative( + clusterResource); + } + + /** + * Compares queue length of nodes first (shortest first), + * then compares available resources normalized + * over cluster resources (most available resources first). + * @param o1 the first ClusterNode + * @param o2 the second ClusterNode + * @return the difference the two ClusterNodes for sorting + */ + private int compareQueueLengthThenResources( + final ClusterNode o1, final ClusterNode o2) { + int diff = o1.getQueueLength() - o2.getQueueLength(); + if (diff != 0) { + return diff; + } + + final Resource availableResource1 = o1.getAvailableResource(); + final Resource availableResource2 = o2.getAvailableResource(); + + // Cluster resource should be valid before performing min-ratio logic + // Use raw available resource comparison otherwise + if (shouldPerformMinRatioComputation()) { + // Takes the least available resource of the two nodes, + // normalized to the overall cluster resource + final float availableRatio1 = + resourceCalculator.minRatio(availableResource1, clusterResource); + final float availableRatio2 = + resourceCalculator.minRatio(availableResource2, clusterResource); + + // The one with more available resources should be placed first + diff = Precision.compareTo( + availableRatio2, availableRatio1, Precision.EPSILON); + } + + if (diff == 0) { + // Compare absolute value if ratios are the same + diff = availableResource2.getVirtualCores() - availableResource1.getVirtualCores(); + } + + if (diff == 0) { + diff = Long.compare(availableResource2.getMemorySize(), + availableResource1.getMemorySize()); + } + + return diff; + } @Override public int compare(ClusterNode o1, ClusterNode o2) { - if (getMetric(o1) == getMetric(o2)) { - return (int)(o2.getTimestamp() - o1.getTimestamp()); + int diff; + switch (this) { + case QUEUE_LENGTH_THEN_RESOURCES: + diff = compareQueueLengthThenResources(o1, o2); + break; + case QUEUE_WAIT_TIME: + case QUEUE_LENGTH: + default: + diff = getMetric(o1) - getMetric(o2); + break; } - return getMetric(o1) - getMetric(o2); + + if (diff == 0) { + return (int) (o2.getTimestamp() - o1.getTimestamp()); + } + return diff; + } + + @VisibleForTesting + void setClusterResource(Resource clusterResource) { + this.clusterResource = clusterResource; + } + + public ResourceCalculator getResourceCalculator() { + return resourceCalculator; } public int getMetric(ClusterNode c) { - return (this == QUEUE_LENGTH) ? - c.getQueueLength().get() : c.getQueueWaitTime().get(); + switch (this) { + case QUEUE_WAIT_TIME: + return c.getQueueWaitTime(); + case QUEUE_LENGTH: + case QUEUE_LENGTH_THEN_RESOURCES: + default: + return c.getQueueLength(); + } } /** * Increment the metric by a delta if it is below the threshold. * @param c ClusterNode * @param incrementSize increment size + * @param requested the requested resource * @return true if the metric was below threshold and was incremented. */ - public boolean compareAndIncrement(ClusterNode c, int incrementSize) { - if(this == QUEUE_LENGTH) { - int ret = c.getQueueLength().addAndGet(incrementSize); - if (ret <= c.getQueueCapacity()) { - return true; - } - c.getQueueLength().addAndGet(-incrementSize); - return false; + public boolean compareAndIncrement( + ClusterNode c, int incrementSize, Resource requested) { + switch (this) { + case QUEUE_LENGTH_THEN_RESOURCES: + return c.compareAndIncrementAllocation( + incrementSize, resourceCalculator, requested); + case QUEUE_WAIT_TIME: + // for queue wait time, we don't have any threshold. + return true; + case QUEUE_LENGTH: + default: + return c.compareAndIncrementAllocation(incrementSize); } - // for queue wait time, we don't have any threshold. - return true; + } + + /** + * Whether we should be placing OContainers on a node. + * @param cn the clusterNode + * @return whether we should be placing OContainers on a node. + */ + public boolean isNodeAvailable(final ClusterNode cn) { + int queueCapacity = cn.getQueueCapacity(); + int queueLength = cn.getQueueLength(); + if (this == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) { + if (queueCapacity <= 0) { + return queueLength <= 0; + } else { + return queueLength < queueCapacity; + } + } + // In the special case where queueCapacity is 0 for the node, + // the container can be allocated on the node but will be rejected there + return queueCapacity <= 0 || queueLength < queueCapacity; } } @@ -261,13 +392,21 @@ protected void onNewNodeAdded( if (rmNode.getState() != NodeState.DECOMMISSIONING && (estimatedQueueWaitTime != -1 || - comparator == LoadComparator.QUEUE_LENGTH)) { - this.clusterNodes.put(rmNode.getNodeID(), - new ClusterNode(rmNode.getNodeID()) + comparator == LoadComparator.QUEUE_LENGTH || + comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) { + final ClusterNode.Properties properties = + ClusterNode.Properties.newInstance() .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) .setNodeLabels(rmNode.getNodeLabels()) - .setQueueCapacity(opportQueueCapacity)); + .setCapability(rmNode.getTotalCapability()) + .setAllocatedResource(rmNode.getAllocatedContainerResource()) + .setQueueCapacity(opportQueueCapacity) + .updateTimestamp(); + + this.clusterNodes.put(rmNode.getNodeID(), + new ClusterNode(rmNode.getNodeID()).setProperties(properties)); + LOG.info( "Inserting ClusterNode [{}] with queue wait time [{}] and " + "wait queue length [{}]", @@ -295,12 +434,19 @@ protected void onExistingNodeUpdated( if (rmNode.getState() != NodeState.DECOMMISSIONING && (estimatedQueueWaitTime != -1 || - comparator == LoadComparator.QUEUE_LENGTH)) { - clusterNode - .setQueueWaitTime(estimatedQueueWaitTime) - .setQueueLength(waitQueueLength) - .setNodeLabels(rmNode.getNodeLabels()) - .updateTimestamp(); + comparator == LoadComparator.QUEUE_LENGTH || + comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) { + final ClusterNode.Properties properties = + ClusterNode.Properties.newInstance() + .setQueueWaitTime(estimatedQueueWaitTime) + .setQueueLength(waitQueueLength) + .setNodeLabels(rmNode.getNodeLabels()) + .setCapability(rmNode.getTotalCapability()) + .setAllocatedResource(rmNode.getAllocatedContainerResource()) + .updateTimestamp(); + + clusterNode.setProperties(properties); + LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and" + " wait queue length [{}]", rmNode.getNodeID(), estimatedQueueWaitTime, waitQueueLength); @@ -345,27 +491,47 @@ public List selectLeastLoadedNodes(int k) { } } - public RMNode selectLocalNode(String hostName, Set blacklist) { + /** + * Selects the node as specified by hostName for resource allocation, + * unless the node has been blacklisted. + * @param hostName the hostname of the node for local resource allocation + * @param blacklist the blacklisted nodes + * @param request the requested resource + * @return the selected node, null if the node is full or is blacklisted + */ + public RMNode selectLocalNode( + String hostName, Set blacklist, Resource request) { if (blacklist.contains(hostName)) { return null; } RMNode node = nodeByHostName.get(hostName); if (node != null) { ClusterNode clusterNode = clusterNodes.get(node.getNodeID()); - if (comparator.compareAndIncrement(clusterNode, 1)) { + if (clusterNode != null && comparator + .compareAndIncrement(clusterNode, 1, request)) { return node; } } return null; } - public RMNode selectRackLocalNode(String rackName, Set blacklist) { + /** + * Selects a node from the rack as specified by rackName + * for resource allocation, excluding blacklisted nodes + * @param rackName the rack name for rack-local resource allocation + * @param blacklist the blacklisted nodes + * @param request the requested resource + * @return the selected node, null if no suitable nodes + */ + public RMNode selectRackLocalNode( + String rackName, Set blacklist, Resource request) { Set nodesOnRack = nodeIdsByRack.get(rackName); if (nodesOnRack != null) { for (NodeId nodeId : nodesOnRack) { if (!blacklist.contains(nodeId.getHost())) { ClusterNode node = clusterNodes.get(nodeId); - if (node != null && comparator.compareAndIncrement(node, 1)) { + if (node != null && + comparator.compareAndIncrement(node, 1, request)) { return nodeByHostName.get(nodeId.getHost()); } } @@ -374,7 +540,14 @@ public RMNode selectRackLocalNode(String rackName, Set blacklist) { return null; } - public RMNode selectAnyNode(Set blacklist) { + /** + * Selects a node from all ClusterNodes for resource allocation, + * excluding blacklisted nodes. + * @param blacklist the blacklisted nodes + * @param request the requested resource + * @return the selected node, null if no suitable nodes + */ + public RMNode selectAnyNode(Set blacklist, Resource request) { List nodeIds = getCandidatesForSelectAnyNode(); int size = nodeIds.size(); if (size <= 0) { @@ -388,7 +561,8 @@ public RMNode selectAnyNode(Set blacklist) { NodeId nodeId = nodeIds.get(index); if (nodeId != null && !blacklist.contains(nodeId.getHost())) { ClusterNode node = clusterNodes.get(nodeId); - if (node != null && comparator.compareAndIncrement(node, 1)) { + if (node != null && comparator.compareAndIncrement( + node, 1, request)) { return nodeByHostName.get(nodeId.getHost()); } } @@ -402,7 +576,10 @@ protected List getCandidatesForSelectAnyNode() { protected void removeFromNodeIdsByRack(RMNode removedNode) { nodeIdsByRack.computeIfPresent(removedNode.getRackName(), - (k, v) -> v).remove(removedNode.getNodeID()); + (k, v) -> { + v.remove(removedNode.getNodeID()); + return v; + }); } protected void addIntoNodeIdsByRack(RMNode addedNode) { @@ -414,21 +591,21 @@ protected List sortNodes(boolean excludeFullNodes) { ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock(); readLock.lock(); try { - ArrayList aList = new ArrayList<>(this.clusterNodes.values()); - List retList = new ArrayList<>(); - Object[] nodes = aList.toArray(); - // Collections.sort would do something similar by calling Arrays.sort - // internally but would finally iterate through the input list (aList) - // to reset the value of each element. Since we don't really care about - // 'aList', we can use the iteration to create the list of nodeIds which - // is what we ultimately care about. - Arrays.sort(nodes, (Comparator)comparator); - for (int j=0; j < nodes.length; j++) { - ClusterNode cNode = (ClusterNode)nodes[j]; - // Only add node to the result list when either condition is met: - // 1. we don't exclude full nodes - // 2. we do exclude full nodes, but the current node is not full - if (!excludeFullNodes || !cNode.isQueueFull()) { + final ClusterNode[] nodes = new ClusterNode[clusterNodes.size()]; + int nodesIdx = 0; + final Resource clusterResource = Resource.newInstance(Resources.none()); + for (final ClusterNode node : this.clusterNodes.values()) { + Resources.addTo(clusterResource, node.getCapability()); + nodes[nodesIdx] = node; + nodesIdx++; + } + + comparator.setClusterResource(clusterResource); + + final List retList = new ArrayList<>(); + Arrays.sort(nodes, comparator); + for (final ClusterNode cNode : nodes) { + if (!excludeFullNodes || comparator.isNodeAvailable(cNode)) { retList.add(cNode); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index eedd9de93b..c8ebe6f8f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -18,17 +18,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -36,8 +48,18 @@ */ public class TestNodeQueueLoadMonitor { + // Extra resource type to test that all resource dimensions are considered + private static final String NETWORK_RESOURCE = "network"; private final static int DEFAULT_MAX_QUEUE_LENGTH = 200; + // Note: The following variables are private static resources + // re-initialized on each test because resource dimensions considered + // are initialized in a static method. + // Declaring them as static final will "lock-in" resource dimensions and + // disallow specification of a new resource dimension ("network") in tests. + private static Resource defaultResourceRequested; + private static Resource defaultCapacity; + static class FakeNodeId extends NodeId { final String host; final int port; @@ -70,6 +92,44 @@ public String toString() { } } + private static Resource newResourceInstance(long memory, int vCores) { + return newResourceInstance(memory, vCores, 0L); + } + + private static Resource newResourceInstance( + final long memory, final int vCores, final long network) { + return Resource.newInstance(memory, vCores, + ImmutableMap.of(NETWORK_RESOURCE, network)); + } + + private static long getNetworkResourceValue(final Resource resource) { + return resource.getResourceValue(NETWORK_RESOURCE); + } + + public static void addNewTypesToResources(String... resourceTypes) { + // Initialize resource map + Map riMap = new HashMap<>(); + + // Initialize mandatory resources + riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB); + riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES); + + for (String newResource : resourceTypes) { + riMap.put(newResource, ResourceInformation + .newInstance(newResource, "", 0, ResourceTypes.COUNTABLE, 0, + Integer.MAX_VALUE)); + } + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } + + @BeforeClass + public static void classSetUp() { + addNewTypesToResources(NETWORK_RESOURCE); + defaultResourceRequested = newResourceInstance(128, 1, 1); + defaultCapacity = newResourceInstance(1024, 8, 1000); + } + @Test public void testWaitTimeSort() { NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( @@ -79,7 +139,6 @@ public void testWaitTimeSort() { selector.updateNode(createRMNode("h3", 3, 10, 10)); selector.computeTask.run(); List nodeIds = selector.selectNodes(); - System.out.println("1-> " + nodeIds); Assert.assertEquals("h2:2", nodeIds.get(0).toString()); Assert.assertEquals("h3:3", nodeIds.get(1).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString()); @@ -88,7 +147,6 @@ public void testWaitTimeSort() { selector.updateNode(createRMNode("h3", 3, 2, 10)); selector.computeTask.run(); nodeIds = selector.selectNodes(); - System.out.println("2-> "+ nodeIds); Assert.assertEquals("h3:3", nodeIds.get(0).toString()); Assert.assertEquals("h2:2", nodeIds.get(1).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString()); @@ -97,7 +155,6 @@ public void testWaitTimeSort() { selector.updateNode(createRMNode("h4", 4, -1, 10)); selector.computeTask.run(); nodeIds = selector.selectNodes(); - System.out.println("3-> "+ nodeIds); // No change Assert.assertEquals("h3:3", nodeIds.get(0).toString()); Assert.assertEquals("h2:2", nodeIds.get(1).toString()); @@ -186,6 +243,208 @@ public void testQueueLengthSort() { Assert.assertEquals("h4:4", nodeIds.get(2).toString()); } + @Test + public void testQueueLengthThenResourcesSort() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES); + + // Node and queue sizes were selected such that we can determine the + // order of these nodes in the selectNodes call deterministically + // h2 -> h1 -> h3 -> h4 + selector.updateNode(createRMNode( + "h1", 1, -1, 0, + Resources.multiply(defaultResourceRequested, 3), defaultCapacity)); + selector.updateNode(createRMNode( + "h2", 2, -1, 0, + Resources.multiply(defaultResourceRequested, 2), defaultCapacity)); + selector.updateNode(createRMNode( + "h3", 3, -1, 5, + Resources.multiply(defaultResourceRequested, 3), defaultCapacity)); + selector.updateNode(createRMNode( + "h4", 4, -1, 10, + Resources.multiply(defaultResourceRequested, 2), defaultCapacity)); + selector.computeTask.run(); + List nodeIds = selector.selectNodes(); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h3:3", nodeIds.get(2).toString()); + Assert.assertEquals("h4:4", nodeIds.get(3).toString()); + + // Now update node3 + // node3 should now rank after node4 since it has the same queue length + // but less resources available + selector.updateNode(createRMNode( + "h3", 3, -1, 10, + Resources.multiply(defaultResourceRequested, 3), defaultCapacity)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + Assert.assertEquals("h3:3", nodeIds.get(3).toString()); + + // Now update h3 and fill its queue -- it should no longer be available + selector.updateNode(createRMNode("h3", 3, -1, + DEFAULT_MAX_QUEUE_LENGTH)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + // h3 is queued up, so we should only have 3 nodes left + Assert.assertEquals(3, nodeIds.size()); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + + // Now update h2 to Decommissioning state + selector.updateNode(createRMNode("h2", 2, -1, + 5, NodeState.DECOMMISSIONING)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + // h2 is decommissioned, and h3 is full, so we should only have 2 nodes + Assert.assertEquals(2, nodeIds.size()); + Assert.assertEquals("h1:1", nodeIds.get(0).toString()); + Assert.assertEquals("h4:4", nodeIds.get(1).toString()); + + // Now update h2 back to Running state + selector.updateNode(createRMNode( + "h2", 2, -1, 0, + Resources.multiply(defaultResourceRequested, 2), defaultCapacity)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(3, nodeIds.size()); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + + // Now update h2 to have a zero queue capacity. + // Make sure that here it is still in the pool. + selector.updateNode(createRMNode( + "h2", 2, -1, 0, 0, + Resources.multiply(defaultResourceRequested, 2), + defaultCapacity)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(3, nodeIds.size()); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + + // Now update h2 to have a positive queue length but a zero queue capacity. + // Make sure that here it is no longer in the pool. + // Need to first remove the node, because node capacity is not updated. + selector.removeNode(createRMNode( + "h2", 2, -1, 0, 0, + Resources.multiply(defaultResourceRequested, 2), + defaultCapacity)); + selector.updateNode(createRMNode( + "h2", 2, -1, 1, 0, + Resources.multiply(defaultResourceRequested, 2), + defaultCapacity)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals(2, nodeIds.size()); + Assert.assertEquals("h1:1", nodeIds.get(0).toString()); + Assert.assertEquals("h4:4", nodeIds.get(1).toString()); + } + + /** + * Tests that when using QUEUE_LENGTH_THEN_RESOURCES decrements the amount + * of resources on the internal {@link ClusterNode} representation. + */ + @Test + public void testQueueLengthThenResourcesDecrementsAvailable() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES); + RMNode node = createRMNode("h1", 1, -1, 0); + selector.addNode(null, node); + selector.updateNode(node); + selector.updateSortedNodes(); + + ClusterNode clusterNode = selector.getClusterNodes().get(node.getNodeID()); + Assert.assertEquals(Resources.none(), + clusterNode.getAllocatedResource()); + + // Has enough resources + RMNode selectedNode = selector.selectAnyNode( + Collections.emptySet(), defaultResourceRequested); + Assert.assertNotNull(selectedNode); + Assert.assertEquals(node.getNodeID(), selectedNode.getNodeID()); + + clusterNode = selector.getClusterNodes().get(node.getNodeID()); + Assert.assertEquals(defaultResourceRequested, + clusterNode.getAllocatedResource()); + + // Does not have enough resources, but can queue + selectedNode = selector.selectAnyNode( + Collections.emptySet(), defaultCapacity); + Assert.assertNotNull(selectedNode); + Assert.assertEquals(node.getNodeID(), selectedNode.getNodeID()); + + clusterNode = selector.getClusterNodes().get(node.getNodeID()); + Assert.assertEquals(1, clusterNode.getQueueLength()); + + // Does not have enough resources and cannot queue + selectedNode = selector.selectAnyNode( + Collections.emptySet(), + Resources.add(defaultResourceRequested, defaultCapacity)); + Assert.assertNull(selectedNode); + } + + @Test + public void testQueueLengthThenResourcesCapabilityChange() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES); + + // Node sizes were selected such that we can determine the + // order of these nodes in the selectNodes call deterministically + // h1 -> h2 -> h3 -> h4 + selector.updateNode(createRMNode( + "h1", 1, -1, 0, + Resources.multiply(defaultResourceRequested, 1), defaultCapacity)); + selector.updateNode(createRMNode( + "h2", 2, -1, 0, + Resources.multiply(defaultResourceRequested, 2), defaultCapacity)); + selector.updateNode(createRMNode( + "h3", 3, -1, 0, + Resources.multiply(defaultResourceRequested, 3), defaultCapacity)); + selector.updateNode(createRMNode( + "h4", 4, -1, 0, + Resources.multiply(defaultResourceRequested, 4), defaultCapacity)); + selector.computeTask.run(); + List nodeIds = selector.selectNodes(); + Assert.assertEquals("h1:1", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h3:3", nodeIds.get(2).toString()); + Assert.assertEquals("h4:4", nodeIds.get(3).toString()); + + // Now update node1 to have only defaultResourceRequested available + // by changing its capability to 2x defaultResourceReqeusted + // node1 should now rank last + selector.updateNode(createRMNode( + "h1", 1, -1, 0, + Resources.multiply(defaultResourceRequested, 1), + Resources.multiply(defaultResourceRequested, 2))); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); + Assert.assertEquals("h1:1", nodeIds.get(3).toString()); + + // Now update node2 to have no resources available + // by changing its capability to 1x defaultResourceReqeusted + // node2 should now rank last + selector.updateNode(createRMNode( + "h2", 2, -1, 0, + Resources.multiply(defaultResourceRequested, 1), + Resources.multiply(defaultResourceRequested, 1))); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h4:4", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + Assert.assertEquals("h2:2", nodeIds.get(3).toString()); + } + @Test public void testContainerQueuingLimit() { NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( @@ -254,18 +513,22 @@ public void testSelectLocalNode() { // basic test for selecting node which has queue length less // than queue capacity. Set blacklist = new HashSet<>(); - RMNode node = selector.selectLocalNode("h1", blacklist); + RMNode node = selector.selectLocalNode( + "h1", blacklist, defaultResourceRequested); Assert.assertEquals("h1", node.getHostName()); // if node has been added to blacklist blacklist.add("h1"); - node = selector.selectLocalNode("h1", blacklist); + node = selector.selectLocalNode( + "h1", blacklist, defaultResourceRequested); Assert.assertNull(node); - node = selector.selectLocalNode("h2", blacklist); + node = selector.selectLocalNode( + "h2", blacklist, defaultResourceRequested); Assert.assertNull(node); - node = selector.selectLocalNode("h3", blacklist); + node = selector.selectLocalNode( + "h3", blacklist, defaultResourceRequested); Assert.assertEquals("h3", node.getHostName()); } @@ -293,19 +556,23 @@ public void testSelectRackLocalNode() { // basic test for selecting node which has queue length less // than queue capacity. Set blacklist = new HashSet<>(); - RMNode node = selector.selectRackLocalNode("rack1", blacklist); + RMNode node = selector.selectRackLocalNode( + "rack1", blacklist, defaultResourceRequested); Assert.assertEquals("h1", node.getHostName()); // if node has been added to blacklist blacklist.add("h1"); - node = selector.selectRackLocalNode("rack1", blacklist); + node = selector.selectRackLocalNode( + "rack1", blacklist, defaultResourceRequested); Assert.assertNull(node); - node = selector.selectRackLocalNode("rack2", blacklist); + node = selector.selectRackLocalNode( + "rack2", blacklist, defaultResourceRequested); Assert.assertEquals("h3", node.getHostName()); blacklist.add("h3"); - node = selector.selectRackLocalNode("rack2", blacklist); + node = selector.selectRackLocalNode( + "rack2", blacklist, defaultResourceRequested); Assert.assertNull(node); } @@ -337,20 +604,217 @@ public void testSelectAnyNode() { // basic test for selecting node which has queue length // less than queue capacity. Set blacklist = new HashSet<>(); - RMNode node = selector.selectAnyNode(blacklist); + RMNode node = selector.selectAnyNode(blacklist, defaultResourceRequested); Assert.assertTrue(node.getHostName().equals("h1") || node.getHostName().equals("h3")); // if node has been added to blacklist blacklist.add("h1"); - node = selector.selectAnyNode(blacklist); + node = selector.selectAnyNode(blacklist, defaultResourceRequested); Assert.assertEquals("h3", node.getHostName()); blacklist.add("h3"); - node = selector.selectAnyNode(blacklist); + node = selector.selectAnyNode(blacklist, defaultResourceRequested); Assert.assertNull(node); } + @Test + public void testQueueLengthThenResourcesComparator() { + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES; + + NodeId n1 = new FakeNodeId("n1", 5000); + NodeId n2 = new FakeNodeId("n2", 5000); + + // Case 1: larger available cores should be ranked first + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(6, 6)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn1, cn2) < 0); + } + + // Case 2: Shorter queue should be ranked first before comparing resources + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(5); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(3, 3)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn1, cn2) < 0); + } + + // Case 3: No capability vs with capability, + // with capability should come first + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(Resources.none()) + .setCapability(newResourceInstance(1, 1, 1000)) + .setQueueLength(5); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(Resources.none()) + .setCapability(Resources.none()) + .setQueueLength(5); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn1, cn2) < 0); + } + + // Case 4: Compare same values + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertEquals(0, comparator.compare(cn1, cn2)); + } + + // Case 5: If ratio is the same, compare raw values + // by VCores first, then memory + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(6, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 6)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + // Both are 60% allocated, but CN1 has 5 avail VCores, CN2 only has 4 + Assert.assertTrue(comparator.compare(cn1, cn2) < 0); + } + + // Case 6: by VCores absolute value + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 6)) + .setCapability(newResourceInstance(10, 12, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn2, cn1) < 0); + } + + // Case 7: by memory absolute value + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 5)) + .setCapability(newResourceInstance(10, 10, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(6, 5)) + .setCapability(newResourceInstance(12, 10, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn2, cn1) < 0); + } + + // Case 8: Memory should be more constraining in the overall cluster, + // so rank the node with less allocated memory first + { + ClusterNode.Properties cn1Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(5, 11)) + .setCapability(newResourceInstance(10, 100, 1000)) + .setQueueLength(10); + ClusterNode cn1 = new ClusterNode(n1); + cn1.setProperties(cn1Props); + + ClusterNode.Properties cn2Props = + ClusterNode.Properties.newInstance() + .setAllocatedResource(newResourceInstance(6, 10)) + .setCapability(newResourceInstance(10, 100, 1000)) + .setQueueLength(10); + ClusterNode cn2 = new ClusterNode(n2); + cn2.setProperties(cn2Props); + + comparator.setClusterResource( + Resources.add(cn1.getCapability(), cn2.getCapability())); + Assert.assertTrue(comparator.compare(cn1, cn2) < 0); + } + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { return createRMNode(host, port, waitTime, queueLength, @@ -377,12 +841,40 @@ private RMNode createRMNode(String host, int port, String rack, private RMNode createRMNode(String host, int port, String rack, int waitTime, int queueLength, int queueCapacity, NodeState state) { + return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity, + state, Resources.none(), defaultCapacity); + } + + private RMNode createRMNode( + String host, int port, int waitTime, int queueLength, + Resource allocatedResource, Resource nodeResource) { + return createRMNode(host, port, waitTime, queueLength, + DEFAULT_MAX_QUEUE_LENGTH, allocatedResource, nodeResource); + } + + private RMNode createRMNode( + String host, int port, int waitTime, int queueLength, int queueCapacity, + Resource allocatedResource, Resource nodeResource) { + return createRMNode(host, port, "default", waitTime, queueLength, + queueCapacity, NodeState.RUNNING, allocatedResource, nodeResource); + } + + @SuppressWarnings("parameternumber") + private RMNode createRMNode(String host, int port, String rack, + int waitTime, int queueLength, int queueCapacity, NodeState state, + Resource allocatedResource, Resource nodeResource) { RMNode node1 = Mockito.mock(RMNode.class); NodeId nID1 = new FakeNodeId(host, port); Mockito.when(node1.getHostName()).thenReturn(host); Mockito.when(node1.getRackName()).thenReturn(rack); + Mockito.when(node1.getNode()).thenReturn(new NodeBase("/" + host)); Mockito.when(node1.getNodeID()).thenReturn(nID1); Mockito.when(node1.getState()).thenReturn(state); + Mockito.when(node1.getTotalCapability()).thenReturn(nodeResource); + Mockito.when(node1.getNodeUtilization()).thenReturn( + ResourceUtilization.newInstance(0, 0, 0)); + Mockito.when(node1.getAllocatedContainerResource()).thenReturn( + allocatedResource); OpportunisticContainersStatus status1 = Mockito.mock(OpportunisticContainersStatus.class); Mockito.when(status1.getEstimatedQueueWaitTime())