diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 5600aa80db..b31bd69a9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -318,6 +318,7 @@ public class OpportunisticContainerAllocator { opportContext.addToOutstandingReqs(oppResourceReqs); Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + Set allocatedNodes = new HashSet<>(); List allocatedContainers = new ArrayList<>(); // Satisfy the outstanding OPPORTUNISTIC requests. @@ -335,7 +336,7 @@ public class OpportunisticContainerAllocator { // the outstanding reqs) Map> allocation = allocate( rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList); + appSubmitter, nodeBlackList, allocatedNodes); if (allocation.size() > 0) { allocations.add(allocation); continueLoop = true; @@ -357,14 +358,15 @@ public class OpportunisticContainerAllocator { private Map> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName, Set blackList) + ApplicationAttemptId appAttId, String userName, Set blackList, + Set allocatedNodes) throws YarnException { Map> containers = new HashMap<>(); for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), blackList, appAttId, - appContext.getNodeMap(), userName, containers, enrichedAsk); + appContext.getContainerIdGenerator(), blackList, allocatedNodes, + appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk); ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " @@ -379,9 +381,9 @@ public class OpportunisticContainerAllocator { private void allocateContainersInternal(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, - Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, - Map> allocations, + Set blacklist, Set allocatedNodes, + ApplicationAttemptId id, Map allNodes, + String userName, Map> allocations, EnrichedResourceRequest enrichedAsk) throws YarnException { if (allNodes.size() == 0) { @@ -406,7 +408,8 @@ public class OpportunisticContainerAllocator { } while (numAllocated < toAllocate) { Collection nodeCandidates = - findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk); + findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, + enrichedAsk); for (RemoteNode rNode : nodeCandidates) { String rNodeHost = rNode.getNodeId().getHost(); // Ignore black list @@ -422,6 +425,10 @@ public class OpportunisticContainerAllocator { } else { continue; } + } else if (allocatedNodes.contains(rNodeHost)) { + LOG.info("Opportunistic container has already been allocated on {}.", + rNodeHost); + continue; } if (loopIndex == RACK_LOCAL_LOOP) { if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { @@ -435,11 +442,7 @@ public class OpportunisticContainerAllocator { anyAsk, rNode); numAllocated++; updateMetrics(loopIndex); - // Try to spread the allocations across the nodes. - // But don't add if it is a node local request. - if (loopIndex != NODE_LOCAL_LOOP) { - blacklist.add(rNode.getNodeId().getHost()); - } + allocatedNodes.add(rNodeHost); LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + "location [" + location + "]"); if (numAllocated >= toAllocate) { @@ -475,7 +478,7 @@ public class OpportunisticContainerAllocator { private Collection findNodeCandidates(int loopIndex, Map allNodes, Set blackList, - EnrichedResourceRequest enrichedRR) { + Set allocatedNodes, EnrichedResourceRequest enrichedRR) { LinkedList retList = new LinkedList<>(); String partition = getRequestPartition(enrichedRR); if (loopIndex > 1) { @@ -495,8 +498,9 @@ public class OpportunisticContainerAllocator { allNodes, enrichedRR, retList, numContainers); } else { // Rack local candidates - numContainers = collectRackLocalCandidates( - allNodes, enrichedRR, retList, blackList, numContainers); + numContainers = + collectRackLocalCandidates(allNodes, enrichedRR, retList, + blackList, allocatedNodes, numContainers); } if (numContainers == enrichedRR.getRequest().getNumContainers()) { // If there is no change in numContainers, then there is no point @@ -510,12 +514,16 @@ public class OpportunisticContainerAllocator { private int collectRackLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, LinkedList retList, - Set blackList, int numContainers) { + Set blackList, Set allocatedNodes, int numContainers) { String partition = getRequestPartition(enrichedRR); for (RemoteNode rNode : allNodes.values()) { if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && enrichedRR.getRackLocations().contains(rNode.getRackName())) { - if (blackList.contains(rNode.getNodeId().getHost())) { + String rHost = rNode.getNodeId().getHost(); + if (blackList.contains(rHost)) { + continue; + } + if (allocatedNodes.contains(rHost)) { retList.addLast(rNode); } else { retList.addFirst(rNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 6f71b36f35..65ad74870f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -196,18 +196,6 @@ public class TestOpportunisticContainerAllocator { new ArrayList<>(), new ArrayList<>()); List reqs = Arrays.asList( - ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) - .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) - .relaxLocality(true) - .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) - .resourceName("h1") - .capability(Resources.createResource(1 * GB)) - .relaxLocality(true) - .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(1) .priority(Priority.newInstance(1)) .resourceName(ResourceRequest.ANY) @@ -227,6 +215,24 @@ public class TestOpportunisticContainerAllocator { .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(3) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(3) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(3) .priority(Priority.newInstance(1)) .resourceName(ResourceRequest.ANY) .capability(Resources.createResource(1 * GB)) @@ -247,14 +253,14 @@ public class TestOpportunisticContainerAllocator { List containers = allocator.allocateContainers( blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); LOG.info("Containers: {}", containers); - Set allocatedHosts = new HashSet<>(); + // all 3 containers should be allocated. + Assert.assertEquals(3, containers.size()); + // container with allocation id 2 and 3 should be allocated on node h1 for (Container c : containers) { - allocatedHosts.add(c.getNodeHttpAddress()); + if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) { + Assert.assertEquals("h1:1234", c.getNodeHttpAddress()); + } } - Assert.assertEquals(2, containers.size()); - Assert.assertTrue(allocatedHosts.contains("h1:1234")); - Assert.assertFalse(allocatedHosts.contains("h2:1234")); - Assert.assertFalse(allocatedHosts.contains("h3:1234")); } @Test