YARN-6895. [FairScheduler] Preemption reservation may cause regular reservation leaks. (Miklos Szegedi via Yufei Gu)

This commit is contained in:
Yufei Gu 2017-08-02 09:25:19 -07:00
parent 48899134d2
commit 45535f8afa
4 changed files with 119 additions and 27 deletions

View File

@ -554,6 +554,15 @@ void resetMinshareStarvation() {
this.minshareStarvation = Resources.none(); this.minshareStarvation = Resources.none();
} }
/**
* Get last computed minshare starvation.
*
* @return last computed minshare starvation
*/
Resource getMinshareStarvation() {
return minshareStarvation;
}
void trackContainerForPreemption(RMContainer container) { void trackContainerForPreemption(RMContainer container) {
synchronized (preemptionVariablesLock) { synchronized (preemptionVariablesLock) {
if (containersToPreempt.add(container)) { if (containersToPreempt.add(container)) {
@ -842,7 +851,10 @@ private Resource assignContainer(
} }
// The desired container won't fit here, so reserve // The desired container won't fit here, so reserve
// Reserve only, if app does not wait for preempted resources on the node,
// otherwise we may end up with duplicate reservations
if (isReservable(capability) && if (isReservable(capability) &&
!node.isPreemptedForApp(this) &&
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer, reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
type, schedulerKey)) { type, schedulerKey)) {
updateAMDiagnosticMsg(capability, " exceeds the available resources of " updateAMDiagnosticMsg(capability, " exceeds the available resources of "
@ -1110,7 +1122,8 @@ Resource fairShareStarvation() {
} }
if (!starved || if (!starved ||
now - lastTimeAtFairShare < getQueue().getFairSharePreemptionTimeout()) { now - lastTimeAtFairShare <
getQueue().getFairSharePreemptionTimeout()) {
fairshareStarvation = Resources.none(); fairshareStarvation = Resources.none();
} else { } else {
// The app has been starved for longer than preemption-timeout. // The app has been starved for longer than preemption-timeout.
@ -1138,7 +1151,7 @@ boolean isStarvedForFairShare() {
} }
/** /**
* Is application starved for fairshare or minshare * Is application starved for fairshare or minshare.
*/ */
boolean isStarved() { boolean isStarved() {
return isStarvedForFairShare() || !Resources.isNone(minshareStarvation); return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -35,12 +36,15 @@
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
/**
* Fair Scheduler specific node features.
*/
@Private @Private
@Unstable @Unstable
public class FSSchedulerNode extends SchedulerNode { public class FSSchedulerNode extends SchedulerNode {
@ -122,7 +126,8 @@ public synchronized void unreserveResource(
SchedulerApplicationAttempt application) { SchedulerApplicationAttempt application) {
// Cannot unreserve for wrong application... // Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication = ApplicationAttemptId reservedApplication =
getReservedContainer().getContainer().getId().getApplicationAttemptId(); getReservedContainer().getContainer().getId()
.getApplicationAttemptId();
if (!reservedApplication.equals( if (!reservedApplication.equals(
application.getApplicationAttemptId())) { application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " + throw new IllegalStateException("Trying to unreserve " +
@ -151,20 +156,37 @@ synchronized LinkedHashMap<FSAppAttempt, Resource> getPreemptionList() {
return new LinkedHashMap<>(resourcesPreemptedForApp); return new LinkedHashMap<>(resourcesPreemptedForApp);
} }
/**
* Returns whether a preemption is tracked on the node for the specified app.
* @return if preempted containers are reserved for the app
*/
synchronized boolean isPreemptedForApp(FSAppAttempt app){
return resourcesPreemptedForApp.containsKey(app);
}
/** /**
* Remove apps that have their preemption requests fulfilled. * Remove apps that have their preemption requests fulfilled.
*/ */
private synchronized void cleanupPreemptionList() { private void cleanupPreemptionList() {
Iterator<Map.Entry<FSAppAttempt, Resource>> iterator = // Synchronize separately to avoid potential deadlocks
resourcesPreemptedForApp.entrySet().iterator(); // This may cause delayed deletion of reservations
while(iterator.hasNext()) { LinkedList<FSAppAttempt> candidates;
FSAppAttempt app = iterator.next().getKey(); synchronized (this) {
if (app.isStopped() || !app.isStarved()) { candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet());
}
for (FSAppAttempt app : candidates) {
if (app.isStopped() || !app.isStarved() ||
(Resources.isNone(app.getFairshareStarvation()) &&
Resources.isNone(app.getMinshareStarvation()))) {
// App does not need more resources // App does not need more resources
Resources.subtractFrom(totalResourcesPreempted, synchronized (this) {
resourcesPreemptedForApp.get(app)); Resource removed = resourcesPreemptedForApp.remove(app);
appIdToAppMap.remove(app.getApplicationAttemptId()); if (removed != null) {
iterator.remove(); Resources.subtractFrom(totalResourcesPreempted,
removed);
appIdToAppMap.remove(app.getApplicationAttemptId());
}
}
} }
} }
} }
@ -180,15 +202,23 @@ private synchronized void cleanupPreemptionList() {
void addContainersForPreemption(Collection<RMContainer> containers, void addContainersForPreemption(Collection<RMContainer> containers,
FSAppAttempt app) { FSAppAttempt app) {
appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app); Resource appReserved = Resources.createResource(0);
resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
Resource appReserved = resourcesPreemptedForApp.get(app);
for(RMContainer container : containers) { for(RMContainer container : containers) {
containersForPreemption.add(container); if(containersForPreemption.add(container)) {
Resources.addTo(appReserved, container.getAllocatedResource()); Resources.addTo(appReserved, container.getAllocatedResource());
Resources.addTo(totalResourcesPreempted, }
container.getAllocatedResource()); }
synchronized (this) {
if (!Resources.isNone(appReserved)) {
Resources.addTo(totalResourcesPreempted,
appReserved);
appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
resourcesPreemptedForApp.
putIfAbsent(app, Resource.newInstance(0, 0));
Resources.addTo(resourcesPreemptedForApp.get(app), appReserved);
}
} }
} }

View File

@ -985,25 +985,22 @@ private boolean shouldContinueAssigning(int containers,
* Assign preempted containers to the applications that have reserved * Assign preempted containers to the applications that have reserved
* resources for preempted containers. * resources for preempted containers.
* @param node Node to check * @param node Node to check
* @return assignment has occurred
*/ */
static boolean assignPreemptedContainers(FSSchedulerNode node) { static void assignPreemptedContainers(FSSchedulerNode node) {
boolean assignedAny = false;
for (Entry<FSAppAttempt, Resource> entry : for (Entry<FSAppAttempt, Resource> entry :
node.getPreemptionList().entrySet()) { node.getPreemptionList().entrySet()) {
FSAppAttempt app = entry.getKey(); FSAppAttempt app = entry.getKey();
Resource preemptionPending = Resources.clone(entry.getValue()); Resource preemptionPending = Resources.clone(entry.getValue());
while (!app.isStopped() && !Resources.isNone(preemptionPending)) { while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
Resource assigned = app.assignContainer(node); Resource assigned = app.assignContainer(node);
if (Resources.isNone(assigned)) { if (Resources.isNone(assigned) ||
assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
// Fail to assign, let's not try further // Fail to assign, let's not try further
break; break;
} }
assignedAny = true;
Resources.subtractFromNonNegative(preemptionPending, assigned); Resources.subtractFromNonNegative(preemptionPending, assigned);
} }
} }
return assignedAny;
} }
@VisibleForTesting @VisibleForTesting

View File

@ -31,6 +31,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -68,6 +69,16 @@ private RMContainer createContainer(
when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
when(container.getAllocatedResource()). when(container.getAllocatedResource()).
thenReturn(Resources.clone(request)); thenReturn(Resources.clone(request));
when(container.compareTo(any())).thenAnswer(new Answer<Integer>() {
public Integer answer(InvocationOnMock invocation) {
return
Long.compare(
((RMContainer)invocation.getMock()).getContainerId()
.getContainerId(),
((RMContainer)invocation.getArguments()[0]).getContainerId()
.getContainerId());
}
});
containers.add(container); containers.add(container);
return container; return container;
} }
@ -224,6 +235,47 @@ public void testSimplePreemption() {
finalValidation(schedulerNode); finalValidation(schedulerNode);
} }
/**
* Allocate a single container twice and release.
*/
@Test
public void testDuplicatePreemption() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Request preemption twice
FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
Resource.newInstance(1024, 1));
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp);
assertEquals(
"No resource amount should be reserved for preemptees",
containers.get(0).getAllocatedResource(),
schedulerNode.getTotalReserved());
// Preemption occurs release one container
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
allocateContainers(schedulerNode);
assertEquals("Container should be allocated",
schedulerNode.getTotalResource(),
schedulerNode.getAllocatedResource());
// Release all remaining containers
for (int i = 1; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
/** /**
* Allocate and release three containers requested by two apps. * Allocate and release three containers requested by two apps.
*/ */