YARN-9537. Add configuration to disable AM preemption. Contributed by ZhouKang
This commit is contained in:
parent
f6697aa82b
commit
b83b9ab418
@ -28,6 +28,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -99,6 +100,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||
private final Map<String, Set<String>> reservations = new HashMap<>();
|
||||
|
||||
private final List<FSSchedulerNode> blacklistNodeIds = new ArrayList<>();
|
||||
|
||||
private boolean enableAMPreemption;
|
||||
|
||||
/**
|
||||
* Delay scheduling: We often want to prioritize scheduling of node-local
|
||||
* containers over rack-local or off-switch containers. To achieve this
|
||||
@ -121,6 +125,8 @@ public FSAppAttempt(FairScheduler scheduler,
|
||||
this.startTime = scheduler.getClock().getTime();
|
||||
this.lastTimeAtFairShare = this.startTime;
|
||||
this.appPriority = Priority.newInstance(1);
|
||||
this.enableAMPreemption = scheduler.getConf()
|
||||
.getAMPreemptionEnabled(getQueue().getQueueName());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -589,6 +595,10 @@ boolean canContainerBePreempted(RMContainer container,
|
||||
return false;
|
||||
}
|
||||
|
||||
if (container.isAMContainer() && !enableAMPreemption) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Sanity check that the app owns this container
|
||||
if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
|
||||
!newlyAllocatedContainers.contains(container)) {
|
||||
@ -1416,4 +1426,9 @@ public String toString() {
|
||||
public boolean isPreemptable() {
|
||||
return getQueue().isPreemptable();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setEnableAMPreemption(boolean enableAMPreemption) {
|
||||
this.enableAMPreemption = enableAMPreemption;
|
||||
}
|
||||
}
|
||||
|
@ -169,6 +169,10 @@ public class FairSchedulerConfiguration extends Configuration {
|
||||
public static final String PREEMPTION = CONF_PREFIX + "preemption";
|
||||
public static final boolean DEFAULT_PREEMPTION = false;
|
||||
|
||||
protected static final String AM_PREEMPTION_PREFIX =
|
||||
CONF_PREFIX + "am.preemption.";
|
||||
protected static final boolean DEFAULT_AM_PREEMPTION = true;
|
||||
|
||||
protected static final String PREEMPTION_THRESHOLD =
|
||||
CONF_PREFIX + "preemption.cluster-utilization-threshold";
|
||||
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
|
||||
@ -395,6 +399,10 @@ public boolean getPreemptionEnabled() {
|
||||
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
|
||||
}
|
||||
|
||||
public boolean getAMPreemptionEnabled(String queueName) {
|
||||
return getBoolean(AM_PREEMPTION_PREFIX + queueName, DEFAULT_AM_PREEMPTION);
|
||||
}
|
||||
|
||||
public float getPreemptionUtilizationThreshold() {
|
||||
return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
|
||||
}
|
||||
|
@ -238,6 +238,8 @@ public void testHeadroom() {
|
||||
(clusterUsage);
|
||||
Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
|
||||
(fakeRootQueueMetrics);
|
||||
Mockito.when(mockScheduler.getConf()).thenReturn
|
||||
(Mockito.mock(FairSchedulerConfiguration.class));
|
||||
|
||||
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
|
||||
RMContext rmContext = resourceManager.getRMContext();
|
||||
|
@ -421,6 +421,18 @@ private void tryPreemptMoreThanFairShare(String queueName)
|
||||
verifyPreemption(1, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableAMPreemption() {
|
||||
takeAllResources("root.preemptable.child-1");
|
||||
setNumAMContainersPerNode(2);
|
||||
RMContainer container = greedyApp.getLiveContainers().stream()
|
||||
.filter(rmContainer -> rmContainer.isAMContainer())
|
||||
.findFirst()
|
||||
.get();
|
||||
greedyApp.setEnableAMPreemption(false);
|
||||
assertFalse(greedyApp.canContainerBePreempted(container, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
|
||||
throws InterruptedException {
|
||||
|
Loading…
Reference in New Issue
Block a user