Revert "YARN-7813: Capacity Scheduler Intra-queue Preemption should be configurable for each queue"
This reverts commit c5e6e3de1c
.
This commit is contained in:
parent
1f20f432d2
commit
bddfe42e2c
@ -94,26 +94,6 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
|||||||
return queueInfo;
|
return queueInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
|
||||||
@Unstable
|
|
||||||
public static QueueInfo newInstance(String queueName, float capacity,
|
|
||||||
float maximumCapacity, float currentCapacity,
|
|
||||||
List<QueueInfo> childQueues, List<ApplicationReport> applications,
|
|
||||||
QueueState queueState, Set<String> accessibleNodeLabels,
|
|
||||||
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
|
|
||||||
boolean preemptionDisabled,
|
|
||||||
Map<String, QueueConfigurations> queueConfigurations,
|
|
||||||
boolean intraQueuePreemptionDisabled) {
|
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
|
|
||||||
maximumCapacity, currentCapacity,
|
|
||||||
childQueues, applications,
|
|
||||||
queueState, accessibleNodeLabels,
|
|
||||||
defaultNodeLabelExpression, queueStatistics,
|
|
||||||
preemptionDisabled, queueConfigurations);
|
|
||||||
queueInfo.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
|
|
||||||
return queueInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <em>name</em> of the queue.
|
* Get the <em>name</em> of the queue.
|
||||||
* @return <em>name</em> of the queue
|
* @return <em>name</em> of the queue
|
||||||
@ -281,19 +261,4 @@ public abstract void setDefaultNodeLabelExpression(
|
|||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setQueueConfigurations(
|
public abstract void setQueueConfigurations(
|
||||||
Map<String, QueueConfigurations> queueConfigurations);
|
Map<String, QueueConfigurations> queueConfigurations);
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the intra-queue preemption status of the queue.
|
|
||||||
* @return if property is not in proto, return null;
|
|
||||||
* otherwise, return intra-queue preemption status of the queue
|
|
||||||
*/
|
|
||||||
@Public
|
|
||||||
@Stable
|
|
||||||
public abstract Boolean getIntraQueuePreemptionDisabled();
|
|
||||||
|
|
||||||
@Private
|
|
||||||
@Unstable
|
|
||||||
public abstract void setIntraQueuePreemptionDisabled(
|
|
||||||
boolean intraQueuePreemptionDisabled);
|
|
||||||
}
|
}
|
||||||
|
@ -569,7 +569,6 @@ message QueueInfoProto {
|
|||||||
optional QueueStatisticsProto queueStatistics = 10;
|
optional QueueStatisticsProto queueStatistics = 10;
|
||||||
optional bool preemptionDisabled = 11;
|
optional bool preemptionDisabled = 11;
|
||||||
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
|
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
|
||||||
optional bool intraQueuePreemptionDisabled = 13;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message QueueConfigurationsProto {
|
message QueueConfigurationsProto {
|
||||||
|
@ -158,11 +158,5 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
|
|||||||
writer.print("\tPreemption : ");
|
writer.print("\tPreemption : ");
|
||||||
writer.println(preemptStatus ? "disabled" : "enabled");
|
writer.println(preemptStatus ? "disabled" : "enabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
Boolean intraQueuePreemption = queueInfo.getIntraQueuePreemptionDisabled();
|
|
||||||
if (intraQueuePreemption != null) {
|
|
||||||
writer.print("\tIntra-queue Preemption : ");
|
|
||||||
writer.println(intraQueuePreemption ? "disabled" : "enabled");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -665,8 +665,7 @@ public List<NodeReport> createFakeNodeReports() {
|
|||||||
|
|
||||||
public QueueInfo createFakeQueueInfo() {
|
public QueueInfo createFakeQueueInfo() {
|
||||||
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
|
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
|
||||||
createFakeAppReports(), QueueState.RUNNING, null, null, null, false,
|
createFakeAppReports(), QueueState.RUNNING, null, null, null, false);
|
||||||
null, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
|
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
|
||||||
|
@ -1712,8 +1712,7 @@ public void testGetQueueInfo() throws Exception {
|
|||||||
nodeLabels.add("GPU");
|
nodeLabels.add("GPU");
|
||||||
nodeLabels.add("JDK_7");
|
nodeLabels.add("JDK_7");
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||||
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null,
|
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null);
|
||||||
false);
|
|
||||||
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
||||||
int result = cli.run(new String[] { "-status", "queueA" });
|
int result = cli.run(new String[] { "-status", "queueA" });
|
||||||
assertEquals(0, result);
|
assertEquals(0, result);
|
||||||
@ -1729,82 +1728,11 @@ public void testGetQueueInfo() throws Exception {
|
|||||||
pw.println("\tDefault Node Label expression : " + "GPU");
|
pw.println("\tDefault Node Label expression : " + "GPU");
|
||||||
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
|
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
|
||||||
pw.println("\tPreemption : " + "enabled");
|
pw.println("\tPreemption : " + "enabled");
|
||||||
pw.println("\tIntra-queue Preemption : " + "enabled");
|
|
||||||
pw.close();
|
pw.close();
|
||||||
String queueInfoStr = baos.toString("UTF-8");
|
String queueInfoStr = baos.toString("UTF-8");
|
||||||
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
|
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
||||||
ReservationSystemTestUtil.setupQueueConfiguration(conf);
|
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
|
||||||
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
|
|
||||||
+ "ProportionalCapacityPreemptionPolicy");
|
|
||||||
// Turn on cluster-wide intra-queue preemption
|
|
||||||
conf.setBoolean(
|
|
||||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
|
||||||
// Disable intra-queue preemption for all queues
|
|
||||||
conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ "root.intra-queue-preemption.disable_preemption", true);
|
|
||||||
// Enable intra-queue preemption for the a1 queue
|
|
||||||
conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ "root.a.a1.intra-queue-preemption.disable_preemption", false);
|
|
||||||
MiniYARNCluster cluster =
|
|
||||||
new MiniYARNCluster("testGetQueueInfoOverrideIntraQueuePreemption",
|
|
||||||
2, 1, 1);
|
|
||||||
|
|
||||||
YarnClient yarnClient = null;
|
|
||||||
try {
|
|
||||||
cluster.init(conf);
|
|
||||||
cluster.start();
|
|
||||||
final Configuration yarnConf = cluster.getConfig();
|
|
||||||
yarnClient = YarnClient.createYarnClient();
|
|
||||||
yarnClient.init(yarnConf);
|
|
||||||
yarnClient.start();
|
|
||||||
|
|
||||||
QueueCLI cli = new QueueCLI();
|
|
||||||
cli.setClient(yarnClient);
|
|
||||||
cli.setSysOutPrintStream(sysOut);
|
|
||||||
cli.setSysErrPrintStream(sysErr);
|
|
||||||
sysOutStream.reset();
|
|
||||||
// Get status for the root.a queue
|
|
||||||
int result = cli.run(new String[] { "-status", "a" });
|
|
||||||
assertEquals(0, result);
|
|
||||||
String queueStatusOut = sysOutStream.toString();
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("\tPreemption : enabled"));
|
|
||||||
// In-queue preemption is disabled at the "root.a" queue level
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("Intra-queue Preemption : disabled"));
|
|
||||||
cli = new QueueCLI();
|
|
||||||
cli.setClient(yarnClient);
|
|
||||||
cli.setSysOutPrintStream(sysOut);
|
|
||||||
cli.setSysErrPrintStream(sysErr);
|
|
||||||
sysOutStream.reset();
|
|
||||||
// Get status for the root.a.a1 queue
|
|
||||||
result = cli.run(new String[] { "-status", "a1" });
|
|
||||||
assertEquals(0, result);
|
|
||||||
queueStatusOut = sysOutStream.toString();
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("\tPreemption : enabled"));
|
|
||||||
// In-queue preemption is enabled at the "root.a.a1" queue level
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("Intra-queue Preemption : enabled"));
|
|
||||||
} finally {
|
|
||||||
// clean-up
|
|
||||||
if (yarnClient != null) {
|
|
||||||
yarnClient.stop();
|
|
||||||
}
|
|
||||||
cluster.stop();
|
|
||||||
cluster.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetQueueInfoPreemptionEnabled() throws Exception {
|
public void testGetQueueInfoPreemptionEnabled() throws Exception {
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
@ -1815,10 +1743,9 @@ public void testGetQueueInfoPreemptionEnabled() throws Exception {
|
|||||||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
||||||
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
|
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
|
||||||
+ "ProportionalCapacityPreemptionPolicy");
|
+ "ProportionalCapacityPreemptionPolicy");
|
||||||
conf.setBoolean(
|
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
|
||||||
MiniYARNCluster cluster =
|
MiniYARNCluster cluster =
|
||||||
new MiniYARNCluster("testGetQueueInfoPreemptionEnabled", 2, 1, 1);
|
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
|
||||||
|
|
||||||
YarnClient yarnClient = null;
|
YarnClient yarnClient = null;
|
||||||
try {
|
try {
|
||||||
@ -1836,11 +1763,8 @@ public void testGetQueueInfoPreemptionEnabled() throws Exception {
|
|||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
int result = cli.run(new String[] { "-status", "a1" });
|
int result = cli.run(new String[] { "-status", "a1" });
|
||||||
assertEquals(0, result);
|
assertEquals(0, result);
|
||||||
String queueStatusOut = sysOutStream.toString();
|
Assert.assertTrue(sysOutStream.toString()
|
||||||
Assert.assertTrue(queueStatusOut
|
.contains("Preemption : enabled"));
|
||||||
.contains("\tPreemption : enabled"));
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("Intra-queue Preemption : enabled"));
|
|
||||||
} finally {
|
} finally {
|
||||||
// clean-up
|
// clean-up
|
||||||
if (yarnClient != null) {
|
if (yarnClient != null) {
|
||||||
@ -1880,11 +1804,8 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
|
|||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
int result = cli.run(new String[] { "-status", "a1" });
|
int result = cli.run(new String[] { "-status", "a1" });
|
||||||
assertEquals(0, result);
|
assertEquals(0, result);
|
||||||
String queueStatusOut = sysOutStream.toString();
|
Assert.assertTrue(sysOutStream.toString()
|
||||||
Assert.assertTrue(queueStatusOut
|
.contains("Preemption : disabled"));
|
||||||
.contains("\tPreemption : disabled"));
|
|
||||||
Assert.assertTrue(queueStatusOut
|
|
||||||
.contains("Intra-queue Preemption : disabled"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1892,7 +1813,7 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
|
|||||||
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
||||||
QueueCLI cli = createAndGetQueueCLI();
|
QueueCLI cli = createAndGetQueueCLI();
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||||
null, null, QueueState.RUNNING, null, null, null, true, null, true);
|
null, null, QueueState.RUNNING, null, null, null, true, null);
|
||||||
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
||||||
int result = cli.run(new String[] { "-status", "queueA" });
|
int result = cli.run(new String[] { "-status", "queueA" });
|
||||||
assertEquals(0, result);
|
assertEquals(0, result);
|
||||||
@ -1909,7 +1830,6 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
|||||||
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
|
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
|
||||||
pw.println("\tAccessible Node Labels : ");
|
pw.println("\tAccessible Node Labels : ");
|
||||||
pw.println("\tPreemption : " + "disabled");
|
pw.println("\tPreemption : " + "disabled");
|
||||||
pw.println("\tIntra-queue Preemption : " + "disabled");
|
|
||||||
pw.close();
|
pw.close();
|
||||||
String queueInfoStr = baos.toString("UTF-8");
|
String queueInfoStr = baos.toString("UTF-8");
|
||||||
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
||||||
|
@ -500,17 +500,4 @@ public void setQueueConfigurations(
|
|||||||
this.queueConfigurations.putAll(queueConfigurations);
|
this.queueConfigurations.putAll(queueConfigurations);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Boolean getIntraQueuePreemptionDisabled() {
|
|
||||||
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
return (p.hasIntraQueuePreemptionDisabled()) ? p
|
|
||||||
.getIntraQueuePreemptionDisabled() : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setIntraQueuePreemptionDisabled(
|
|
||||||
boolean intraQueuePreemptionDisabled) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
builder.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -414,7 +414,7 @@ public static void setup() throws Exception {
|
|||||||
// it is recursive(has sub queues)
|
// it is recursive(has sub queues)
|
||||||
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
|
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
|
||||||
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
|
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
|
||||||
"x && y", null, false, null, false));
|
"x && y", null, false));
|
||||||
generateByNewInstance(QueueStatistics.class);
|
generateByNewInstance(QueueStatistics.class);
|
||||||
generateByNewInstance(QueueUserACLInfo.class);
|
generateByNewInstance(QueueUserACLInfo.class);
|
||||||
generateByNewInstance(YarnClusterMetrics.class);
|
generateByNewInstance(YarnClusterMetrics.class);
|
||||||
|
@ -114,8 +114,8 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't preempt if intra-queue preemption is disabled for this queue.
|
// Don't preempt if disabled for this queue.
|
||||||
if (leafQueue.getIntraQueuePreemptionDisabled()) {
|
if (leafQueue.getPreemptionDisabled()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,9 +97,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||||||
new HashMap<AccessType, AccessControlList>();
|
new HashMap<AccessType, AccessControlList>();
|
||||||
volatile boolean reservationsContinueLooking;
|
volatile boolean reservationsContinueLooking;
|
||||||
private volatile boolean preemptionDisabled;
|
private volatile boolean preemptionDisabled;
|
||||||
// Indicates if the in-queue preemption setting is ever disabled within the
|
|
||||||
// hierarchy of this queue.
|
|
||||||
private boolean intraQueuePreemptionDisabledInHierarchy;
|
|
||||||
|
|
||||||
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
||||||
volatile ResourceUsage queueUsage;
|
volatile ResourceUsage queueUsage;
|
||||||
@ -408,8 +405,6 @@ protected void setupQueueConfigs(Resource clusterResource,
|
|||||||
|
|
||||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
||||||
configuration);
|
configuration);
|
||||||
this.intraQueuePreemptionDisabledInHierarchy =
|
|
||||||
isIntraQueueHierarchyPreemptionDisabled(this, configuration);
|
|
||||||
|
|
||||||
this.priority = configuration.getQueuePriority(
|
this.priority = configuration.getQueuePriority(
|
||||||
getQueuePath());
|
getQueuePath());
|
||||||
@ -618,8 +613,6 @@ protected QueueInfo getQueueInfo() {
|
|||||||
queueInfo.setCurrentCapacity(getUsedCapacity());
|
queueInfo.setCurrentCapacity(getUsedCapacity());
|
||||||
queueInfo.setQueueStatistics(getQueueStatistics());
|
queueInfo.setQueueStatistics(getQueueStatistics());
|
||||||
queueInfo.setPreemptionDisabled(preemptionDisabled);
|
queueInfo.setPreemptionDisabled(preemptionDisabled);
|
||||||
queueInfo.setIntraQueuePreemptionDisabled(
|
|
||||||
getIntraQueuePreemptionDisabled());
|
|
||||||
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
||||||
return queueInfo;
|
return queueInfo;
|
||||||
}
|
}
|
||||||
@ -742,16 +735,6 @@ public Map<AccessType, AccessControlList> getACLs() {
|
|||||||
public boolean getPreemptionDisabled() {
|
public boolean getPreemptionDisabled() {
|
||||||
return preemptionDisabled;
|
return preemptionDisabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
|
||||||
public boolean getIntraQueuePreemptionDisabled() {
|
|
||||||
return intraQueuePreemptionDisabledInHierarchy || preemptionDisabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
|
||||||
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
|
|
||||||
return intraQueuePreemptionDisabledInHierarchy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public QueueCapacities getQueueCapacities() {
|
public QueueCapacities getQueueCapacities() {
|
||||||
@ -774,19 +757,17 @@ public ReentrantReadWriteLock.ReadLock getReadLock() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The specified queue is cross-queue preemptable if system-wide cross-queue
|
* The specified queue is preemptable if system-wide preemption is turned on
|
||||||
* preemption is turned on unless any queue in the <em>qPath</em> hierarchy
|
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned
|
||||||
* has explicitly turned cross-queue preemption off.
|
* preemption off.
|
||||||
* NOTE: Cross-queue preemptability is inherited from a queue's parent.
|
* NOTE: Preemptability is inherited from a queue's parent.
|
||||||
*
|
*
|
||||||
* @param q queue to check preemption state
|
* @return true if queue has preemption disabled, false otherwise
|
||||||
* @param configuration capacity scheduler config
|
|
||||||
* @return true if queue has cross-queue preemption disabled, false otherwise
|
|
||||||
*/
|
*/
|
||||||
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
||||||
CapacitySchedulerConfiguration configuration) {
|
CapacitySchedulerConfiguration configuration) {
|
||||||
boolean systemWidePreemption =
|
boolean systemWidePreemption =
|
||||||
configuration
|
csContext.getConfiguration()
|
||||||
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
||||||
CSQueue parentQ = q.getParent();
|
CSQueue parentQ = q.getParent();
|
||||||
@ -809,44 +790,7 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
|||||||
return configuration.getPreemptionDisabled(q.getQueuePath(),
|
return configuration.getPreemptionDisabled(q.getQueuePath(),
|
||||||
parentQ.getPreemptionDisabled());
|
parentQ.getPreemptionDisabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The specified queue is intra-queue preemptable if
|
|
||||||
* 1) system-wide intra-queue preemption is turned on
|
|
||||||
* 2) no queue in the <em>qPath</em> hierarchy has explicitly turned off intra
|
|
||||||
* queue preemption.
|
|
||||||
* NOTE: Intra-queue preemptability is inherited from a queue's parent.
|
|
||||||
*
|
|
||||||
* @param q queue to check intra-queue preemption state
|
|
||||||
* @param configuration capacity scheduler config
|
|
||||||
* @return true if queue has intra-queue preemption disabled, false otherwise
|
|
||||||
*/
|
|
||||||
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
|
|
||||||
CapacitySchedulerConfiguration configuration) {
|
|
||||||
boolean systemWideIntraQueuePreemption =
|
|
||||||
configuration.getBoolean(
|
|
||||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
|
||||||
CapacitySchedulerConfiguration
|
|
||||||
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
|
||||||
// Intra-queue preemption is disabled for this queue if the system-wide
|
|
||||||
// intra-queue preemption flag is false
|
|
||||||
if (!systemWideIntraQueuePreemption) return true;
|
|
||||||
|
|
||||||
// Check if this is the root queue and the root queue's intra-queue
|
|
||||||
// preemption disable switch is set
|
|
||||||
CSQueue parentQ = q.getParent();
|
|
||||||
if (parentQ == null) {
|
|
||||||
return configuration
|
|
||||||
.getIntraQueuePreemptionDisabled(q.getQueuePath(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, the master preemption switch is enabled down to this
|
|
||||||
// queue's level. Determine whether or not intra-queue preemption is enabled
|
|
||||||
// down to this queu's level and return that value.
|
|
||||||
return configuration.getIntraQueuePreemptionDisabled(q.getQueuePath(),
|
|
||||||
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Resource getCurrentLimitResource(String nodePartition,
|
private Resource getCurrentLimitResource(String nodePartition,
|
||||||
Resource clusterResource, ResourceLimits currentResourceLimits,
|
Resource clusterResource, ResourceLimits currentResourceLimits,
|
||||||
SchedulingMode schedulingMode) {
|
SchedulingMode schedulingMode) {
|
||||||
|
@ -276,21 +276,7 @@ public void attachContainer(Resource clusterResource,
|
|||||||
* @return true if <em>disable_preemption</em> is set, false if not
|
* @return true if <em>disable_preemption</em> is set, false if not
|
||||||
*/
|
*/
|
||||||
public boolean getPreemptionDisabled();
|
public boolean getPreemptionDisabled();
|
||||||
|
|
||||||
/**
|
|
||||||
* Check whether intra-queue preemption is disabled for this queue
|
|
||||||
* @return true if either intra-queue preemption or inter-queue preemption
|
|
||||||
* is disabled for this queue, false if neither is disabled.
|
|
||||||
*/
|
|
||||||
public boolean getIntraQueuePreemptionDisabled();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines whether or not the intra-queue preemption disabled switch is set
|
|
||||||
* at any level in this queue's hierarchy.
|
|
||||||
* @return state of the intra-queue preemption switch at this queue level
|
|
||||||
*/
|
|
||||||
public boolean getIntraQueuePreemptionDisabledInHierarchy();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get QueueCapacities of this queue
|
* Get QueueCapacities of this queue
|
||||||
* @return queueCapacities
|
* @return queueCapacities
|
||||||
|
@ -1215,21 +1215,6 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
|
|||||||
return preemptionDisabled;
|
return preemptionDisabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether intra-queue preemption is disabled on the specified queue
|
|
||||||
*
|
|
||||||
* @param queue queue path to query
|
|
||||||
* @param defaultVal used as default if the property is not set in the
|
|
||||||
* configuration
|
|
||||||
* @return true if preemption is disabled on queue, false otherwise
|
|
||||||
*/
|
|
||||||
public boolean getIntraQueuePreemptionDisabled(String queue,
|
|
||||||
boolean defaultVal) {
|
|
||||||
return
|
|
||||||
getBoolean(getQueuePrefix(queue) + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX
|
|
||||||
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get configured node labels in a given queuePath
|
* Get configured node labels in a given queuePath
|
||||||
*/
|
*/
|
||||||
|
@ -200,10 +200,7 @@ private void renderCommonLeafQueueInfo(ResponseInfo ri) {
|
|||||||
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
|
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
|
||||||
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||||
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
|
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
|
||||||
__("Preemption:",
|
__("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
|
||||||
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
|
|
||||||
__("Intra-queue Preemption:", lqinfo.getIntraQueuePreemptionDisabled()
|
|
||||||
? "disabled" : "enabled").
|
|
||||||
__("Default Node Label Expression:",
|
__("Default Node Label Expression:",
|
||||||
lqinfo.getDefaultNodeLabelExpression() == null
|
lqinfo.getDefaultNodeLabelExpression() == null
|
||||||
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION
|
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION
|
||||||
|
@ -49,7 +49,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||||||
protected ResourceInfo usedAMResource;
|
protected ResourceInfo usedAMResource;
|
||||||
protected ResourceInfo userAMResourceLimit;
|
protected ResourceInfo userAMResourceLimit;
|
||||||
protected boolean preemptionDisabled;
|
protected boolean preemptionDisabled;
|
||||||
protected boolean intraQueuePreemptionDisabled;
|
|
||||||
protected String defaultNodeLabelExpression;
|
protected String defaultNodeLabelExpression;
|
||||||
protected int defaultPriority;
|
protected int defaultPriority;
|
||||||
protected boolean isAutoCreatedLeafQueue;
|
protected boolean isAutoCreatedLeafQueue;
|
||||||
@ -73,7 +72,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||||||
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
|
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
|
||||||
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
||||||
preemptionDisabled = q.getPreemptionDisabled();
|
preemptionDisabled = q.getPreemptionDisabled();
|
||||||
intraQueuePreemptionDisabled = q.getIntraQueuePreemptionDisabled();
|
|
||||||
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
||||||
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
|
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
|
||||||
defaultPriority = q.getDefaultApplicationPriority().getPriority();
|
defaultPriority = q.getDefaultApplicationPriority().getPriority();
|
||||||
@ -152,10 +150,6 @@ public ResourceInfo getUserAMResourceLimit() {
|
|||||||
public boolean getPreemptionDisabled() {
|
public boolean getPreemptionDisabled() {
|
||||||
return preemptionDisabled;
|
return preemptionDisabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getIntraQueuePreemptionDisabled() {
|
|
||||||
return intraQueuePreemptionDisabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getOrderingPolicyInfo() {
|
public String getOrderingPolicyInfo() {
|
||||||
return orderingPolicyInfo;
|
return orderingPolicyInfo;
|
||||||
|
@ -67,7 +67,7 @@ public void setUp() throws IOException {
|
|||||||
private void mockQueue(String queueName, MutableConfScheduler scheduler)
|
private void mockQueue(String queueName, MutableConfScheduler scheduler)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
|
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
|
||||||
null, null, null, null, false, null, false);
|
null, null, null, null, false);
|
||||||
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
|
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
|
||||||
.thenReturn(queueInfo);
|
.thenReturn(queueInfo);
|
||||||
Queue queue = mock(Queue.class);
|
Queue queue = mock(Queue.class);
|
||||||
|
@ -165,7 +165,7 @@ private Queue createQueue(String name, Queue parent) {
|
|||||||
private Queue createQueue(String name, Queue parent, float capacity) {
|
private Queue createQueue(String name, Queue parent, float capacity) {
|
||||||
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
|
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||||
null, QueueState.RUNNING, null, "", null, false, null, false);
|
null, QueueState.RUNNING, null, "", null, false);
|
||||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||||
Queue queue = mock(Queue.class);
|
Queue queue = mock(Queue.class);
|
||||||
when(queue.getMetrics()).thenReturn(metrics);
|
when(queue.getMetrics()).thenReturn(metrics);
|
||||||
|
@ -4103,7 +4103,7 @@ private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
|
|||||||
float absCap, Resource res) {
|
float absCap, Resource res) {
|
||||||
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
||||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||||
null, QueueState.RUNNING, null, "", null, false, null, false);
|
null, QueueState.RUNNING, null, "", null, false);
|
||||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||||
AbstractCSQueue queue = mock(AbstractCSQueue.class);
|
AbstractCSQueue queue = mock(AbstractCSQueue.class);
|
||||||
when(queue.getMetrics()).thenReturn(metrics);
|
when(queue.getMetrics()).thenReturn(metrics);
|
||||||
|
@ -236,7 +236,6 @@ The following configuration parameters can be configured in yarn-site.xml to con
|
|||||||
| Property | Description |
|
| Property | Description |
|
||||||
|:---- |:---- |
|
|:---- |:---- |
|
||||||
| `yarn.scheduler.capacity.<queue-path>.disable_preemption` | This configuration can be set to `true` to selectively disable preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true* and `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is false.
|
| `yarn.scheduler.capacity.<queue-path>.disable_preemption` | This configuration can be set to `true` to selectively disable preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true* and `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is false.
|
||||||
| `yarn.scheduler.capacity.<queue-path>.intra-queue-preemption.disable_preemption` | This configuration can be set to *true* to selectively disable intra-queue preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true*, `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*, and `yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.enabled` to *true*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is *false*.
|
|
||||||
|
|
||||||
###Reservation Properties
|
###Reservation Properties
|
||||||
|
|
||||||
@ -478,4 +477,4 @@ Updating a Container (Experimental - API may change in the future)
|
|||||||
|
|
||||||
The **DECREASE_RESOURCE** and **DEMOTE_EXECUTION_TYPE** container updates are automatic - the AM does not explicitly have to ask the NM to decrease the resources of the container. The other update types require the AM to explicitly ask the NM to update the container.
|
The **DECREASE_RESOURCE** and **DEMOTE_EXECUTION_TYPE** container updates are automatic - the AM does not explicitly have to ask the NM to decrease the resources of the container. The other update types require the AM to explicitly ask the NM to update the container.
|
||||||
|
|
||||||
If the **yarn.resourcemanager.auto-update.containers** configuration parameter is set to **true** (false by default), The RM will ensure that all container updates are automatic.
|
If the **yarn.resourcemanager.auto-update.containers** configuration parameter is set to **true** (false by default), The RM will ensure that all container updates are automatic.
|
Loading…
Reference in New Issue
Block a user