diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index e9de05227e..86261dea19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -55,12 +55,16 @@ public static ResourceOption newInstance(Resource resource,
* Get timeout for tolerant of resource over-commitment
* Note: negative value means no timeout so that allocated containers will
* keep running until the end even under resource over-commitment cases.
- * @return overCommitTimeout of the ResourceOption
+ * @return overCommitTimeout of the ResourceOption in milliseconds.
*/
@Private
@Evolving
public abstract int getOverCommitTimeout();
-
+
+ /**
+ * Set the overcommit timeout.
+ * @param overCommitTimeout Timeout in ms. Negative means no timeout.
+ */
@Private
@Evolving
protected abstract void setOverCommitTimeout(int overCommitTimeout);
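
For context, a minimal sketch (not part of the patch) of how a caller builds a `ResourceOption` carrying the timeout semantics documented above; the 60-second value and the wrapper class are illustrative assumptions:

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;

public class ResourceOptionExample {
  public static ResourceOption shrinkTo2G() {
    // New capacity: 2 GB of memory, 2 vcores.
    Resource capacity = Resource.newInstance(2048, 2);
    // 60s grace period: preempt first, kill once the timeout expires.
    // A negative value would mean "never kill", per the javadoc above.
    return ResourceOption.newInstance(capacity, 60 * 1000);
  }
}
```
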
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f021ebb5dd..012f58a369 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -675,6 +675,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (capability != null) {
nodeHeartBeatResponse.setResource(capability);
}
+ // Check if we got an event (AdminService) that updated the resources
+ if (rmNode.isUpdatedCapability()) {
+ nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
+ rmNode.resetUpdatedCapability();
+ }
// 7. Send Container Queuing Limits back to the Node. This will be used by
// the node to truncate the number of Containers queued for execution.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index c77d29c89a..d3b515e824 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -104,6 +104,17 @@ public interface RMNode {
*/
public Resource getTotalCapability();
+ /**
+ * Whether the total capability of the node has been updated.
+ * @return true if the capability has been updated.
+ */
+ boolean isUpdatedCapability();
+
+ /**
+ * Mark that the updated event has been processed.
+ */
+ void resetUpdatedCapability();
+
/**
* the aggregated resource utilization of the containers.
* @return the aggregated resource utilization of the containers.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 37f3a372e5..e94dfe0d86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -126,6 +126,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
private volatile Resource totalCapability;
+ private volatile boolean updatedCapability = false;
private final Node node;
private String healthReport;
@@ -456,6 +457,16 @@ public Resource getTotalCapability() {
return this.totalCapability;
}
+ @Override
+ public boolean isUpdatedCapability() {
+ return this.updatedCapability;
+ }
+
+ @Override
+ public void resetUpdatedCapability() {
+ this.updatedCapability = false;
+ }
+
@Override
public String getRackName() {
return node.getNetworkLocation();
@@ -814,11 +825,12 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
}
- private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
- RMNodeResourceUpdateEvent event){
- ResourceOption resourceOption = event.getResourceOption();
- // Set resource on RMNode
- rmNode.totalCapability = resourceOption.getResource();
+ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
+ RMNodeResourceUpdateEvent event){
+ ResourceOption resourceOption = event.getResourceOption();
+ // Set resource on RMNode
+ rmNode.totalCapability = resourceOption.getResource();
+ rmNode.updatedCapability = true;
}
private static NodeHealthStatus updateRMNodeFromStatusEvents(
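
Between the `ResourceTrackerService` change above and these two `RMNodeImpl` methods, the patch forms a set-and-acknowledge handshake around the capability flag. A condensed sketch (hypothetical class, not patch code) of the two paths:

```java
import org.apache.hadoop.yarn.api.records.Resource;

// Hypothetical condensation of the handshake: the AdminService-driven
// resource update event raises the flag, and the next node heartbeat
// consumes it exactly once.
final class CapabilityHandshakeSketch {
  private volatile Resource totalCapability;
  private volatile boolean updatedCapability = false;

  // updateNodeResourceFromEvent() path (RMNodeImpl above)
  void onResourceUpdate(Resource newCapability) {
    totalCapability = newCapability;
    updatedCapability = true;
  }

  // nodeHeartbeat() path (ResourceTrackerService above):
  // returns the capability to push to the NM, or null if unchanged
  Resource pollForHeartbeat() {
    if (!updatedCapability) {
      return null;
    }
    updatedCapability = false; // resetUpdatedCapability()
    return totalCapability;
  }
}
```
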
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 5168b34d6c..5fd064b7e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -92,13 +92,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -116,6 +119,8 @@ public abstract class AbstractYarnScheduler
private static final Logger LOG =
LoggerFactory.getLogger(AbstractYarnScheduler.class);
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
protected final ClusterNodeTracker<N> nodeTracker =
new ClusterNodeTracker<>();
@@ -809,6 +814,7 @@ public void updateNodeResource(RMNode nm,
try {
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
+ final int timeout = resourceOption.getOverCommitTimeout();
Resource oldResource = node.getTotalResource();
if (!oldResource.equals(newResource)) {
// Notify NodeLabelsManager about this change
@@ -816,13 +822,15 @@ public void updateNodeResource(RMNode nm,
newResource);
// Log resource change
- LOG.info("Update resource on node: " + node.getNodeName() + " from: "
- + oldResource + ", to: " + newResource);
+ LOG.info("Update resource on node: {} from: {}, to: {} in {} ms",
+ node.getNodeName(), oldResource, newResource, timeout);
nodeTracker.removeNode(nm.getNodeID());
// update resource to node
node.updateTotalResource(newResource);
+ node.setOvercommitTimeOut(timeout);
+ signalContainersIfOvercommitted(node, timeout == 0);
nodeTracker.addNode((N) node);
} else{
@@ -1165,6 +1173,10 @@ protected void nodeUpdate(RMNode nm) {
updateNodeResourceUtilization(nm, schedulerNode);
}
+ if (schedulerNode != null) {
+ signalContainersIfOvercommitted(schedulerNode, true);
+ }
+
// Now node data structures are up-to-date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug(
@@ -1174,6 +1186,67 @@ protected void nodeUpdate(RMNode nm) {
}
}
+ /**
+ * Check if the node is overcommitted and needs to remove containers. If
+ * it is overcommitted, it will kill or preempt (notify the AM to stop them)
+ * containers. It also takes into account the overcommit timeout. It only
+ * notifies the application to preempt a container if the timeout hasn't
+ * passed. If the timeout has passed, it tries to kill the containers. If
+ * there is no timeout, it doesn't do anything and just prevents new
+ * allocations.
+ *
+ * This action is taken when the resources change (to preempt containers
+ * or kill them if specified) or when the node heartbeats (for killing
+ * only).
+ *
+ * @param schedulerNode The node to check whether is overcommitted.
+ * @param kill If the container should be killed or just notify the AM.
+ */
+ private void signalContainersIfOvercommitted(
+ SchedulerNode schedulerNode, boolean kill) {
+
+ // If there is no timeout, we don't do anything
+ if (!schedulerNode.isOvercommitTimeOutSet()) {
+ return;
+ }
+
+ SchedulerEventType eventType =
+ SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+ if (kill) {
+ eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+
+ // If it hasn't timed out yet, don't kill
+ if (!schedulerNode.isOvercommitTimedOut()) {
+ return;
+ }
+ }
+
+ // Check if the node is overcommitted (negative resources)
+ ResourceCalculator rc = getResourceCalculator();
+ Resource unallocated = Resource.newInstance(
+ schedulerNode.getUnallocatedResource());
+ if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+ return;
+ }
+
+ LOG.info("{} is overcommitted ({}), preempt/kill containers",
+ schedulerNode.getNodeID(), unallocated);
+ for (RMContainer container : schedulerNode.getContainersToKill()) {
+ LOG.info("Send {} to {} to free up {}", eventType,
+ container.getContainerId(), container.getAllocatedResource());
+ ApplicationAttemptId appId = container.getApplicationAttemptId();
+ ContainerPreemptEvent event =
+ new ContainerPreemptEvent(appId, container, eventType);
+ this.rmContext.getDispatcher().getEventHandler().handle(event);
+ Resources.addTo(unallocated, container.getAllocatedResource());
+
+ if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+ LOG.debug("Enough unallocated resources {}", unallocated);
+ break;
+ }
+ }
+ }
+
@Override
public Resource getNormalizedResource(Resource requestedResource,
Resource maxResourceCapability) {
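
The timeout semantics implemented by `signalContainersIfOvercommitted`, together with the new `SchedulerNode` state (next file), can be summarized as follows; this is an illustrative restatement using the patch's own helpers, not additional patch code:

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

final class OvercommitPolicySketch {
  // Policy recap:
  //   timeout < 0  -> no flag is set; only new allocations are blocked
  //   timeout == 0 -> isOvercommitTimedOut() is true immediately, so
  //                   containers are killed on the update itself
  //   timeout > 0  -> preempt (notify the AM) on the update; kill on a
  //                   later heartbeat once the deadline has passed
  static boolean isOvercommitted(ResourceCalculator rc, SchedulerNode node) {
    // Overcommitted means some dimension of the unallocated resources is
    // negative, i.e. a zero resource no longer "fits in" the vector.
    Resource zero = Resource.newInstance(0, 0);
    Resource unallocated =
        Resource.newInstance(node.getUnallocatedResource());
    return !Resources.fitsIn(rc, zero, unallocated);
  }
}
```
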
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index e36bc64b19..ef03aadf1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.List;
@@ -26,6 +27,7 @@
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.builder.CompareToBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -70,6 +72,8 @@ public abstract class SchedulerNode {
ResourceUtilization.newInstance(0, 0, 0f);
private volatile ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
+ /** Timestamp when the overcommitted resources time out; -1 if unset. */
+ private long overcommitTimeout = -1;
/* set of containers that are allocated containers */
private final Map<ContainerId, ContainerInfo> launchedContainers =
@@ -119,6 +123,38 @@ public synchronized void updateTotalResource(Resource resource){
this.allocatedResource);
}
+ /**
+ * Set the timeout for the node to stop overcommitting the resources. After
+ * this time the scheduler will start killing containers until the resources
+ * are not overcommitted anymore. This may reset a previous timeout.
+ * @param timeOut Time out in milliseconds.
+ */
+ public synchronized void setOvercommitTimeOut(long timeOut) {
+ if (timeOut >= 0) {
+ if (this.overcommitTimeout != -1) {
+ LOG.debug("The overcommit timeout for {} was already set to {}",
+ getNodeID(), this.overcommitTimeout);
+ }
+ this.overcommitTimeout = Time.now() + timeOut;
+ }
+ }
+
+ /**
+ * Check if the overcommit timeout has passed.
+ * @return true if the overcommit timeout has passed.
+ */
+ public synchronized boolean isOvercommitTimedOut() {
+ return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout;
+ }
+
+ /**
+ * Check if the node has a timeout for overcommitted resources.
+ * @return If the node has a timeout for overcommitted resources.
+ */
+ public synchronized boolean isOvercommitTimeOutSet() {
+ return this.overcommitTimeout >= 0;
+ }
+
/**
* Get the ID of the node which contains both its hostname and port.
* @return The ID of the node.
@@ -372,6 +408,36 @@ public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
return result;
}
+ /**
+ * Get the containers running on the node ordered by which to kill first:
+ * OPPORTUNISTIC containers come first, then regular GUARANTEED containers,
+ * and AMs last. Within the same type, newer containers come first.
+ * @return A copy of the running containers ordered by which to kill first.
+ */
+ public List<RMContainer> getContainersToKill() {
+ List<RMContainer> result = getLaunchedContainers();
+ Collections.sort(result, (c1, c2) -> {
+ return new CompareToBuilder()
+ .append(c1.isAMContainer(), c2.isAMContainer())
+ .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
+ .append(c2.getCreationTime(), c1.getCreationTime()) // reversed
+ .toComparison();
+ });
+ return result;
+ }
+
+ /**
+ * Get the launched containers in the node.
+ * @return List of launched containers.
+ */
+ protected synchronized List<RMContainer> getLaunchedContainers() {
+ List<RMContainer> result = new ArrayList<>();
+ for (ContainerInfo info : launchedContainers.values()) {
+ result.add(info.container);
+ }
+ return result;
+ }
+
/**
* Get the container for the specified container ID.
* @param containerId The container ID
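
The `CompareToBuilder` chain in `getContainersToKill` is easier to read as a standalone comparator; a sketch with the same ordering (the class name is an illustration):

```java
import java.util.Comparator;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

final class KillOrderSketch {
  // Same ordering as getContainersToKill(): non-AM containers sort before
  // AMs (false < true), OPPORTUNISTIC sorts before GUARANTEED because the
  // enum comparison is reversed, and within a type newer containers sort
  // first because the creation-time comparison is reversed too.
  static final Comparator<RMContainer> KILL_FIRST = (c1, c2) ->
      new CompareToBuilder()
          .append(c1.isAMContainer(), c2.isAMContainer())
          .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
          .append(c2.getCreationTime(), c1.getCreationTime())   // reversed
          .toComparison();
}
```
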
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 166bb487e0..151a7ab086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -87,6 +87,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1288,8 +1289,32 @@ public void handle(SchedulerEvent event) {
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
break;
+ case MARK_CONTAINER_FOR_PREEMPTION:
+ if (!(event instanceof ContainerPreemptEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ ContainerPreemptEvent preemptContainerEvent =
+ (ContainerPreemptEvent)event;
+ ApplicationAttemptId appId = preemptContainerEvent.getAppId();
+ RMContainer preemptedContainer = preemptContainerEvent.getContainer();
+ FSAppAttempt app = getApplicationAttempt(appId);
+ app.trackContainerForPreemption(preemptedContainer);
+ break;
+ case MARK_CONTAINER_FOR_KILLABLE:
+ if (!(event instanceof ContainerPreemptEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ ContainerPreemptEvent containerKillableEvent =
+ (ContainerPreemptEvent)event;
+ RMContainer killableContainer = containerKillableEvent.getContainer();
+ completedContainer(killableContainer,
+ SchedulerUtils.createPreemptedContainerStatus(
+ killableContainer.getContainerId(),
+ SchedulerUtils.PREEMPTED_CONTAINER),
+ RMContainerEventType.KILL);
+ break;
default:
- LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
+ LOG.error("Unknown event arrived at FairScheduler: {}", event);
}
}
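
A hedged sketch of the round trip the two new cases complete: the emitting side below mirrors what `AbstractYarnScheduler.signalContainersIfOvercommitted` does above, and `FairScheduler.handle` is the consuming side (the wrapper class is hypothetical):

```java
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;

final class PreemptSignalSketch {
  // Emit a preempt-or-kill signal for one container; the scheduler's
  // handle() method then either records it for the next PreemptionMessage
  // (MARK_CONTAINER_FOR_PREEMPTION) or completes it as PREEMPTED
  // (MARK_CONTAINER_FOR_KILLABLE).
  static void signal(RMContext rmContext, RMContainer container,
      boolean kill) {
    ApplicationAttemptId appId = container.getApplicationAttemptId();
    SchedulerEventType type = kill
        ? SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE
        : SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
    rmContext.getDispatcher().getEventHandler().handle(
        new ContainerPreemptEvent(appId, container, type));
  }
}
```
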
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index c0af0413a0..3b72ca1c0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -190,6 +190,15 @@ public Resource getTotalCapability() {
return this.perNode;
}
+ @Override
+ public boolean isUpdatedCapability() {
+ return false;
+ }
+
+ @Override
+ public void resetUpdatedCapability() {
+ }
+
@Override
public String getRackName() {
return this.rackName;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ba409b1386..b58c7a411c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
@@ -1018,4 +1021,94 @@ public void testContainerRecoveredByNode() throws Exception {
System.out.println("Stopping testContainerRecoveredByNode");
}
}
+
+ /**
+ * Test the order we get the containers to kill. It should respect the order
+ * described in {@link SchedulerNode#getContainersToKill()}.
+ */
+ @Test
+ public void testGetRunningContainersToKill() {
+ final SchedulerNode node = new MockSchedulerNode();
+ assertEquals(Collections.emptyList(), node.getContainersToKill());
+
+ // AM0
+ RMContainer am0 = newMockRMContainer(
+ true, ExecutionType.GUARANTEED, "AM0");
+ node.allocateContainer(am0);
+ assertEquals(Arrays.asList(am0), node.getContainersToKill());
+
+ // OPPORTUNISTIC0, AM0
+ RMContainer opp0 = newMockRMContainer(
+ false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0");
+ node.allocateContainer(opp0);
+ assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill());
+
+ // OPPORTUNISTIC0, GUARANTEED0, AM0
+ RMContainer regular0 = newMockRMContainer(
+ false, ExecutionType.GUARANTEED, "GUARANTEED0");
+ node.allocateContainer(regular0);
+ assertEquals(Arrays.asList(opp0, regular0, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0
+ RMContainer opp1 = newMockRMContainer(
+ false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1");
+ node.allocateContainer(opp1);
+ assertEquals(Arrays.asList(opp1, opp0, regular0, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0
+ RMContainer am1 = newMockRMContainer(
+ true, ExecutionType.GUARANTEED, "AM1");
+ node.allocateContainer(am1);
+ assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0
+ RMContainer regular1 = newMockRMContainer(
+ false, ExecutionType.GUARANTEED, "GUARANTEED1");
+ node.allocateContainer(regular1);
+ assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0),
+ node.getContainersToKill());
+ }
+
+ private static RMContainer newMockRMContainer(boolean isAMContainer,
+ ExecutionType executionType, String name) {
+ RMContainer container = mock(RMContainer.class);
+ when(container.isAMContainer()).thenReturn(isAMContainer);
+ when(container.getExecutionType()).thenReturn(executionType);
+ when(container.getCreationTime()).thenReturn(Time.now());
+ when(container.toString()).thenReturn(name);
+ return container;
+ }
+
+ /**
+ * SchedulerNode mock to test launching containers.
+ */
+ class MockSchedulerNode extends SchedulerNode {
+ private final List<RMContainer> containers = new ArrayList<>();
+
+ MockSchedulerNode() {
+ super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false);
+ }
+
+ @Override
+ protected List<RMContainer> getLaunchedContainers() {
+ return containers;
+ }
+
+ @Override
+ public void allocateContainer(RMContainer rmContainer) {
+ containers.add(rmContainer);
+ // Shuffle for testing
+ Collections.shuffle(containers);
+ }
+
+ @Override
+ public void reserveResource(SchedulerApplicationAttempt attempt,
+ SchedulerRequestKey schedulerKey, RMContainer container) {}
+
+ @Override
+ public void unreserveResource(SchedulerApplicationAttempt attempt) {}
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
new file mode 100644
index 0000000000..cc665fb902
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
@@ -0,0 +1,723 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestResourceTrackerService.NullNodeAttributeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generic tests for overcommitting resources. This needs to be instantiated
+ * with a scheduler ({@link YarnConfiguration#RM_SCHEDULER}).
+ *
+ * If reducing the amount of resources leads to overcommitting (negative
+ * available resources), the scheduler will select containers to make room.
+ *
+ * - If the timeout is negative, it doesn't kill or preempt surplus
+ * containers.
+ * - If the timeout is 0, it kills the surplus containers immediately.
+ * - If the timeout is larger than 0, it first asks the application to
+ * preempt those containers and after the timeout passes, it kills the surplus
+ * containers.
+ *
+ */
+public abstract class TestSchedulerOvercommit {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSchedulerOvercommit.class);
+
+ /** 1 GB in MB. */
+ protected final static int GB = 1024;
+
+ /** We do scheduling and heart beat every 200ms. */
+ protected static final int INTERVAL = 200;
+
+
+ /** Mock Resource Manager. */
+ private MockRM rm;
+ /** Scheduler for the Mock Resource Manager.*/
+ private ResourceScheduler scheduler;
+
+ /** Node Manager running containers. */
+ private MockNM nm;
+ private NodeId nmId;
+
+ /** Application to allocate containers. */
+ private RMAppAttempt attempt;
+ private MockAM am;
+
+ /**
+ * Set up the test cluster: an RM, an NM, and an application.
+ * @throws Exception If it cannot set up the cluster.
+ */
+ @Before
+ public void setup() throws Exception {
+ LOG.info("Setting up the test cluster...");
+
+ // Start the Resource Manager
+ Configuration conf = getConfiguration();
+ rm = new MockRM(conf);
+ rm.start();
+ scheduler = rm.getResourceScheduler();
+
+ // Add a Node Manager with 4GB
+ nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ nmId = nm.getNodeId();
+
+ // Start an AM with 2GB
+ RMApp app = rm.submitApp(2 * GB);
+ nm.nodeHeartbeat(true);
+ attempt = app.getCurrentAppAttempt();
+ am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+
+ // After allocation, used 2GB and remaining 2GB on the NM
+ assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
+ nm.nodeHeartbeat(true);
+ }
+
+ /**
+ * Get the configuration for the scheduler. This is used when setting up the
+ * Resource Manager and should set up the scheduler (e.g., Capacity
+ * Scheduler or Fair Scheduler). It needs to set the configuration with
+ * {@link YarnConfiguration#RM_SCHEDULER}.
+ * @return Configuration for the scheduler.
+ */
+ protected Configuration getConfiguration() {
+ Configuration conf = new YarnConfiguration();
+
+ // Prevent loading node attributes
+ conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+ NullNodeAttributeStore.class, NodeAttributeStore.class);
+
+ return conf;
+ }
+
+ /**
+ * Stops the default application and the RM (with the scheduler).
+ * @throws Exception If it cannot stop the cluster.
+ */
+ @After
+ public void cleanup() throws Exception {
+ LOG.info("Cleaning up the test cluster...");
+
+ if (am != null) {
+ am.unregisterAppAttempt();
+ am = null;
+ }
+ if (rm != null) {
+ rm.drainEvents();
+ rm.stop();
+ rm = null;
+ }
+ }
+
+
+ /**
+ * Reducing the resources with no timeout should prevent new allocations
+ * but let the running containers finish without killing them.
+ */
+ @Test
+ public void testReduceNoTimeout() throws Exception {
+
+ // New 2GB container should give 4 GB used (2+2) and 0 GB available
+ Container c1 = createContainer(am, 2 * GB);
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+
+ // Update node resource to 2 GB, so resource is over-consumed
+ updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+ // The used resource should still be 4 GB and negative available resource
+ waitMemory(scheduler, nmId, 4 * GB, -2 * GB, INTERVAL, 2 * 1000);
+ // Check that the NM got the updated resources
+ nm.nodeHeartbeat(true);
+ assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+ // Check that we did not get a preemption request
+ assertNoPreemption(am.schedule().getPreemptionMessage());
+
+ // Check container can complete successfully with resource over-commitment
+ ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+ c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+ nm.containerStatus(containerStatus);
+
+ LOG.info("Waiting for container to be finished for app...");
+ GenericTestUtils.waitFor(
+ () -> attempt.getJustFinishedContainers().size() == 1,
+ INTERVAL, 2 * 1000);
+ assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+ assertMemory(scheduler, nmId, 2 * GB, 0);
+
+ // Verify no NPE is trigger in schedule after resource is updated
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+ AllocateResponse allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ // Try 10 times as scheduling is an async process
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(INTERVAL);
+ allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ }
+ }
+
+ /**
+ * Changing the resources multiple times without waiting for the
+ * timeout.
+ */
+ @Test
+ public void testChangeResourcesNoTimeout() throws Exception {
+ waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000);
+
+ updateNodeResource(rm, nmId, 0 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000);
+
+ updateNodeResource(rm, nmId, 4 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+ // The application should still be running without issues.
+ assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+ }
+
+ /**
+ * Reducing the resources with a zero timeout kills the container right away.
+ */
+ @Test
+ public void testReduceKill() throws Exception {
+
+ Container container = createContainer(am, 2 * GB);
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+
+ // Reducing to 2GB should kill the container
+ long t0 = Time.now();
+ updateNodeResource(rm, nmId, 2 * GB, 2, 0);
+ waitMemory(scheduler, nm, 2 * GB, 0 * GB, INTERVAL, 2 * INTERVAL);
+
+ // Check that the new container was killed
+ List<ContainerStatus> completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus containerStatus = completedContainers.get(0);
+ assertContainerKilled(container.getId(), containerStatus);
+
+ // It should kill the containers right away
+ assertTime(0, Time.now() - t0);
+ }
+
+ /**
+ * Reducing the resources with a time out should first preempt and then kill.
+ */
+ @Test
+ public void testReducePreemptAndKill() throws Exception {
+
+ Container container = createContainer(am, 2 * GB);
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+
+ // We give an overcommit timeout of 2 seconds
+ final int timeout = (int)TimeUnit.SECONDS.toMillis(2);
+
+ // Reducing to 2GB should first preempt the container
+ long t0 = Time.now();
+ updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+ waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+ // We should receive a notification to preempt the container
+ PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(container.getId(), preemptMsg);
+
+ // Wait until the container is killed
+ waitMemory(scheduler, nm, 2 * GB, 0, INTERVAL, timeout + 2 * INTERVAL);
+
+ // Check that the container was killed
+ List<ContainerStatus> completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus containerStatus = completedContainers.get(0);
+ assertContainerKilled(container.getId(), containerStatus);
+
+ // Check how long it took to kill the container
+ assertTime(timeout, Time.now() - t0);
+ }
+
+ /**
+ * Reducing the resources (with a time out) triggers a preemption message to
+ * the AM right away. Then, increasing them again should cancel the kill
+ * that would otherwise happen when the timeout expires.
+ */
+ @Test
+ public void testReducePreemptAndCancel() throws Exception {
+
+ Container container = createContainer(am, 2 * GB);
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+
+ // We give an overcommit timeout of 1 second
+ final int timeout = (int)TimeUnit.SECONDS.toMillis(1);
+
+ // Reducing to 2GB should first preempt the container
+ updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+ waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+ // We should receive a notification to preempt the container
+ PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(container.getId(), preemptMsg);
+
+ // Increase the resources again
+ updateNodeResource(rm, nmId, 4 * GB, 2, timeout);
+ waitMemory(scheduler, nm, 4 * GB, 0, INTERVAL, timeout);
+
+ long t0 = Time.now();
+ while (Time.now() - t0 < TimeUnit.SECONDS.toMillis(2)) {
+ nm.nodeHeartbeat(true);
+ AllocateResponse allocation = am.schedule();
+ assertNoPreemption(allocation.getPreemptionMessage());
+ assertTrue(allocation.getCompletedContainersStatuses().isEmpty());
+ Thread.sleep(INTERVAL);
+ }
+
+ // Check that the containers are still running
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+ assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+ }
+
+ /**
+ * Test the order we kill multiple containers.
+ * It initially has: AM(2GB), C1(1GB), C2(1GB), AM2(2GB), and C3(2GB).
+ * It should kill in this order: C3, C2, C1, AM2, and AM1.
+ */
+ @Test
+ public void testKillMultipleContainers() throws Exception {
+
+ updateNodeResource(rm, nmId, 8 * GB, 6, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 6 * GB, 200, 5 * 1000);
+
+ // Start 2 containers with 1 GB each
+ Container c1 = createContainer(am, 1 * GB);
+ Container c2 = createContainer(am, 1 * GB);
+ waitMemory(scheduler, nmId, 4 * GB, 4 * GB, 200, 5 * 1000);
+
+ // Start an AM with 2GB
+ RMApp app2 = rm.submitApp(2 * GB, "app2", "user2");
+ nm.nodeHeartbeat(true);
+ RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
+ MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
+ am2.registerAppAttempt();
+ waitMemory(scheduler, nm, 6 * GB, 2 * GB, 200, 5 * 1000);
+ assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+ Container c3 = createContainer(am2, 2 * GB);
+ waitMemory(scheduler, nm, 8 * GB, 0 * GB, 200, 5 * 1000);
+ assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers());
+
+ // Reduce the resources to kill C3 and C2 (not AM2)
+ updateNodeResource(rm, nmId, 5 * GB, 6, 0);
+ waitMemory(scheduler, nm, 5 * GB, 0 * GB, 200, 5 * 1000);
+ assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers());
+
+ List<ContainerStatus> completedContainers =
+ am2.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus container3Status = completedContainers.get(0);
+ assertContainerKilled(c3.getId(), container3Status);
+
+ completedContainers = am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus container2Status = completedContainers.get(0);
+ assertContainerKilled(c2.getId(), container2Status);
+ assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+ assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+ // Reduce the resources to kill C1 (not AM2)
+ updateNodeResource(rm, nmId, 4 * GB, 6, 0);
+ waitMemory(scheduler, nm, 4 * GB, 0 * GB, 200, 5 * 1000);
+ assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+ completedContainers = am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus container1Status = completedContainers.get(0);
+ assertContainerKilled(c1.getId(), container1Status);
+ assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+ assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+ // Reduce the resources to kill AM2
+ updateNodeResource(rm, nmId, 2 * GB, 6, 0);
+ waitMemory(scheduler, nm, 2 * GB, 0 * GB, 200, 5 * 1000);
+ assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers());
+ assertEquals(RMAppAttemptState.FAILED, attempt2.getState());
+
+ // The first application should be fine and still running
+ assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+ }
+
+ @Test
+ public void testEndToEnd() throws Exception {
+
+ Container c1 = createContainer(am, 2 * GB);
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+
+ // check node report, 4 GB used and 0 GB available
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+ nm.nodeHeartbeat(true);
+ assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+ // update node resource to 2 GB, so resource is over-consumed
+ updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+ // the used resource should still be 4 GB with negative available resource
+ waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+ // check that we did not get a preemption request
+ assertNoPreemption(am.schedule().getPreemptionMessage());
+
+ // check that the NM got the updated resources
+ nm.nodeHeartbeat(true);
+ assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+ // check container can complete successfully with resource over-commitment
+ ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+ c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+ nm.containerStatus(containerStatus);
+
+ LOG.info("Waiting for containers to be finished for app 1...");
+ GenericTestUtils.waitFor(
+ () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000);
+ assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+ assertMemory(scheduler, nmId, 2 * GB, 0);
+
+ // verify no NPE is trigger in schedule after resource is updated
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+ AllocateResponse allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ // try 10 times as scheduling is an async process
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(100);
+ allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ }
+
+ // increase the resources again to 5 GB to schedule the 3GB container
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+ // kick the scheduling and check it took effect
+ nm.nodeHeartbeat(true);
+ while (allocResponse2.getAllocatedContainers().isEmpty()) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse2 = am.schedule();
+ }
+ assertEquals(1, allocResponse2.getAllocatedContainers().size());
+ Container c2 = allocResponse2.getAllocatedContainers().get(0);
+ assertEquals(3 * GB, c2.getResource().getMemorySize());
+ assertEquals(nmId, c2.getNodeId());
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+ // reduce the resources and trigger a preempt request to the AM for c2
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(c2.getId(), preemptMsg);
+
+ // increasing the resources again should stop the killing
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+ Thread.sleep(3 * 1000);
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+ // reduce the resources again to trigger a preempt request to the AM for c2
+ long t0 = Time.now();
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(c2.getId(), preemptMsg);
+
+ // wait until the scheduler kills the container
+ GenericTestUtils.waitFor(() -> {
+ try {
+ nm.nodeHeartbeat(true); // trigger preemption in the NM
+ } catch (Exception e) {
+ LOG.error("Cannot heartbeat", e);
+ }
+ SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+ return report.getAvailableResource().getMemorySize() > 0;
+ }, 200, 5 * 1000);
+ assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+ List<ContainerStatus> completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus c2status = completedContainers.get(0);
+ assertContainerKilled(c2.getId(), c2status);
+
+ assertTime(2000, Time.now() - t0);
+ }
+
+ /**
+ * Create a container with a particular size and make sure it succeeds.
+ * @param app Application Master to request the container from.
+ * @param memory Memory of the container.
+ * @return Newly created container.
+ * @throws Exception If there are issues creating the container.
+ */
+ protected Container createContainer(
+ final MockAM app, final int memory) throws Exception {
+
+ ResourceRequest req = ResourceRequest.newBuilder()
+ .capability(Resource.newInstance(memory, 1))
+ .numContainers(1)
+ .build();
+ AllocateResponse response = app.allocate(singletonList(req), emptyList());
+ List<Container> allocated = response.getAllocatedContainers();
+ nm.nodeHeartbeat(true);
+ for (int i = 0; allocated.isEmpty() && i < 10; i++) {
+ LOG.info("Waiting for containers to be created for app...");
+ Thread.sleep(INTERVAL);
+ response = app.schedule();
+ allocated = response.getAllocatedContainers();
+ nm.nodeHeartbeat(true);
+ }
+ assertFalse("Cannot create the container", allocated.isEmpty());
+
+ assertEquals(1, allocated.size());
+ final Container c = allocated.get(0);
+ assertEquals(memory, c.getResource().getMemorySize());
+ assertEquals(nmId, c.getNodeId());
+ return c;
+ }
+
+ /**
+ * Update the resources on a Node Manager.
+ * @param rm Resource Manager to contact.
+ * @param nmId Identifier of the Node Manager.
+ * @param memory Memory in MB.
+ * @param vCores Number of virtual cores.
+ * @param overcommitTimeout Overcommit timeout in milliseconds; negative for none.
+ * @throws Exception If the update cannot be completed.
+ */
+ public static void updateNodeResource(MockRM rm, NodeId nmId,
+ int memory, int vCores, int overcommitTimeout) throws Exception {
+ AdminService admin = rm.getAdminService();
+ ResourceOption resourceOption = ResourceOption.newInstance(
+ Resource.newInstance(memory, vCores), overcommitTimeout);
+ UpdateNodeResourceRequest req = UpdateNodeResourceRequest.newInstance(
+ singletonMap(nmId, resourceOption));
+ admin.updateNodeResource(req);
+ }
+
+ /**
+ * Make sure that the container was killed.
+ * @param containerId Expected container identifier.
+ * @param status Container status to check.
+ */
+ public static void assertContainerKilled(
+ final ContainerId containerId, final ContainerStatus status) {
+ assertEquals(containerId, status.getContainerId());
+ assertEquals(ContainerState.COMPLETE, status.getState());
+ assertEquals(ContainerExitStatus.PREEMPTED, status.getExitStatus());
+ assertEquals(SchedulerUtils.PREEMPTED_CONTAINER, status.getDiagnostics());
+ }
+
+ /**
+ * Check that an elapsed time is at least the expected time and at most
+ * two heartbeat/scheduling intervals longer.
+ * @param expectedTime Time expected in milliseconds.
+ * @param time Actual time to check.
+ */
+ public static void assertTime(final long expectedTime, final long time) {
+ assertTrue("Too short: " + time + "ms", time > expectedTime);
+ assertTrue("Too long: " + time + "ms",
+ time < (expectedTime + 2 * INTERVAL));
+ }
+
+ /**
+ * Check that the scheduler didn't ask to preempt anything.
+ * @param msg Preemption message from the scheduler.
+ */
+ public static void assertNoPreemption(final PreemptionMessage msg) {
+ if (msg != null &&
+ msg.getContract() != null &&
+ !msg.getContract().getContainers().isEmpty()) {
+ fail("We shouldn't preempt containers: " + msg);
+ }
+ }
+
+ /**
+ * Check that the scheduler asks to preempt a particular container.
+ * @param containerId Expected container to preempt.
+ * @param msg Preemption message from the scheduler.
+ */
+ public static void assertPreemption(
+ final ContainerId containerId, final PreemptionMessage msg) {
+ assertNotNull("Expected a preemption message", msg);
+ Set<ContainerId> preemptContainers = new HashSet<>();
+ if (msg.getContract() != null) {
+ for (PreemptionContainer c : msg.getContract().getContainers()) {
+ preemptContainers.add(c.getId());
+ }
+ }
+ if (msg.getStrictContract() != null) {
+ for (PreemptionContainer c : msg.getStrictContract().getContainers()) {
+ preemptContainers.add(c.getId());
+ }
+ }
+ assertEquals(Collections.singleton(containerId), preemptContainers);
+ }
+
+ /**
+ * Check if a node report has the expected memory values.
+ * @param scheduler Scheduler with the data.
+ * @param nmId Identifier of the node to check.
+ * @param expectedUsed The expected used memory in MB.
+ * @param expectedAvailable The expected available memory in MB.
+ */
+ public static void assertMemory(ResourceScheduler scheduler, NodeId nmId,
+ long expectedUsed, long expectedAvailable) {
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+ assertNotNull(nmReport);
+ Resource used = nmReport.getUsedResource();
+ assertEquals("Used memory", expectedUsed, used.getMemorySize());
+ Resource available = nmReport.getAvailableResource();
+ assertEquals("Available memory",
+ expectedAvailable, available.getMemorySize());
+ }
+
+ /**
+ * Wait until the memory of a NM is at a given point.
+ * It does not trigger NM heart beat.
+ * @param scheduler Scheduler with the data.
+ * @param nmId Identifier of the node to check.
+ * @param expectedUsed The expected used memory in MB.
+ * @param expectedAvailable The expected available memory in MB.
+ * @param checkEveryMillis How often to perform the test in ms.
+ * @param waitForMillis The maximum time to wait in ms.
+ * @throws Exception If we don't get to the expected memory.
+ */
+ public static void waitMemory(ResourceScheduler scheduler,
+ NodeId nmId, int expectedUsed, int expectedAvailable,
+ int checkEveryMillis, int waitForMillis) throws Exception {
+ waitMemory(scheduler, nmId, null, expectedUsed, expectedAvailable,
+ checkEveryMillis, waitForMillis);
+ }
+
+ /**
+ * Wait until the memory of a NM is at a given point.
+ * It triggers NM heartbeats.
+ * @param scheduler Scheduler with the data.
+ * @param nm Node Manager to check.
+ * @param expectedUsed The expected used memory in MB.
+ * @param expectedAvailable The expected available memory in MB.
+ * @param checkEveryMillis How often to perform the test in ms.
+ * @param waitForMillis The maximum time to wait in ms.
+ * @throws Exception If we don't get to the expected memory.
+ */
+ public static void waitMemory(ResourceScheduler scheduler, MockNM nm,
+ int expectedUsed, int expectedAvailable,
+ int checkEveryMillis, int waitForMillis) throws Exception {
+ waitMemory(scheduler, nm.getNodeId(), nm, expectedUsed, expectedAvailable,
+ checkEveryMillis, waitForMillis);
+ }
+
+ /**
+ * Wait until the memory of a NM is at a given point.
+ * If the NM is specified, it triggers heartbeats.
+ * @param scheduler Scheduler with the data.
+ * @param nmId Identifier of the node to check.
+ * @param nm Node Manager to check.
+ * @param expectedUsed The expected used memory in MB.
+ * @param expectedAvailable The expected available memory in MB.
+ * @param checkEveryMillis How often to perform the test in ms.
+ * @param waitForMillis The maximum time to wait in ms.
+ * @throws Exception If we don't get to the expected memory.
+ */
+ public static void waitMemory(ResourceScheduler scheduler,
+ NodeId nmId, MockNM nm,
+ int expectedUsed, int expectedAvailable,
+ int checkEveryMillis, int waitForMillis) throws Exception {
+
+ long start = Time.monotonicNow();
+ while (Time.monotonicNow() - start < waitForMillis) {
+ try {
+ if (nm != null) {
+ nm.nodeHeartbeat(true);
+ }
+ assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+ return;
+ } catch (AssertionError e) {
+ Thread.sleep(checkEveryMillis);
+ }
+ }
+
+ // No success; report the timeout
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+ Resource used = nmReport.getUsedResource();
+ Resource available = nmReport.getAvailableResource();
+ throw new TimeoutException("Took longer than " + waitForMillis +
+ "ms to get to " + expectedUsed + "," + expectedAvailable +
+ " actual=" + used + "," + available);
+ }
+}
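
Since `TestSchedulerOvercommit` is abstract, each scheduler is expected to provide a thin concrete subclass that selects it via `getConfiguration()`. A hypothetical Fair Scheduler variant (the class name and wiring are assumptions, not shown in this patch) could look like:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

// Hypothetical subclass; the real patch may name or wire this differently.
public class TestFairSchedulerOvercommit extends TestSchedulerOvercommit {
  @Override
  protected Configuration getConfiguration() {
    Configuration conf = super.getConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
        ResourceScheduler.class);
    return conf;
  }
}
```
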
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 5cb49a4e34..fd8fa0535c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -21,6 +21,13 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -57,6 +64,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -76,12 +84,12 @@
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -94,8 +102,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -1310,110 +1316,139 @@ public void testAllocateReorder() throws Exception {
@Test
public void testResourceOverCommit() throws Exception {
- int waitCount;
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
+ ResourceScheduler scheduler = rm.getResourceScheduler();
- MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
- RMApp app1 = rm.submitApp(2048);
- // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
- nm1.nodeHeartbeat(true);
- RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
- MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
- nm1.getNodeId());
- // check node report, 2 GB used and 2 GB available
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ NodeId nmId = nm.getNodeId();
+ RMApp app = rm.submitApp(2048);
+    // kick the scheduling; 2 GB given to the AM, remaining 2 GB on the NM
+ nm.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am.registerAppAttempt();
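+    // check the node report: 2 GB used by the AM and 2 GB still available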
+ assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
- // add request for containers
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
- AllocateResponse alloc1Response = am1.schedule(); // send the request
+ // add request for 1 container of 2 GB
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
+ AllocateResponse alloc1Response = am.schedule(); // send the request
// kick the scheduler, 2 GB given to AM1, resource remaining 0
- nm1.nodeHeartbeat(true);
- while (alloc1Response.getAllocatedContainers().size() < 1) {
+ nm.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().isEmpty()) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
- alloc1Response = am1.schedule();
+ alloc1Response = am.schedule();
}
    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
- Assert.assertEquals(1, allocated1.size());
- Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
- Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- // check node report, 4 GB used and 0 GB available
- Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-
- // check container is assigned with 2 GB.
+ assertEquals(1, allocated1.size());
Container c1 = allocated1.get(0);
- Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(nmId, c1.getNodeId());
- // update node resource to 2 GB, so resource is over-consumed.
-    Map<NodeId, ResourceOption> nodeResourceMap =
-        new HashMap<NodeId, ResourceOption>();
- nodeResourceMap.put(nm1.getNodeId(),
- ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
- UpdateNodeResourceRequest request =
- UpdateNodeResourceRequest.newInstance(nodeResourceMap);
- AdminService as = ((MockRM)rm).getAdminService();
- as.updateNodeResource(request);
+ // check node report, 4 GB used and 0 GB available
+ assertMemory(scheduler, nmId, 4 * GB, 0);
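+    // the heartbeat response should still report the original 4 GB capability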
+ nm.nodeHeartbeat(true);
+ assertEquals(4 * GB, nm.getCapability().getMemorySize());
- waitCount = 0;
- while (waitCount++ != 20) {
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- if (report_nm1.getAvailableResource().getMemorySize() != 0) {
- break;
- }
- LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
- + waitCount + " times already..");
- Thread.sleep(1000);
- }
- // Now, the used resource is still 4 GB, and available resource is minus value.
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ // update node resource to 2 GB, so resource is over-consumed
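+    // a negative overcommit timeout (-1) means no timeout: containers keep running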
+ updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still be 4 GB with a negative available resource
+ waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption request
+ assertNoPreemption(am.schedule().getPreemptionMessage());
- // Check container can complete successfully in case of resource over-commitment.
+ // check that the NM got the updated resources
+ nm.nodeHeartbeat(true);
+ assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+ // check container can complete successfully with resource over-commitment
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
- nm1.containerStatus(containerStatus);
- waitCount = 0;
- while (attempt1.getJustFinishedContainers().size() < 1
- && waitCount++ != 20) {
- LOG.info("Waiting for containers to be finished for app 1... Tried "
- + waitCount + " times already..");
- Thread.sleep(100);
- }
- Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
- Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- // As container return 2 GB back, the available resource becomes 0 again.
- Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
+ nm.containerStatus(containerStatus);
- // Verify no NPE is trigger in schedule after resource is updated.
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
- alloc1Response = am1.schedule();
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
- int times = 0;
- // try 10 times as scheduling is async process.
- while (alloc1Response.getAllocatedContainers().size() < 1
- && times++ < 10) {
- LOG.info("Waiting for containers to be allocated for app 1... Tried "
- + times + " times already..");
+ LOG.info("Waiting for containers to be finished for app 1...");
+ GenericTestUtils.waitFor(
+ () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+ assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+ assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify that no NPE is triggered in the scheduler after the resources are updated
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+ AllocateResponse allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ // try 10 times as scheduling is an async process
+ for (int i = 0; i < 10; i++) {
Thread.sleep(100);
+ allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
}
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
+
+    // increase the resources again to 5 GB to schedule the 3 GB container
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+ // kick the scheduling and check it took effect
+ nm.nodeHeartbeat(true);
+ while (allocResponse2.getAllocatedContainers().isEmpty()) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ allocResponse2 = am.schedule();
+ }
+ assertEquals(1, allocResponse2.getAllocatedContainers().size());
+ Container c2 = allocResponse2.getAllocatedContainers().get(0);
+ assertEquals(3 * GB, c2.getResource().getMemorySize());
+ assertEquals(nmId, c2.getNodeId());
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preemption request to the AM for c2
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
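+    // the 2000 ms overcommit timeout gives the AM a chance to release c2 first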
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(c2.getId(), preemptMsg);
+
+    // increasing the resources again should stop the containers from being killed
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
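+    // sleep past the previous 2 second timeout to verify nothing gets killed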
+ Thread.sleep(3 * 1000);
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preemption request to the AM for c2
+ long t0 = Time.now();
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(c2.getId(), preemptMsg);
+
+ // wait until the scheduler kills the container
+ GenericTestUtils.waitFor(() -> {
+ try {
+ nm.nodeHeartbeat(true); // trigger preemption in the NM
+ } catch (Exception e) {
+ LOG.error("Cannot heartbeat", e);
+ }
+ SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+ return report.getAvailableResource().getMemorySize() > 0;
+ }, 200, 5 * 1000);
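+    // after c2 (3 GB) is killed, only the 2 GB AM remains on the 3 GB node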
+ assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus c2status = completedContainers.get(0);
+ assertContainerKilled(c2.getId(), c2status);
+
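+    // the container must not have been killed before the 2 second timeout expired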
+ assertTime(2000, Time.now() - t0);
+
rm.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
new file mode 100644
index 0000000000..27eb3ac7bf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
@@ -0,0 +1,52 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Tests resource changes and overcommit handling in the
+ * {@link CapacityScheduler}.
+ */
+public class TestCapacitySchedulerOvercommit extends TestSchedulerOvercommit {
+
+ @Override
+ protected Configuration getConfiguration() {
+ Configuration conf = super.getConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, ResourceScheduler.class);
+
+    // Remove limits on AMs so that multiple applications can run
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+ CapacitySchedulerConfiguration.ROOT, 100.0f);
+ csConf.setMaximumAMResourcePercentPerPartition(
+ CapacitySchedulerConfiguration.ROOT, "", 100.0f);
+ csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+ CapacitySchedulerConfiguration.ROOT + ".default", 100.0f);
+ csConf.setMaximumAMResourcePercentPerPartition(
+ CapacitySchedulerConfiguration.ROOT + ".default", "", 100.0f);
+
+ return csConf;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
new file mode 100644
index 0000000000..9d31f99d6f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Tests resource changes and overcommit handling in the
+ * {@link FairScheduler}.
+ */
+public class TestFairSchedulerOvercommit extends TestSchedulerOvercommit {
+
+ @Override
+ protected Configuration getConfiguration() {
+ Configuration conf = super.getConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ FairScheduler.class, ResourceScheduler.class);
+
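+    // allow allocations as small as 0 MB and as large as 10 GB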
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10 * GB);
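+    // assign a single container per node heartbeat to keep the test deterministic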
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
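+    // run the scheduler update thread every 10 ms so resource changes apply quickly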
+ conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
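+    // allow preemption to kick in regardless of the cluster utilization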
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+ return conf;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties
new file mode 100644
index 0000000000..addcd53783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
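+# sampling period, in milliseconds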
+*.periodMillis=100
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties
new file mode 100644
index 0000000000..addcd53783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
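+# sampling period, in milliseconds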
+*.periodMillis=100