YARN-5830. FairScheduler: Avoid preempting AM containers. (Yufei Gu via kasha)

This commit is contained in:
Karthik Kambatla 2017-01-25 12:17:28 -08:00
parent b782bf2156
commit abedb8a9d8
3 changed files with 212 additions and 59 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -370,8 +371,8 @@ public int getNumContainers() {
}
/**
* Get the running containers in the node.
* @return List of running containers in the node.
* Get the containers running on the node.
* @return A copy of containers running on the node.
*/
public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
List<RMContainer> result = new ArrayList<>(launchedContainers.size());
@ -381,6 +382,22 @@ public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
return result;
}
/**
* Get the containers running on the node with AM containers at the end.
* @return A copy of running containers with AM containers at the end.
*/
public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
LinkedList<RMContainer> result = new LinkedList<>();
for (ContainerInfo info : launchedContainers.values()) {
if(info.container.isAMContainer()) {
result.addLast(info.container);
} else {
result.addFirst(info.container);
}
}
return result;
}
/**
* Get the container for the specified container ID.
* @param containerId The container ID

View File

@ -65,10 +65,10 @@ public void run() {
try{
starvedApp = context.getStarvedApps().take();
if (!Resources.isNone(starvedApp.getStarvation())) {
List<RMContainer> containers =
PreemptableContainers containers =
identifyContainersToPreempt(starvedApp);
if (containers != null) {
preemptContainers(containers);
preemptContainers(containers.containers);
}
}
} catch (InterruptedException e) {
@ -87,9 +87,9 @@ public void run() {
* @return list of containers to preempt to satisfy starvedApp, null if the
* app cannot be satisfied by preempting any running containers
*/
private List<RMContainer> identifyContainersToPreempt(
private PreemptableContainers identifyContainersToPreempt(
FSAppAttempt starvedApp) {
List<RMContainer> containers = new ArrayList<>(); // return value
PreemptableContainers bestContainers = null;
// Find the nodes that match the next resource request
SchedulingPlacementSet nextPs =
@ -107,9 +107,6 @@ private List<RMContainer> identifyContainersToPreempt(
// From the potential nodes, pick a node that has enough containers
// from apps over their fairshare
for (FSSchedulerNode node : potentialNodes) {
// Reset containers for the new node being considered.
containers.clear();
// TODO (YARN-5829): Attempt to reserve the node for starved app. The
// subsequent if-check needs to be reworked accordingly.
FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
@ -119,39 +116,81 @@ private List<RMContainer> identifyContainersToPreempt(
continue;
}
// Figure out list of containers to consider
List<RMContainer> containersToCheck =
node.getCopiedListOfRunningContainers();
containersToCheck.removeAll(node.getContainersForPreemption());
// Initialize potential with unallocated resources
Resource potential = Resources.clone(node.getUnallocatedResource());
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
if (app.canContainerBePreempted(container)) {
// Flag container for preemption
containers.add(container);
Resources.addTo(potential, container.getAllocatedResource());
}
// Check if we have already identified enough containers
if (Resources.fitsIn(requestCapability, potential)) {
// Mark the containers as being considered for preemption on the node.
// Make sure the containers are subsequently removed by calling
// FSSchedulerNode#removeContainerForPreemption.
node.addContainersForPreemption(containers);
return containers;
int maxAMContainers = bestContainers == null ?
Integer.MAX_VALUE : bestContainers.numAMContainers;
PreemptableContainers preemptableContainers =
identifyContainersToPreemptOnNode(requestCapability, node,
maxAMContainers);
if (preemptableContainers != null) {
if (preemptableContainers.numAMContainers == 0) {
return preemptableContainers;
} else {
// TODO (YARN-5829): Unreserve the node for the starved app.
bestContainers = preemptableContainers;
}
}
}
return bestContainers;
}
/**
* Identify containers to preempt on a given node. Try to find a list with
* least AM containers to avoid preempting AM containers. This method returns
* a non-null set of containers only if the number of AM containers is less
* than maxAMContainers.
*
* @param request resource requested
* @param node the node to check
* @param maxAMContainers max allowed AM containers in the set
* @return list of preemptable containers with fewer AM containers than
* maxAMContainers if such a list exists; null otherwise.
*/
private PreemptableContainers identifyContainersToPreemptOnNode(
Resource request, FSSchedulerNode node, int maxAMContainers) {
PreemptableContainers preemptableContainers =
new PreemptableContainers(maxAMContainers);
// Figure out list of containers to consider
List<RMContainer> containersToCheck =
node.getRunningContainersWithAMsAtTheEnd();
containersToCheck.removeAll(node.getContainersForPreemption());
// Initialize potential with unallocated resources
Resource potential = Resources.clone(node.getUnallocatedResource());
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
if (app.canContainerBePreempted(container)) {
// Flag container for preemption
if (!preemptableContainers.addContainer(container)) {
return null;
}
Resources.addTo(potential, container.getAllocatedResource());
}
// Check if we have already identified enough containers
if (Resources.fitsIn(request, potential)) {
return preemptableContainers;
} else {
// TODO (YARN-5829): Unreserve the node for the starved app.
}
}
return null;
}
private void preemptContainers(List<RMContainer> containers) {
// Mark the containers as being considered for preemption on the node.
// Make sure the containers are subsequently removed by calling
// FSSchedulerNode#removeContainerForPreemption.
if (containers.size() > 0) {
FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
.getNode(containers.get(0).getNodeId());
node.addContainersForPreemption(containers);
}
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
@ -190,4 +229,38 @@ public void run() {
}
}
}
/**
* A class to track preemptable containers.
*/
private static class PreemptableContainers {
List<RMContainer> containers;
int numAMContainers;
int maxAMContainers;
PreemptableContainers(int maxAMContainers) {
containers = new ArrayList<>();
numAMContainers = 0;
this.maxAMContainers = maxAMContainers;
}
/**
* Add a container if the number of AM containers is less than
* maxAMContainers.
*
* @param container the container to add
* @return true if success; false otherwise
*/
private boolean addContainer(RMContainer container) {
if (container.isAMContainer()) {
numAMContainers++;
if (numAMContainers >= maxAMContainers) {
return false;
}
}
containers.add(container);
return true;
}
}
}

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -34,8 +36,10 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* Tests to verify fairshare and minshare preemption, using parameterization.
@ -43,6 +47,7 @@
@RunWith(Parameterized.class)
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
private static final int GB = 1024;
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
@ -165,8 +170,8 @@ private void setupCluster() throws IOException {
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Create and add two nodes to the cluster
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
// Verify if child-1 and child-2 are preemptable
FSQueue child1 =
@ -187,6 +192,46 @@ private void sendEnoughNodeUpdatesToAssignFully() {
}
}
/**
* Submit an application to a given queue and take over the entire cluster.
*
* @param queueName queue name
*/
private void takeAllResource(String queueName) {
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId
= createSchedulingRequest(GB, 1, queueName, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size());
greedyApp = scheduler.getSchedulerApp(appAttemptId);
scheduler.update();
sendEnoughNodeUpdatesToAssignFully();
assertEquals(8, greedyApp.getLiveContainers().size());
// Verify preemptable for queue and app attempt
assertTrue(
scheduler.getQueueManager().getQueue(queueName).isPreemptable()
== greedyApp.isPreemptable());
}
/**
* Submit an application to a given queue and preempt half resources of the
* cluster.
*
* @param queueName queue name
* @throws InterruptedException
* if any thread has interrupted the current thread.
*/
private void preemptHalfResources(String queueName)
throws InterruptedException {
ApplicationAttemptId appAttemptId
= createSchedulingRequest(2 * GB, 2, queueName, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
starvingApp = scheduler.getSchedulerApp(appAttemptId);
// Sleep long enough to pass
Thread.sleep(10);
scheduler.update();
}
/**
* Submit application to {@code queue1} and take over the entire cluster.
* Submit application with larger containers to {@code queue2} that
@ -198,29 +243,8 @@ private void sendEnoughNodeUpdatesToAssignFully() {
*/
private void submitApps(String queue1, String queue2)
throws InterruptedException {
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId1
= createSchedulingRequest(1024, 1, queue1, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size());
greedyApp = scheduler.getSchedulerApp(appAttemptId1);
scheduler.update();
sendEnoughNodeUpdatesToAssignFully();
assertEquals(8, greedyApp.getLiveContainers().size());
// Verify preemptable for queue and app attempt
assertTrue(
scheduler.getQueueManager().getQueue(queue1).isPreemptable()
== greedyApp.isPreemptable());
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId2
= createSchedulingRequest(2048, 2, queue2, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
starvingApp = scheduler.getSchedulerApp(appAttemptId2);
// Sleep long enough to pass
Thread.sleep(10);
scheduler.update();
takeAllResource(queue1);
preemptHalfResources(queue2);
}
private void verifyPreemption() throws InterruptedException {
@ -285,4 +309,43 @@ public void testNoPreemptionFromDisallowedQueue() throws Exception {
submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
verifyNoPreemption();
}
/**
* Set the number of AM containers for each node.
*
* @param numAMContainersPerNode number of AM containers per node
*/
private void setNumAMContainersPerNode(int numAMContainersPerNode) {
List<FSSchedulerNode> potentialNodes =
scheduler.getNodeTracker().getNodesByResourceName("*");
for (FSSchedulerNode node: potentialNodes) {
List<RMContainer> containers=
node.getCopiedListOfRunningContainers();
// Change the first numAMContainersPerNode out of 4 containers to
// AM containers
for (int i = 0; i < numAMContainersPerNode; i++) {
((RMContainerImpl) containers.get(i)).setAMContainer(true);
}
}
}
@Test
public void testPreemptionSelectNonAMContainer() throws Exception {
setupCluster();
takeAllResource("root.preemptable.child-1");
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
verifyPreemption();
ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
String host0 = containers.get(0).getNodeId().getHost();
String host1 = containers.get(1).getNodeId().getHost();
// Each node provides two and only two non-AM containers to be preempted, so
// the preemption happens on both nodes.
assertTrue("Preempted containers should come from two different "
+ "nodes.", !host0.equals(host1));
}
}