YARN-7290. Method canContainerBePreempted can return true when it shouldn't. (Contributed by Steven Rand)

This commit is contained in:
Yufei Gu 2017-11-24 23:32:46 -08:00
parent 834e91ee91
commit 2bde3aedf1
3 changed files with 93 additions and 35 deletions

View File

@ -588,7 +588,8 @@ Set<ContainerId> getPreemptionContainerIds() {
} }
} }
boolean canContainerBePreempted(RMContainer container) { boolean canContainerBePreempted(RMContainer container,
Resource alreadyConsideringForPreemption) {
if (!isPreemptable()) { if (!isPreemptable()) {
return false; return false;
} }
@ -610,6 +611,15 @@ boolean canContainerBePreempted(RMContainer container) {
// Check if the app's allocation will be over its fairshare even // Check if the app's allocation will be over its fairshare even
// after preempting this container // after preempting this container
Resource usageAfterPreemption = getUsageAfterPreemptingContainer(
container.getAllocatedResource(),
alreadyConsideringForPreemption);
return !isUsageBelowShare(usageAfterPreemption, getFairShare());
}
private Resource getUsageAfterPreemptingContainer(Resource containerResources,
Resource alreadyConsideringForPreemption) {
Resource usageAfterPreemption = Resources.clone(getResourceUsage()); Resource usageAfterPreemption = Resources.clone(getResourceUsage());
// Subtract resources of containers already queued for preemption // Subtract resources of containers already queued for preemption
@ -617,10 +627,13 @@ boolean canContainerBePreempted(RMContainer container) {
Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted); Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
} }
// Subtract this container's allocation to compute usage after preemption // Subtract resources of this container and other containers of this app
Resources.subtractFrom( // that the FSPreemptionThread is already considering for preemption.
usageAfterPreemption, container.getAllocatedResource()); Resources.subtractFrom(usageAfterPreemption, containerResources);
return !isUsageBelowShare(usageAfterPreemption, getFairShare()); Resources.subtractFrom(usageAfterPreemption,
alreadyConsideringForPreemption);
return usageAfterPreemption;
} }
/** /**

View File

@ -19,7 +19,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -29,7 +29,9 @@
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -130,10 +132,21 @@ private List<RMContainer> identifyContainersToPreempt(
} }
} // End of iteration through nodes for one RR } // End of iteration through nodes for one RR
if (bestContainers != null && bestContainers.containers.size() > 0) { if (bestContainers != null) {
containersToPreempt.addAll(bestContainers.containers); List<RMContainer> containers = bestContainers.getAllContainers();
// Reserve the containers for the starved app if (containers.size() > 0) {
trackPreemptionsAgainstNode(bestContainers.containers, starvedApp); containersToPreempt.addAll(containers);
// Reserve the containers for the starved app
trackPreemptionsAgainstNode(containers, starvedApp);
// Warn application about containers to be killed
for (RMContainer container : containers) {
FSAppAttempt app = scheduler.getSchedulerApp(
container.getApplicationAttemptId());
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
}
} }
} }
} // End of iteration over RRs } // End of iteration over RRs
@ -170,10 +183,12 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
for (RMContainer container : containersToCheck) { for (RMContainer container : containersToCheck) {
FSAppAttempt app = FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId()); scheduler.getSchedulerApp(container.getApplicationAttemptId());
ApplicationId appId = app.getApplicationId();
if (app.canContainerBePreempted(container)) { if (app.canContainerBePreempted(container,
preemptableContainers.getResourcesToPreemptForApp(appId))) {
// Flag container for preemption // Flag container for preemption
if (!preemptableContainers.addContainer(container)) { if (!preemptableContainers.addContainer(container, appId)) {
return null; return null;
} }
@ -199,15 +214,6 @@ private void trackPreemptionsAgainstNode(List<RMContainer> containers,
} }
private void preemptContainers(List<RMContainer> containers) { private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
// Schedule timer task to kill containers // Schedule timer task to kill containers
preemptionTimer.schedule( preemptionTimer.schedule(
new PreemptContainersTask(containers), warnTimeBeforeKill); new PreemptContainersTask(containers), warnTimeBeforeKill);
@ -237,14 +243,14 @@ public void run() {
* A class to track preemptable containers. * A class to track preemptable containers.
*/ */
private static class PreemptableContainers { private static class PreemptableContainers {
List<RMContainer> containers; Map<ApplicationId, List<RMContainer>> containersByApp;
int numAMContainers; int numAMContainers;
int maxAMContainers; int maxAMContainers;
PreemptableContainers(int maxAMContainers) { PreemptableContainers(int maxAMContainers) {
containers = new ArrayList<>();
numAMContainers = 0; numAMContainers = 0;
this.maxAMContainers = maxAMContainers; this.maxAMContainers = maxAMContainers;
this.containersByApp = new HashMap<>();
} }
/** /**
@ -254,7 +260,7 @@ private static class PreemptableContainers {
* @param container the container to add * @param container the container to add
* @return true if success; false otherwise * @return true if success; false otherwise
*/ */
private boolean addContainer(RMContainer container) { private boolean addContainer(RMContainer container, ApplicationId appId) {
if (container.isAMContainer()) { if (container.isAMContainer()) {
numAMContainers++; numAMContainers++;
if (numAMContainers >= maxAMContainers) { if (numAMContainers >= maxAMContainers) {
@ -262,8 +268,30 @@ private boolean addContainer(RMContainer container) {
} }
} }
containers.add(container); if (!containersByApp.containsKey(appId)) {
containersByApp.put(appId, new ArrayList<>());
}
containersByApp.get(appId).add(container);
return true; return true;
} }
private List<RMContainer> getAllContainers() {
List<RMContainer> allContainers = new ArrayList<>();
for (List<RMContainer> containersForApp : containersByApp.values()) {
allContainers.addAll(containersForApp);
}
return allContainers;
}
private Resource getResourcesToPreemptForApp(ApplicationId appId) {
Resource resourcesToPreempt = Resources.createResource(0, 0);
if (containersByApp.containsKey(appId)) {
for (RMContainer container : containersByApp.get(appId)) {
Resources.addTo(resourcesToPreempt, container.getAllocatedResource());
}
}
return resourcesToPreempt;
}
} }
} }

View File

@ -278,11 +278,12 @@ private void submitApps(String queue1, String queue2)
preemptHalfResources(queue2); preemptHalfResources(queue2);
} }
private void verifyPreemption(int numStarvedAppContainers) private void verifyPreemption(int numStarvedAppContainers,
int numGreedyAppContainers)
throws InterruptedException { throws InterruptedException {
// Sleep long enough for four containers to be preempted. // Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) { if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
break; break;
} }
Thread.sleep(10); Thread.sleep(10);
@ -290,12 +291,12 @@ private void verifyPreemption(int numStarvedAppContainers)
// Post preemption, verify the greedyApp has the correct # of containers. // Post preemption, verify the greedyApp has the correct # of containers.
assertEquals("Incorrect # of containers on the greedy app", assertEquals("Incorrect # of containers on the greedy app",
2 * numStarvedAppContainers, greedyApp.getLiveContainers().size()); numGreedyAppContainers, greedyApp.getLiveContainers().size());
// Verify the queue metrics are set appropriately. The greedyApp started // Verify the queue metrics are set appropriately. The greedyApp started
// with 8 1GB, 1vcore containers. // with 8 1GB, 1vcore containers.
assertEquals("Incorrect # of preempted containers in QueueMetrics", assertEquals("Incorrect # of preempted containers in QueueMetrics",
8 - 2 * numStarvedAppContainers, 8 - numGreedyAppContainers,
greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers()); greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
// Verify the node is reserved for the starvingApp // Verify the node is reserved for the starvingApp
@ -340,7 +341,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
String queue = "root.preemptable.child-1"; String queue = "root.preemptable.child-1";
submitApps(queue, queue); submitApps(queue, queue);
if (fairsharePreemption) { if (fairsharePreemption) {
verifyPreemption(2); verifyPreemption(2, 4);
} else { } else {
verifyNoPreemption(); verifyNoPreemption();
} }
@ -349,13 +350,13 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
@Test @Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2"); submitApps("root.preemptable.child-1", "root.preemptable.child-2");
verifyPreemption(2); verifyPreemption(2, 4);
} }
@Test @Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception { public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
verifyPreemption(2); verifyPreemption(2, 4);
} }
@Test @Test
@ -389,7 +390,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
setNumAMContainersPerNode(2); setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2"); preemptHalfResources("root.preemptable.child-2");
verifyPreemption(2); verifyPreemption(2, 4);
ArrayList<RMContainer> containers = ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers(); (ArrayList<RMContainer>) starvingApp.getLiveContainers();
@ -401,6 +402,22 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
+ "nodes.", !host0.equals(host1)); + "nodes.", !host0.equals(host1));
} }
@Test
public void testAppNotPreemptedBelowFairShare() throws Exception {
takeAllResources("root.preemptable.child-1");
tryPreemptMoreThanFairShare("root.preemptable.child-2");
}
private void tryPreemptMoreThanFairShare(String queueName)
throws InterruptedException {
ApplicationAttemptId appAttemptId
= createSchedulingRequest(3 * GB, 3, queueName, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
starvingApp = scheduler.getSchedulerApp(appAttemptId);
verifyPreemption(1, 5);
}
@Test @Test
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
throws InterruptedException { throws InterruptedException {
@ -414,10 +431,10 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
// Submit a job so half the resources go to parent's sibling // Submit a job so half the resources go to parent's sibling
preemptHalfResources("root.preemptable-sibling"); preemptHalfResources("root.preemptable-sibling");
verifyPreemption(2); verifyPreemption(2, 4);
// Submit a job to the child's sibling to force preemption from the child // Submit a job to the child's sibling to force preemption from the child
preemptHalfResources("root.preemptable.child-2"); preemptHalfResources("root.preemptable.child-2");
verifyPreemption(1); verifyPreemption(1, 2);
} }
} }