From fb512f50877438acb01fe6b3ec96c12b4db61694 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Tue, 12 Nov 2019 16:34:04 +0530 Subject: [PATCH] YARN-9697. Efficient allocation of Opportunistic containers. Contributed by Abhishek Modi. --- ...ibutedOpportunisticContainerAllocator.java | 13 +- .../OpportunisticContainerAllocator.java | 38 +- .../OpportunisticContainerContext.java | 2 +- ...butedOpportunisticContainerAllocator.java} | 10 +- ...ortunisticContainerAllocatorAMService.java | 20 +- ...alizedOpportunisticContainerAllocator.java | 340 +++++++++ .../distributed/NodeQueueLoadMonitor.java | 126 +++- ...alizedOpportunisticContainerAllocator.java | 669 ++++++++++++++++++ .../distributed/TestNodeQueueLoadMonitor.java | 137 +++- 9 files changed, 1312 insertions(+), 43 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/{TestOpportunisticContainerAllocator.java => TestDistributedOpportunisticContainerAllocator.java} (99%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java index da90167fa4..be4417cffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java @@ -201,7 +201,7 @@ private void allocateContainersInternal(long rmIdentifier, // * Rack local candidates selected in loop == 1 // * From loop == 2 onwards, we revert to off switch allocations. 
int loopIndex = OFF_SWITCH_LOOP; - if (enrichedAsk.getNodeLocations().size() > 0) { + if (enrichedAsk.getNodeMap().size() > 0) { loopIndex = NODE_LOCAL_LOOP; } while (numAllocated < toAllocate) { @@ -218,7 +218,7 @@ private void allocateContainersInternal(long rmIdentifier, } String location = ResourceRequest.ANY; if (loopIndex == NODE_LOCAL_LOOP) { - if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { + if (enrichedAsk.getNodeMap().containsKey(rNodeHost)) { location = rNodeHost; } else { continue; @@ -229,7 +229,8 @@ private void allocateContainersInternal(long rmIdentifier, continue; } if (loopIndex == RACK_LOCAL_LOOP) { - if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { + if (enrichedAsk.getRackMap().containsKey( + rNode.getRackName())) { location = rNode.getRackName(); } else { continue; @@ -248,7 +249,7 @@ private void allocateContainersInternal(long rmIdentifier, } } if (loopIndex == NODE_LOCAL_LOOP && - enrichedAsk.getRackLocations().size() > 0) { + enrichedAsk.getRackMap().size() > 0) { loopIndex = RACK_LOCAL_LOOP; } else { loopIndex++; @@ -318,7 +319,7 @@ private int collectRackLocalCandidates(Map allNodes, String partition = getRequestPartition(enrichedRR); for (RemoteNode rNode : allNodes.values()) { if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && - enrichedRR.getRackLocations().contains(rNode.getRackName())) { + enrichedRR.getRackMap().containsKey(rNode.getRackName())) { String rHost = rNode.getNodeId().getHost(); if (blackList.contains(rHost)) { continue; @@ -341,7 +342,7 @@ private int collectNodeLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, List retList, int numContainers) { String partition = getRequestPartition(enrichedRR); - for (String nodeName : enrichedRR.getNodeLocations()) { + for (String nodeName : enrichedRR.getNodeMap().keySet()) { RemoteNode remoteNode = allNodes.get(nodeName); if (remoteNode != null && StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 4a17a659da..853a37c341 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.scheduler; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.Time; @@ -48,7 +47,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -205,20 +203,36 @@ public List getOpportunistic() { private final BaseContainerTokenSecretManager tokenSecretManager; - static class Allocation { + /** + * This class encapsulates container and resourceName for an allocation. 
+ */ + public static class Allocation { private final Container container; private final String resourceName; - Allocation(Container container, String resourceName) { + /** + * Creates an instance of Allocation. + * @param container allocated container. + * @param resourceName location where it got allocated. + */ + public Allocation(Container container, String resourceName) { this.container = container; this.resourceName = resourceName; } - Container getContainer() { + /** + * Get container of the allocation. + * @return container of the allocation. + */ + public Container getContainer() { return container; } - String getResourceName() { + /** + * Get resource name of the allocation. + * @return resource name of the allocation. + */ + public String getResourceName() { return resourceName; } } @@ -273,12 +287,12 @@ public void removeLocation(String location) { } } - public Set getNodeLocations() { - return nodeLocations.keySet(); + public Map getNodeMap() { + return nodeLocations; } - public Set getRackLocations() { - return rackLocations.keySet(); + public Map getRackMap() { + return rackLocations; } } @@ -304,8 +318,8 @@ public OpportunisticContainerAllocator( this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; } - @VisibleForTesting - void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { + public void setMaxAllocationsPerAMHeartbeat( + int maxAllocationsPerAMHeartbeat) { this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 84e7bb9dce..1f82b9d9d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -201,7 +201,7 @@ public void matchAllocationToOutstandingRequest(Resource capability, } @VisibleForTesting - OpportunisticSchedulerMetrics getOppSchedulerMetrics() { + public OpportunisticSchedulerMetrics getOppSchedulerMetrics() { return OpportunisticSchedulerMetrics.getMetrics(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java index 6a91f41618..7fedfb0ea5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java @@ -56,12 +56,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class TestOpportunisticContainerAllocator { +/** + * Test cases for DistributedOpportunisticContainerAllocator. + */ +public class TestDistributedOpportunisticContainerAllocator { private static final Logger LOG = - LoggerFactory.getLogger(TestOpportunisticContainerAllocator.class); + LoggerFactory.getLogger( + TestDistributedOpportunisticContainerAllocator.class); private static final int GB = 1024; - private OpportunisticContainerAllocator allocator = null; + private DistributedOpportunisticContainerAllocator allocator = null; private OpportunisticContainerContext oppCntxt = null; private static final Priority PRIORITY_NORMAL = Priority.newInstance(1); private static final Resource CAPABILITY_1GB = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index d51afa2698..f29d038e86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -20,7 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; -import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -232,10 +232,6 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, YarnConfiguration. 
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); - this.oppContainerAllocator = - new DistributedOpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager(), - maxAllocationsPerAMHeartbeat); this.numNodes = rmContext.getYarnConfiguration().getInt( YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); @@ -253,7 +249,7 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR)); NodeQueueLoadMonitor topKSelector = - new NodeQueueLoadMonitor(nodeSortInterval, comparator); + new NodeQueueLoadMonitor(nodeSortInterval, comparator, numNodes); float sigma = rmContext.getYarnConfiguration() .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, @@ -285,6 +281,10 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); this.nodeMonitor = topKSelector; + this.oppContainerAllocator = + new CentralizedOpportunisticContainerAllocator( + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat, nodeMonitor); } @Override @@ -371,6 +371,14 @@ private void handleNewContainers(List allocContainers, } } + @Override + protected void serviceStop() throws Exception { + if (nodeMonitor != null) { + nodeMonitor.stop(); + } + super.serviceStop(); + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java new file mode 100644 index 0000000000..85083bc3c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + *
<p>
+ * The CentralizedOpportunisticContainerAllocator allocates opportunistic + * containers by considering all the nodes present in the cluster, after + * modifying the container sizes to respect the limits set by the + * ResourceManager. It tries to distribute the containers as evenly as + * possible. + *
</p>
+ */ +public class CentralizedOpportunisticContainerAllocator extends + OpportunisticContainerAllocator { + + private static final Logger LOG = + LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class); + + private NodeQueueLoadMonitor nodeQueueLoadMonitor; + private OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + + /** + * Create a new Centralized Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + */ + public CentralizedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager) { + super(tokenSecretManager); + } + + /** + * Create a new Centralized Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public CentralizedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat, + NodeQueueLoadMonitor nodeQueueLoadMonitor) { + super(tokenSecretManager, maxAllocationsPerAMHeartbeat); + this.nodeQueueLoadMonitor = nodeQueueLoadMonitor; + } + + @VisibleForTesting + void setNodeQueueLoadMonitor(NodeQueueLoadMonitor nodeQueueLoadMonitor) { + this.nodeQueueLoadMonitor = nodeQueueLoadMonitor; + } + + @Override + public List allocateContainers( + ResourceBlacklistRequest blackList, List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext opportContext, long rmIdentifier, + String appSubmitter) throws YarnException { + + updateBlacklist(blackList, opportContext); + + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(oppResourceReqs); + + Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + List allocatedContainers = new ArrayList<>(); + int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat(); + List>> allocations = new ArrayList<>(); + + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } + Map> allocation = allocatePerSchedulerKey( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList, remAllocs); + if (allocation.size() > 0) { + allocations.add(allocation); + } + } + matchAllocation(allocations, allocatedContainers, opportContext); + return allocatedContainers; + } + + private Map> allocatePerSchedulerKey( + long rmIdentifier, OpportunisticContainerContext appContext, + SchedulerRequestKey schedKey, ApplicationAttemptId appAttId, + String userName, Set blackList, int maxAllocations) + throws YarnException { + Map> allocations = new HashMap<>(); + int totalAllocated = 0; + for (EnrichedResourceRequest enrichedAsk : + appContext.getOutstandingOpReqs().get(schedKey).values()) { + int remainingAllocs = -1; + if (maxAllocations > 0) { + remainingAllocs = maxAllocations - totalAllocated; 
+ if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat()); + break; + } + } + + totalAllocated += allocateContainersPerRequest(rmIdentifier, + appContext.getAppParams(), + appContext.getContainerIdGenerator(), blackList, + appAttId, userName, allocations, enrichedAsk, + remainingAllocs); + ResourceRequest anyAsk = enrichedAsk.getRequest(); + if (!allocations.isEmpty()) { + LOG.info("Opportunistic allocation requested for [priority={}, " + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), + anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), + anyAsk.getCapability(), allocations.keySet()); + } + } + return allocations; + } + + @SuppressWarnings("checkstyle:parameternumber") + private int allocateContainersPerRequest(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations, + EnrichedResourceRequest enrichedAsk, int maxAllocations) + throws YarnException { + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int totalAllocated = 0; + int maxToAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 0 : + allocations.get(anyAsk.getCapability()).size()); + if (maxAllocations >= 0) { + maxToAllocate = Math.min(maxAllocations, maxToAllocate); + } + + // allocate node local + if (maxToAllocate > 0) { + Map nodeLocations = enrichedAsk.getNodeMap(); + for (Map.Entry nodeLocation : + nodeLocations.entrySet()) { + int numContainers = nodeLocation.getValue().get(); + numContainers = Math.min(numContainers, maxToAllocate); + List allocatedContainers = + allocateNodeLocal(enrichedAsk, nodeLocation.getKey(), + numContainers, rmIdentifier, appParams, idCounter, blacklist, + id, userName, allocations); + totalAllocated += allocatedContainers.size(); + maxToAllocate -= allocatedContainers.size(); + // no more containers to allocate + if (maxToAllocate <= 0) { + break; + } + } + } + + // if still left, allocate rack local + if (maxToAllocate > 0) { + Map rackLocations = enrichedAsk.getRackMap(); + for (Map.Entry rack : rackLocations.entrySet()) { + int numContainers = rack.getValue().get(); + numContainers = Math.min(numContainers, maxToAllocate); + List allocatedContainers = + allocateRackLocal(enrichedAsk, rack.getKey(), numContainers, + rmIdentifier, appParams, idCounter, blacklist, id, + userName, allocations); + totalAllocated += allocatedContainers.size(); + maxToAllocate -= allocatedContainers.size(); + // no more containers to allocate + if (maxToAllocate <= 0) { + break; + } + } + } + + // if still left, try on ANY + if (maxToAllocate > 0) { + List allocatedContainers = allocateAny(enrichedAsk, + maxToAllocate, rmIdentifier, appParams, idCounter, blacklist, + id, userName, allocations); + totalAllocated += allocatedContainers.size(); + } + return totalAllocated; + } + + @SuppressWarnings("checkstyle:parameternumber") + private List allocateNodeLocal( + EnrichedResourceRequest enrichedAsk, + String nodeLocation, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException { + List allocatedContainers = new ArrayList<>(); + while (toAllocate > 0) { + RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation, + blacklist); + if (node != null) { + toAllocate--; + Container 
container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, nodeLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + LOG.info("Allocated [{}] as opportunistic at location [{}]", + container.getId(), nodeLocation); + metrics.incrNodeLocalOppContainers(); + } else { + // we couldn't allocate any - break the loop. + break; + } + } + return allocatedContainers; + } + + @SuppressWarnings("checkstyle:parameternumber") + private List allocateRackLocal(EnrichedResourceRequest enrichedAsk, + String rackLocation, int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException { + List allocatedContainers = new ArrayList<>(); + while (toAllocate > 0) { + RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation, + blacklist); + if (node != null) { + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, rackLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + metrics.incrRackLocalOppContainers(); + LOG.info("Allocated [{}] as opportunistic at location [{}]", + container.getId(), rackLocation); + } else { + // we couldn't allocate any - break the loop. + break; + } + } + return allocatedContainers; + } + + @SuppressWarnings("checkstyle:parameternumber") + private List allocateAny(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException { + List allocatedContainers = new ArrayList<>(); + while (toAllocate > 0) { + RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist); + if (node != null) { + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, ResourceRequest.ANY, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + metrics.incrOffSwitchOppContainers(); + LOG.info("Allocated [{}] as opportunistic at location [{}]", + container.getId(), ResourceRequest.ANY); + } else { + // we couldn't allocate any - break the loop. 
+ break; + } + } + return allocatedContainers; + } + + private RemoteNode convertToRemoteNode(RMNode rmNode) { + if (rmNode != null) { + RemoteNode rNode = RemoteNode.newInstance(rmNode.getNodeID(), + rmNode.getHttpAddress()); + rNode.setRackName(rmNode.getRackName()); + return rNode; + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index e093b2d997..fa2ba3060e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -34,12 +34,17 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED; + /** * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length * and total wait time) associated with Container Queues on the Node Manager. @@ -48,9 +53,12 @@ */ public class NodeQueueLoadMonitor implements ClusterMonitor { - final static Logger LOG = LoggerFactory. + private final static Logger LOG = LoggerFactory. getLogger(NodeQueueLoadMonitor.class); + private int numNodesForAnyAllocation = + DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED; + /** * The comparator used to specify the metric against which the load * of two Nodes are compared. @@ -68,14 +76,34 @@ public int compare(ClusterNode o1, ClusterNode o2) { } public int getMetric(ClusterNode c) { - return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime; + return (this == QUEUE_LENGTH) ? + c.queueLength.get() : c.queueWaitTime.get(); + } + + /** + * Increment the metric by a delta if it is below the threshold. + * @param c ClusterNode + * @param incrementSize increment size + * @return true if the metric was below threshold and was incremented. + */ + public boolean compareAndIncrement(ClusterNode c, int incrementSize) { + if(this == QUEUE_LENGTH) { + int ret = c.queueLength.addAndGet(incrementSize); + if (ret <= c.queueCapacity) { + return true; + } + c.queueLength.addAndGet(-incrementSize); + return false; + } + // for queue wait time, we don't have any threshold. 
+ return true; } } static class ClusterNode { - int queueLength = 0; - int queueWaitTime = -1; - double timestamp; + private AtomicInteger queueLength = new AtomicInteger(0); + private AtomicInteger queueWaitTime = new AtomicInteger(-1); + private long timestamp; final NodeId nodeId; private int queueCapacity = 0; @@ -85,12 +113,12 @@ public ClusterNode(NodeId nodeId) { } public ClusterNode setQueueLength(int qLength) { - this.queueLength = qLength; + this.queueLength.set(qLength); return this; } public ClusterNode setQueueWaitTime(int wTime) { - this.queueWaitTime = wTime; + this.queueWaitTime.set(wTime); return this; } @@ -106,7 +134,7 @@ public ClusterNode setQueueCapacity(int capacity) { public boolean isQueueFull() { return this.queueCapacity > 0 && - this.queueLength >= this.queueCapacity; + this.queueLength.get() >= this.queueCapacity; } } @@ -115,6 +143,10 @@ public boolean isQueueFull() { private final List sortedNodes; private final Map clusterNodes = new ConcurrentHashMap<>(); + private final Map nodeByHostName = + new ConcurrentHashMap<>(); + private final Map> nodeIdsByRack = + new ConcurrentHashMap<>(); private final LoadComparator comparator; private QueueLimitCalculator thresholdCalculator; private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock(); @@ -151,13 +183,14 @@ public void run() { } public NodeQueueLoadMonitor(long nodeComputationInterval, - LoadComparator comparator) { + LoadComparator comparator, int numNodes) { this.sortedNodes = new ArrayList<>(); this.scheduledExecutor = Executors.newScheduledThreadPool(1); this.comparator = comparator; this.scheduledExecutor.scheduleAtFixedRate(computeTask, nodeComputationInterval, nodeComputationInterval, TimeUnit.MILLISECONDS); + numNodesForAnyAllocation = numNodes; } List getSortedNodes() { @@ -168,6 +201,12 @@ public QueueLimitCalculator getThresholdCalculator() { return thresholdCalculator; } + public void stop() { + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + } + Map getClusterNodes() { return clusterNodes; } @@ -184,15 +223,17 @@ public void initThresholdCalculator(float sigma, int limitMin, int limitMax) { @Override public void addNode(List containerStatuses, RMNode rmNode) { - LOG.debug("Node added event from: {}", rmNode.getNode().getName()); - + this.nodeByHostName.put(rmNode.getHostName(), rmNode); + addIntoNodeIdsByRack(rmNode); // Ignoring this currently : at least one NODE_UPDATE heartbeat is // required to ensure node eligibility. 
} @Override public void removeNode(RMNode removedRMNode) { - LOG.debug("Node delete event for: {}", removedRMNode.getNode().getName()); + LOG.info("Node delete event for: {}", removedRMNode.getNode().getName()); + this.nodeByHostName.remove(removedRMNode.getHostName()); + removeFromNodeIdsByRack(removedRMNode); ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock(); writeLock.lock(); ClusterNode node; @@ -303,6 +344,67 @@ public List selectLeastLoadedNodes(int k) { } } + public RMNode selectLocalNode(String hostName, Set blacklist) { + if (blacklist.contains(hostName)) { + return null; + } + RMNode node = nodeByHostName.get(hostName); + if (node != null) { + ClusterNode clusterNode = clusterNodes.get(node.getNodeID()); + if (comparator.compareAndIncrement(clusterNode, 1)) { + return node; + } + } + return null; + } + + public RMNode selectRackLocalNode(String rackName, Set blacklist) { + Set nodesOnRack = nodeIdsByRack.get(rackName); + if (nodesOnRack != null) { + for (NodeId nodeId : nodesOnRack) { + if (!blacklist.contains(nodeId.getHost())) { + ClusterNode node = clusterNodes.get(nodeId); + if (node != null && comparator.compareAndIncrement(node, 1)) { + return nodeByHostName.get(nodeId.getHost()); + } + } + } + } + return null; + } + + public RMNode selectAnyNode(Set blacklist) { + List nodeIds = selectLeastLoadedNodes(numNodesForAnyAllocation); + int size = nodeIds.size(); + if (size <= 0) { + return null; + } + Random rand = new Random(); + int startIndex = rand.nextInt(size); + for (int i = 0; i < size; ++i) { + int index = i + startIndex; + index %= size; + NodeId nodeId = nodeIds.get(index); + if (nodeId != null && !blacklist.contains(nodeId.getHost())) { + ClusterNode node = clusterNodes.get(nodeId); + if (node != null && comparator.compareAndIncrement(node, 1)) { + return nodeByHostName.get(nodeId.getHost()); + } + } + } + return null; + } + + private void removeFromNodeIdsByRack(RMNode removedNode) { + nodeIdsByRack.computeIfPresent(removedNode.getRackName(), + (k, v) -> v).remove(removedNode.getNodeID()); + } + + private void addIntoNodeIdsByRack(RMNode addedNode) { + nodeIdsByRack.compute(addedNode.getRackName(), (k, v) -> v == null ? + ConcurrentHashMap.newKeySet() : v).add(addedNode.getNodeID()); + } + private List sortNodes() { ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock(); readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java new file mode 100644 index 0000000000..8301c13dfd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java @@ -0,0 +1,669 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test cases for Centralized Opportunistic Container Allocator. + */ +public class TestCentralizedOpportunisticContainerAllocator { + + private static final Logger LOG = LoggerFactory.getLogger( + TestCentralizedOpportunisticContainerAllocator.class); + private static final int GB = 1024; + private CentralizedOpportunisticContainerAllocator allocator = null; + private OpportunisticContainerContext oppCntxt = null; + private static final Priority PRIORITY_NORMAL = Priority.newInstance(1); + private static final Resource CAPABILITY_1GB = + Resources.createResource(GB); + private static final ResourceBlacklistRequest EMPTY_BLACKLIST_REQUEST = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + + @Before + public void setup() { + // creating a dummy master key to be used for creation of container. 
+ final MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + + // creating a dummy tokenSecretManager to be used for creation of + // container. + BaseContainerTokenSecretManager secMan = + new BaseContainerTokenSecretManager(new Configuration()) { + @Override + public MasterKey getCurrentKey() { + return mKey; + } + + @Override + public byte[] createPassword(ContainerTokenIdentifier identifier) { + return new byte[]{1, 2}; + } + }; + + allocator = new CentralizedOpportunisticContainerAllocator(secMan); + oppCntxt = new OpportunisticContainerContext(); + oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1)); + oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1)); + oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10)); + } + + /** + * Tests allocation of an Opportunistic container from single application. + * @throws Exception + */ + @Test + public void testSimpleAllocation() throws Exception { + List reqs = + Collections.singletonList(createResourceRequest(1, "*", 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(1, 2, 100)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + assertEquals(1, containers.size()); + assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); + } + + /** + * Tests Opportunistic container should not be allocated on blacklisted + * nodes. + * @throws Exception + */ + @Test + public void testBlacklistRejection() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + Arrays.asList("h1", "h2"), new ArrayList<>()); + List reqs = + Collections.singletonList(createResourceRequest(1, "*", 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(2, 2, 100)); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user"); + assertEquals(0, containers.size()); + assertEquals(1, oppCntxt.getOutstandingOpReqs().size()); + } + + /** + * Tests that allocation of Opportunistic containers should be spread out. 
+ * @throws Exception + */ + @Test + public void testRoundRobinSimpleAllocation() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(1, ResourceRequest.ANY, 1), + createResourceRequest(2, ResourceRequest.ANY, 1), + createResourceRequest(3, ResourceRequest.ANY, 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 3)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + Set allocatedNodes = new HashSet<>(); + for (Container c : containers) { + allocatedNodes.add(c.getNodeId().toString()); + } + assertTrue(allocatedNodes.contains("h1:1234")); + assertTrue(allocatedNodes.contains("h2:1234")); + assertTrue(allocatedNodes.contains("h3:1234")); + assertEquals(3, containers.size()); + } + + /** + * Tests allocation of node local Opportunistic container requests. + * @throws Exception + */ + @Test + public void testNodeLocalAllocation() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(1, ResourceRequest.ANY, 1), + createResourceRequest(2, "/r1", 1), + createResourceRequest(2, "h1", 1), + createResourceRequest(2, ResourceRequest.ANY, 1), + createResourceRequest(3, "/r1", 1), + createResourceRequest(3, "h1", 1), + createResourceRequest(3, ResourceRequest.ANY, 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // all 3 containers should be allocated. + assertEquals(3, containers.size()); + // container with allocation id 2 and 3 should be allocated on node h1 + for (Container c : containers) { + if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) { + assertEquals("h1:1234", c.getNodeId().toString()); + } + } + } + + /** + * Tests node local allocation of Opportunistic container requests with + * same allocation request id. + * @throws Exception + */ + @Test + public void testNodeLocalAllocationSameSchedulerKey() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(2, "/r1", 2), + createResourceRequest(2, "h1", 2), + createResourceRequest(2, ResourceRequest.ANY, 2)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeId().toString()); + } + assertEquals(2, containers.size()); + assertTrue(allocatedHosts.contains("h1:1234")); + assertFalse(allocatedHosts.contains("h2:1234")); + assertFalse(allocatedHosts.contains("h3:1234")); + } + + /** + * Tests rack local allocation of Opportunistic container requests. 
+ * @throws Exception + */ + @Test + public void testSimpleRackLocalAllocation() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(2, "/r1", 1), + createResourceRequest(2, "h4", 1), + createResourceRequest(2, ResourceRequest.ANY, 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3"), Arrays.asList("/r2", "/r1", "/r3"), + Arrays.asList(2, 2, 2), Arrays.asList(5, 5, 5)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeId().toString()); + } + assertTrue(allocatedHosts.contains("h2:1234")); + assertFalse(allocatedHosts.contains("h3:1234")); + assertFalse(allocatedHosts.contains("h4:1234")); + assertEquals(1, containers.size()); + } + + /** + * Tests that allocation of rack local Opportunistic container requests + * should be spread out. + * @throws Exception + */ + @Test + public void testRoundRobinRackLocalAllocation() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(1, "/r1", 1), + createResourceRequest(1, "h5", 1), + createResourceRequest(1, ResourceRequest.ANY, 1), + createResourceRequest(2, "/r1", 1), + createResourceRequest(2, "h5", 1), + createResourceRequest(2, ResourceRequest.ANY, 1)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3", "h4"), + Arrays.asList("/r2", "/r1", "/r3", "/r1"), + Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeId().toString()); + } + LOG.info("Containers: {}", containers); + assertTrue(allocatedHosts.contains("h2:1234")); + assertTrue(allocatedHosts.contains("h4:1234")); + assertFalse(allocatedHosts.contains("h1:1234")); + assertFalse(allocatedHosts.contains("h3:1234")); + assertEquals(2, containers.size()); + } + + /** + * Tests that allocation of rack local Opportunistic container requests + * with same allocation request id should be spread out. 
+ * @throws Exception + */ + @Test + public void testRoundRobinRackLocalAllocationSameSchedulerKey() + throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(2, "/r1", 2), + createResourceRequest(2, "h5", 2), + createResourceRequest(2, ResourceRequest.ANY, 2)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3", "h4"), + Arrays.asList("/r2", "/r1", "/r3", "/r1"), + Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeId().toString()); + } + LOG.info("Containers: {}", containers); + assertTrue(allocatedHosts.contains("h2:1234")); + assertTrue(allocatedHosts.contains("h4:1234")); + assertFalse(allocatedHosts.contains("h1:1234")); + assertFalse(allocatedHosts.contains("h3:1234")); + assertEquals(2, containers.size()); + } + + /** + * Tests off switch allocation of Opportunistic containers. + * @throws Exception + */ + @Test + public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(2, "/r3", 2), + createResourceRequest(2, "h6", 2), + createResourceRequest(2, ResourceRequest.ANY, 2)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3", "h4"), + Arrays.asList("/r2", "/r1", "/r2", "/r1"), + Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + assertEquals(2, containers.size()); + } + + /** + * Tests allocation of rack local Opportunistic containers with same + * scheduler key. + * @throws Exception + */ + @Test + public void testLotsOfContainersRackLocalAllocationSameSchedulerKey() + throws Exception { + List reqs = + Arrays.asList( + createResourceRequest(2, "/r1", 1000), + createResourceRequest(2, "h1", 1000), + createResourceRequest(2, ResourceRequest.ANY, 1000)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3", "h4"), + Arrays.asList("/r1", "/r1", "/r1", "/r2"), + Arrays.asList(0, 0, 0, 0), Arrays.asList(500, 500, 500, 300)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + + Map hostsToNumContainerMap = new HashMap<>(); + for (Container c : containers) { + String host = c.getNodeId().toString(); + int numContainers = 0; + if (hostsToNumContainerMap.containsKey(host)) { + numContainers = hostsToNumContainerMap.get(host); + } + hostsToNumContainerMap.put(host, numContainers + 1); + } + assertEquals(1000, containers.size()); + assertEquals(500, hostsToNumContainerMap.get("h1:1234").intValue()); + assertFalse(hostsToNumContainerMap.containsKey("h4:1234")); + } + + /** + * Tests scheduling of many rack local Opportunistic container requests. 
+ * @throws Exception + */ + @Test + public void testLotsOfContainersRackLocalAllocation() + throws Exception { + List reqs = new ArrayList<>(); + // add 100 container requests. + for (int i = 0; i < 100; i++) { + reqs.add(createResourceRequest(i + 1, ResourceRequest.ANY, 1)); + reqs.add(createResourceRequest(i + 1, "h5", 1)); + reqs.add(createResourceRequest(i + 1, "/r1", 1)); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor( + Arrays.asList("h1", "h2", "h3", "h4"), + Arrays.asList("/r1", "/r1", "/r1", "/r2"), + Arrays.asList(0, 0, 0, 0), Arrays.asList(500, 500, 500, 300)); + allocator.setNodeQueueLoadMonitor(selector); + + List containers = new ArrayList<>(); + containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + assertEquals(100, containers.size()); + } + + /** + * Tests maximum number of opportunistic containers that can be allocated in + * AM heartbeat. + * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeat() throws Exception { + allocator.setMaxAllocationsPerAMHeartbeat(2); + List reqs = Arrays.asList( + createResourceRequest(2, "/r3", 3), + createResourceRequest(2, "h6", 3), + createResourceRequest(2, ResourceRequest.ANY, 3)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // Although capacity is present, but only 2 containers should be allocated + // as max allocation per AM heartbeat is set to 2. + assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId, + oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // Remaining 1 container should be allocated. + assertEquals(1, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat for + * allocation requests with different scheduler key. + * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey() + throws Exception { + allocator.setMaxAllocationsPerAMHeartbeat(2); + List reqs = + Arrays.asList( + createResourceRequest(1, ResourceRequest.ANY, 1), + createResourceRequest(2, "h6", 2), + createResourceRequest(3, "/r3", 2)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // Although capacity is present, but only 2 containers should be allocated + // as max allocation per AM heartbeat is set to 2. + assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId, + oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // 2 more containers should be allocated from pending allocation requests. 
+ assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId, + oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + // Remaining 1 container should be allocated. + assertEquals(1, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat when + * limit is set to -1. + * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception { + allocator.setMaxAllocationsPerAMHeartbeat(-1); + + List reqs = new ArrayList<>(); + final int numContainers = 20; + for (int i = 0; i < numContainers; i++) { + reqs.add(createResourceRequest(i + 1, "h1", 1)); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 500)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + + // all containers should be allocated in single heartbeat. + assertEquals(numContainers, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat when + * limit is set to higher value. + * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatWithHighLimit() + throws Exception { + allocator.setMaxAllocationsPerAMHeartbeat(100); + final int numContainers = 20; + List reqs = new ArrayList<>(); + for (int i = 0; i < numContainers; i++) { + reqs.add(createResourceRequest(i + 1, "h1", 1)); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 500)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + + // all containers should be allocated in single heartbeat. + assertEquals(numContainers, containers.size()); + } + + /** + * Test opportunistic container allocation latency metrics. + * @throws Exception + */ + @Test + public void testAllocationLatencyMetrics() throws Exception { + oppCntxt = spy(oppCntxt); + OpportunisticSchedulerMetrics metrics = + mock(OpportunisticSchedulerMetrics.class); + when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics); + List reqs = Arrays.asList( + createResourceRequest(2, "/r3", 2), + createResourceRequest(2, "h6", 2), + createResourceRequest(2, ResourceRequest.ANY, 2)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5)); + + List containers = allocator.allocateContainers( + EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user"); + LOG.info("Containers: {}", containers); + assertEquals(2, containers.size()); + // for each allocated container, latency should be added. 
+ verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong()); + } + + private NodeQueueLoadMonitor createNodeQueueLoadMonitor(int numNodes, + int queueLength, int queueCapacity) { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + for (int i = 1; i <= numNodes; ++i) { + RMNode node = createRMNode("h" + i, 1234, queueLength, queueCapacity); + selector.addNode(null, node); + selector.updateNode(node); + } + selector.computeTask.run(); + return selector; + } + + private NodeQueueLoadMonitor createNodeQueueLoadMonitor(List hosts, + List racks, List queueLengths, + List queueCapacities) { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + for (int i = 0; i < hosts.size(); ++i) { + RMNode node = createRMNode(hosts.get(i), 1234, racks.get(i), + queueLengths.get(i), queueCapacities.get(i)); + selector.addNode(null, node); + selector.updateNode(node); + } + selector.computeTask.run(); + return selector; + } + + private ResourceRequest createResourceRequest(int allocationId, + String location, int numContainers) { + return ResourceRequest.newBuilder() + .allocationRequestId(allocationId) + .priority(PRIORITY_NORMAL) + .resourceName(location) + .capability(CAPABILITY_1GB) + .relaxLocality(true) + .numContainers(numContainers) + .executionType(ExecutionType.OPPORTUNISTIC).build(); + } + + private RMNode createRMNode(String host, int port, int queueLength, + int queueCapacity) { + return createRMNode(host, port, "default", queueLength, + queueCapacity); + } + + private RMNode createRMNode(String host, int port, String rack, + int queueLength, int queueCapacity) { + RMNode node1 = Mockito.mock(RMNode.class); + NodeId nID1 = new TestNodeQueueLoadMonitor.FakeNodeId(host, port); + Mockito.when(node1.getHostName()).thenReturn(host); + Mockito.when(node1.getRackName()).thenReturn(rack); + Mockito.when(node1.getNodeID()).thenReturn(nID1); + Mockito.when(node1.getState()).thenReturn(NodeState.RUNNING); + OpportunisticContainersStatus status1 = + Mockito.mock(OpportunisticContainersStatus.class); + Mockito.when(status1.getEstimatedQueueWaitTime()) + .thenReturn(-1); + Mockito.when(status1.getWaitQueueLength()) + .thenReturn(queueLength); + Mockito.when(status1.getOpportQueueCapacity()) + .thenReturn(queueCapacity); + Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1); + return node1; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index bbc0086c37..eedd9de93b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -27,7 +27,9 @@ import org.junit.Test; import org.mockito.Mockito; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Unit tests for NodeQueueLoadMonitor. 
@@ -228,6 +230,127 @@ public void testContainerQueuingLimit() {
   }
 
+  /**
+   * Tests selection of a local node from NodeQueueLoadMonitor. This test
+   * covers node selection based on queue limit and blacklisted nodes.
+   */
+  @Test
+  public void testSelectLocalNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, -1, 4, 5);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // Basic test: select a node whose queue length is less than its
+    // queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // A blacklisted node should not be selected.
+    blacklist.add("h1");
+    node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h2", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h3", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+  }
+
+  /**
+   * Tests selection of a rack local node from NodeQueueLoadMonitor. This test
+   * covers node selection based on queue limit and blacklisted nodes.
+   */
+  @Test
+  public void testSelectRackLocalNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // Basic test: select a node whose queue length is less than its
+    // queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // A blacklisted node should not be selected.
+    blacklist.add("h1");
+    node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertNull(node);
+  }
+
+  /**
+   * Tests selection of any node from NodeQueueLoadMonitor. This test
+   * covers node selection based on queue limit and blacklisted nodes.
+   */
+  @Test
+  public void testSelectAnyNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 10);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    selector.computeTask.run();
+
+    Assert.assertEquals(2, selector.getSortedNodes().size());
+
+    // Basic test: select a node whose queue length is less
+    // than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectAnyNode(blacklist);
+    Assert.assertTrue(node.getHostName().equals("h1") ||
+        node.getHostName().equals("h3"));
+
+    // Once h1 is blacklisted, the remaining eligible node should be selected.
+    blacklist.add("h1");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertNull(node);
+  }
+
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
     return createRMNode(host, port, waitTime, queueLength,
@@ -236,20 +359,28 @@ private RMNode createRMNode(String host, int port,
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, NodeState state) {
-    return createRMNode(host, port, waitTime, queueLength,
+    return createRMNode(host, port, "default", waitTime, queueLength,
         DEFAULT_MAX_QUEUE_LENGTH, state);
   }
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, int queueCapacity) {
-    return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+    return createRMNode(host, port, "default", waitTime, queueLength,
+        queueCapacity, NodeState.RUNNING);
+  }
+
+  private RMNode createRMNode(String host, int port, String rack,
+      int waitTime, int queueLength, int queueCapacity) {
+    return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
         NodeState.RUNNING);
   }
 
-  private RMNode createRMNode(String host, int port,
+  private RMNode createRMNode(String host, int port, String rack,
       int waitTime, int queueLength, int queueCapacity, NodeState state) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getHostName()).thenReturn(host);
+    Mockito.when(node1.getRackName()).thenReturn(rack);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
     Mockito.when(node1.getState()).thenReturn(state);
     OpportunisticContainersStatus status1 =