diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index a678bb9ec4..5dfef731e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -554,6 +554,15 @@ void resetMinshareStarvation() { this.minshareStarvation = Resources.none(); } + /** + * Get last computed minshare starvation. + * + * @return last computed minshare starvation + */ + Resource getMinshareStarvation() { + return minshareStarvation; + } + void trackContainerForPreemption(RMContainer container) { synchronized (preemptionVariablesLock) { if (containersToPreempt.add(container)) { @@ -842,7 +851,10 @@ private Resource assignContainer( } // The desired container won't fit here, so reserve + // Reserve only, if app does not wait for preempted resources on the node, + // otherwise we may end up with duplicate reservations if (isReservable(capability) && + !node.isPreemptedForApp(this) && reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer, type, schedulerKey)) { updateAMDiagnosticMsg(capability, " exceeds the available resources of " @@ -1110,7 +1122,8 @@ Resource fairShareStarvation() { } if (!starved || - now - lastTimeAtFairShare < getQueue().getFairSharePreemptionTimeout()) { + now - lastTimeAtFairShare < + getQueue().getFairSharePreemptionTimeout()) { fairshareStarvation = Resources.none(); } else { // The app has been starved for longer than preemption-timeout. @@ -1138,7 +1151,7 @@ boolean isStarvedForFairShare() { } /** - * Is application starved for fairshare or minshare + * Is application starved for fairshare or minshare. */ boolean isStarved() { return isStarvedForFairShare() || !Resources.isNone(minshareStarvation); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 6575e0c3ca..93646f47ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,12 +36,15 @@ import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +/** + * Fair Scheduler specific node features. + */ @Private @Unstable public class FSSchedulerNode extends SchedulerNode { @@ -122,7 +126,8 @@ public synchronized void unreserveResource( SchedulerApplicationAttempt application) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = - getReservedContainer().getContainer().getId().getApplicationAttemptId(); + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); if (!reservedApplication.equals( application.getApplicationAttemptId())) { throw new IllegalStateException("Trying to unreserve " + @@ -151,20 +156,37 @@ synchronized LinkedHashMap getPreemptionList() { return new LinkedHashMap<>(resourcesPreemptedForApp); } + /** + * Returns whether a preemption is tracked on the node for the specified app. + * @return if preempted containers are reserved for the app + */ + synchronized boolean isPreemptedForApp(FSAppAttempt app){ + return resourcesPreemptedForApp.containsKey(app); + } + /** * Remove apps that have their preemption requests fulfilled. */ - private synchronized void cleanupPreemptionList() { - Iterator> iterator = - resourcesPreemptedForApp.entrySet().iterator(); - while(iterator.hasNext()) { - FSAppAttempt app = iterator.next().getKey(); - if (app.isStopped() || !app.isStarved()) { + private void cleanupPreemptionList() { + // Synchronize separately to avoid potential deadlocks + // This may cause delayed deletion of reservations + LinkedList candidates; + synchronized (this) { + candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet()); + } + for (FSAppAttempt app : candidates) { + if (app.isStopped() || !app.isStarved() || + (Resources.isNone(app.getFairshareStarvation()) && + Resources.isNone(app.getMinshareStarvation()))) { // App does not need more resources - Resources.subtractFrom(totalResourcesPreempted, - resourcesPreemptedForApp.get(app)); - appIdToAppMap.remove(app.getApplicationAttemptId()); - iterator.remove(); + synchronized (this) { + Resource removed = resourcesPreemptedForApp.remove(app); + if (removed != null) { + Resources.subtractFrom(totalResourcesPreempted, + removed); + appIdToAppMap.remove(app.getApplicationAttemptId()); + } + } } } } @@ -180,15 +202,23 @@ private synchronized void cleanupPreemptionList() { void addContainersForPreemption(Collection containers, FSAppAttempt app) { - appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app); - resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0)); - Resource appReserved = resourcesPreemptedForApp.get(app); + Resource appReserved = Resources.createResource(0); for(RMContainer container : containers) { - containersForPreemption.add(container); - Resources.addTo(appReserved, container.getAllocatedResource()); - Resources.addTo(totalResourcesPreempted, - container.getAllocatedResource()); + if(containersForPreemption.add(container)) { + Resources.addTo(appReserved, container.getAllocatedResource()); + } + } + + synchronized (this) { + if (!Resources.isNone(appReserved)) { + Resources.addTo(totalResourcesPreempted, + appReserved); + appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app); + resourcesPreemptedForApp. + putIfAbsent(app, Resource.newInstance(0, 0)); + Resources.addTo(resourcesPreemptedForApp.get(app), appReserved); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index b41d3f71dc..db02bab3a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -985,25 +985,22 @@ private boolean shouldContinueAssigning(int containers, * Assign preempted containers to the applications that have reserved * resources for preempted containers. * @param node Node to check - * @return assignment has occurred */ - static boolean assignPreemptedContainers(FSSchedulerNode node) { - boolean assignedAny = false; + static void assignPreemptedContainers(FSSchedulerNode node) { for (Entry entry : node.getPreemptionList().entrySet()) { FSAppAttempt app = entry.getKey(); Resource preemptionPending = Resources.clone(entry.getValue()); while (!app.isStopped() && !Resources.isNone(preemptionPending)) { Resource assigned = app.assignContainer(node); - if (Resources.isNone(assigned)) { + if (Resources.isNone(assigned) || + assigned.equals(FairScheduler.CONTAINER_RESERVED)) { // Fail to assign, let's not try further break; } - assignedAny = true; Resources.subtractFromNonNegative(preemptionPending, assigned); } } - return assignedAny; } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java index 3927b00f68..0e3d3445b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -68,6 +69,16 @@ private RMContainer createContainer( when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getAllocatedResource()). thenReturn(Resources.clone(request)); + when(container.compareTo(any())).thenAnswer(new Answer() { + public Integer answer(InvocationOnMock invocation) { + return + Long.compare( + ((RMContainer)invocation.getMock()).getContainerId() + .getContainerId(), + ((RMContainer)invocation.getArguments()[0]).getContainerId() + .getContainerId()); + } + }); containers.add(container); return container; } @@ -224,6 +235,47 @@ public void testSimplePreemption() { finalValidation(schedulerNode); } + /** + * Allocate a single container twice and release. + */ + @Test + public void testDuplicatePreemption() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Request preemption twice + FSAppAttempt starvingApp = createStarvingApp(schedulerNode, + Resource.newInstance(1024, 1)); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp); + assertEquals( + "No resource amount should be reserved for preemptees", + containers.get(0).getAllocatedResource(), + schedulerNode.getTotalReserved()); + + // Preemption occurs release one container + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + allocateContainers(schedulerNode); + assertEquals("Container should be allocated", + schedulerNode.getTotalResource(), + schedulerNode.getAllocatedResource()); + + // Release all remaining containers + for (int i = 1; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + /** * Allocate and release three containers requested by two apps. */