YARN-5831. FairScheduler: Propagate allowPreemptionFrom flag all the way down to the app. (Yufei Gu via kasha)

This commit is contained in:
Karthik Kambatla 2017-01-17 17:01:03 -08:00
parent 47dfda7c92
commit e224c96234
9 changed files with 65 additions and 44 deletions

View File

@ -568,6 +568,10 @@ private Resource getPreemptedResources() {
} }
boolean canContainerBePreempted(RMContainer container) { boolean canContainerBePreempted(RMContainer container) {
if (!isPreemptable()) {
return false;
}
// Sanity check that the app owns this container // Sanity check that the app owns this container
if (!getLiveContainersMap().containsKey(container.getContainerId()) && if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
!newlyAllocatedContainers.contains(container)) { !newlyAllocatedContainers.contains(container)) {
@ -581,17 +585,6 @@ boolean canContainerBePreempted(RMContainer container) {
return false; return false;
} }
// Check if any of the parent queues are not preemptable
// TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
// the app to avoid recursing up every time.
for (FSQueue q = getQueue();
!q.getQueueName().equals("root");
q = q.getParent()) {
if (!q.isPreemptable()) {
return false;
}
}
// Check if the app's allocation will be over its fairshare even // Check if the app's allocation will be over its fairshare even
// after preempting this container // after preempting this container
Resource currentUsage = getResourceUsage(); Resource currentUsage = getResourceUsage();
@ -1241,4 +1234,9 @@ public int hashCode() {
public boolean equals(Object o) { public boolean equals(Object o) {
return super.equals(o); return super.equals(o);
} }
@Override
public boolean isPreemptable() {
return getQueue().isPreemptable();
}
} }

View File

@ -108,21 +108,6 @@ void recomputeSteadyShares() {
} }
} }
@Override
public void updatePreemptionVariables() {
super.updatePreemptionVariables();
// For child queues
readLock.lock();
try {
for (FSQueue childQueue : childQueues) {
childQueue.updatePreemptionVariables();
}
} finally {
readLock.unlock();
}
}
@Override @Override
public Resource getDemand() { public Resource getDemand() {
readLock.lock(); readLock.lock();

View File

@ -91,6 +91,7 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name); this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.parent = parent; this.parent = parent;
reinit(false);
} }
/** /**
@ -98,10 +99,19 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
* metrics. * metrics.
* This function is invoked when a new queue is created or reloading the * This function is invoked when a new queue is created or reloading the
* allocation configuration. * allocation configuration.
*
* @param recursive whether child queues should be reinitialized recursively
*/ */
public void init() { public void reinit(boolean recursive) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
allocConf.initFSQueue(this, scheduler); allocConf.initFSQueue(this, scheduler);
updatePreemptionVariables();
if (recursive) {
for (FSQueue child : getChildQueues()) {
child.reinit(recursive);
}
}
} }
public String getName() { public String getName() {
@ -307,6 +317,7 @@ void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
} }
@Override
public boolean isPreemptable() { public boolean isPreemptable() {
return preemptable; return preemptable;
} }
@ -329,7 +340,7 @@ public void update(Resource fairShare, boolean checkStarvation) {
* Update the min/fair share preemption timeouts, threshold and preemption * Update the min/fair share preemption timeouts, threshold and preemption
* disabled flag for this queue. * disabled flag for this queue.
*/ */
public void updatePreemptionVariables() { private void updatePreemptionVariables() {
// For min share timeout // For min share timeout
minSharePreemptionTimeout = scheduler.getAllocationConfiguration() minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getMinSharePreemptionTimeout(getName()); .getMinSharePreemptionTimeout(getName());
@ -348,9 +359,15 @@ public void updatePreemptionVariables() {
if (fairSharePreemptionThreshold < 0 && parent != null) { if (fairSharePreemptionThreshold < 0 && parent != null) {
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
} }
// For option whether allow preemption from this queue // For option whether allow preemption from this queue.
preemptable = scheduler.getAllocationConfiguration() // If the parent is non-preemptable, this queue is non-preemptable as well,
.isPreemptable(getName()); // otherwise get the value from the allocation file.
if (parent != null && !parent.isPreemptable()) {
preemptable = false;
} else {
preemptable = scheduler.getAllocationConfiguration()
.isPreemptable(getName());
}
} }
/** /**

View File

@ -73,11 +73,12 @@ public FSParentQueue getRootQueue() {
public void initialize(Configuration conf) throws IOException, public void initialize(Configuration conf) throws IOException,
SAXException, AllocationConfigurationException, ParserConfigurationException { SAXException, AllocationConfigurationException, ParserConfigurationException {
rootQueue = new FSParentQueue("root", scheduler, null); rootQueue = new FSParentQueue("root", scheduler, null);
rootQueue.init();
queues.put(rootQueue.getName(), rootQueue); queues.put(rootQueue.getName(), rootQueue);
// Create the default queue // Create the default queue
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
// Recursively reinitialize to propagate queue properties
rootQueue.reinit(true);
} }
/** /**
@ -281,11 +282,9 @@ private FSQueue createNewQueues(FSQueueType queueType,
queue = newParent; queue = newParent;
} }
queue.init();
parent.addChildQueue(queue); parent.addChildQueue(queue);
setChildResourceLimits(parent, queue, queueConf); setChildResourceLimits(parent, queue, queueConf);
queues.put(queue.getName(), queue); queues.put(queue.getName(), queue);
queue.updatePreemptionVariables();
// If we just created a leaf node, the newParent is null, but that's OK // If we just created a leaf node, the newParent is null, but that's OK
// because we only create a leaf node in the very last iteration. // because we only create a leaf node in the very last iteration.
@ -496,17 +495,11 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
} }
} }
} }
rootQueue.recomputeSteadyShares();
for (FSQueue queue : queues.values()) {
queue.init();
}
// Initialize all queues recursively
rootQueue.reinit(true);
// Update steady fair shares for all queues // Update steady fair shares for all queues
rootQueue.recomputeSteadyShares(); rootQueue.recomputeSteadyShares();
// Update the fair share preemption timeouts and preemption for all queues
// recursively
rootQueue.updatePreemptionVariables();
} }
/** /**

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/** /**
* A Schedulable represents an entity that can be scheduled such as an * A Schedulable represents an entity that can be scheduled such as an
@ -96,4 +95,11 @@ public interface Schedulable {
/** Assign a fair share to this Schedulable. */ /** Assign a fair share to this Schedulable. */
void setFairShare(Resource fairShare); void setFairShare(Resource fairShare);
/**
* Check whether the schedulable is preemptable.
* @return <code>true</code> if the schedulable is preemptable;
* <code>false</code> otherwise
*/
boolean isPreemptable();
} }

View File

@ -137,4 +137,9 @@ public Resource getMaxShare() {
@Override @Override
public void updateDemand() {} public void updateDemand() {}
@Override
public boolean isPreemptable() {
return true;
}
} }

View File

@ -81,7 +81,6 @@ public void testUpdateDemand() {
String queueName = "root.queue1"; String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.init();
schedulable.setMaxShare(maxResource); schedulable.setMaxShare(maxResource);
assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE); assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE);
assertEquals(schedulable.getMetrics().getSchedulingPolicy(), assertEquals(schedulable.getMetrics().getSchedulingPolicy(),

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.After; import org.junit.After;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -166,6 +167,14 @@ private void setupCluster() throws IOException {
// Create and add two nodes to the cluster // Create and add two nodes to the cluster
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
// Verify if child-1 and child-2 are preemptable
FSQueue child1 =
scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
assertFalse(child1.isPreemptable());
FSQueue child2 =
scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
assertFalse(child2.isPreemptable());
} }
private void sendEnoughNodeUpdatesToAssignFully() { private void sendEnoughNodeUpdatesToAssignFully() {
@ -197,6 +206,10 @@ private void submitApps(String queue1, String queue2)
scheduler.update(); scheduler.update();
sendEnoughNodeUpdatesToAssignFully(); sendEnoughNodeUpdatesToAssignFully();
assertEquals(8, greedyApp.getLiveContainers().size()); assertEquals(8, greedyApp.getLiveContainers().size());
// Verify preemptable for queue and app attempt
assertTrue(
scheduler.getQueueManager().getQueue(queue1).isPreemptable()
== greedyApp.isPreemptable());
// Create an app that takes up all the resources on the cluster // Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId2 ApplicationAttemptId appAttemptId2

View File

@ -345,6 +345,11 @@ public String toString() {
", weights:" + weights + ", demand:" + demand + ", weights:" + weights + ", demand:" + demand +
", minShare:" + minShare + "}"; ", minShare:" + minShare + "}";
} }
@Override
public boolean isPreemptable() {
return true;
}
} }
} }