From 7faa406f27f687844c941346f59a27a375af3233 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 16 Dec 2015 13:19:40 -0800 Subject: [PATCH] YARN-4225. Add preemption status to yarn queue -status for capacity scheduler. (Eric Payne via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 5 + .../hadoop/yarn/api/records/QueueInfo.java | 16 ++- .../src/main/proto/yarn_protos.proto | 1 + .../hadoop/yarn/client/cli/QueueCLI.java | 6 ++ .../yarn/client/ProtocolHATestBase.java | 2 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 99 ++++++++++++++++++- .../api/records/impl/pb/QueueInfoPBImpl.java | 13 +++ .../hadoop/yarn/api/TestPBImplRecords.java | 2 +- .../scheduler/capacity/AbstractCSQueue.java | 1 + .../TestSchedulerApplicationAttempt.java | 3 +- 11 files changed, 145 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index da1ea55606..e65cbc5572 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1145,6 +1145,9 @@ Release 2.8.0 - UNRELEASED YARN-4452. NPE when submit Unmanaged application. (Naganarasimha G R via junping_du) + YARN-4225. Add preemption status to yarn queue -status for capacity scheduler. + (Eric Payne via wangda) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 133e95d84f..2d0d5d6211 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -500,4 +500,9 @@ + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java index 176af84340..7816febb3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java @@ -55,7 +55,8 @@ public static QueueInfo newInstance(String queueName, float capacity, float maximumCapacity, float currentCapacity, List childQueues, List applications, QueueState queueState, Set accessibleNodeLabels, - String defaultNodeLabelExpression, QueueStatistics queueStatistics) { + String defaultNodeLabelExpression, QueueStatistics queueStatistics, + boolean preemptionDisabled) { QueueInfo queueInfo = Records.newRecord(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setCapacity(capacity); @@ -67,6 +68,7 @@ public static QueueInfo newInstance(String queueName, float capacity, queueInfo.setAccessibleNodeLabels(accessibleNodeLabels); queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression); queueInfo.setQueueStatistics(queueStatistics); + queueInfo.setPreemptionDisabled(preemptionDisabled); return queueInfo; } @@ -205,4 +207,16 @@ public abstract void setDefaultNodeLabelExpression( @Unstable public abstract void setQueueStatistics(QueueStatistics queueStatistics); + /** + * Get the preemption status of the queue. + * @return if property is not in proto, return null; + * otherwise, return preemption status of the queue + */ + @Public + @Stable + public abstract Boolean getPreemptionDisabled(); + + @Private + @Unstable + public abstract void setPreemptionDisabled(boolean preemptionDisabled); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3c9877bb04..3c208e2989 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -416,6 +416,7 @@ message QueueInfoProto { repeated string accessibleNodeLabels = 8; optional string defaultNodeLabelExpression = 9; optional QueueStatisticsProto queueStatistics = 10; + optional bool preemptionDisabled = 11; } enum QueueACLProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java index b5db536b6f..330b081c73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java @@ -152,5 +152,11 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) { labelList.append(nodeLabel); } writer.println(labelList.toString()); + + Boolean preemptStatus = queueInfo.getPreemptionDisabled(); + if (preemptStatus != null) { + writer.print("\tPreemption : "); + writer.println(preemptStatus ? "disabled" : "enabled"); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 75e6cee685..45629b266a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -665,7 +665,7 @@ public List createFakeNodeReports() { public QueueInfo createFakeQueueInfo() { return QueueInfo.newInstance("root", 100f, 100f, 50f, null, - createFakeAppReports(), QueueState.RUNNING, null, null, null); + createFakeAppReports(), QueueState.RUNNING, null, null, null, false); } public List createFakeQueueUserACLInfoList() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 4e6660004e..f30bd40be0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -46,6 +46,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.lang.time.DateFormatUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -69,9 +70,15 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Before; @@ -1324,7 +1331,7 @@ public void testGetQueueInfo() throws Exception { nodeLabels.add("GPU"); nodeLabels.add("JDK_7"); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, - null, null, QueueState.RUNNING, nodeLabels, "GPU", null); + null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); @@ -1339,16 +1346,103 @@ public void testGetQueueInfo() throws Exception { pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : " + "GPU"); pw.println("\tAccessible Node Labels : " + "JDK_7,GPU"); + pw.println("\tPreemption : " + "enabled"); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); } + + @Test + public void testGetQueueInfoPreemptionEnabled() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + ReservationSystemTestUtil.setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity." + + "ProportionalCapacityPreemptionPolicy"); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + MiniYARNCluster cluster = + new MiniYARNCluster("testReservationAPIs", 2, 1, 1); + + YarnClient yarnClient = null; + try { + cluster.init(conf); + cluster.start(); + final Configuration yarnConf = cluster.getConfig(); + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConf); + yarnClient.start(); + + QueueCLI cli = new QueueCLI(); + cli.setClient(yarnClient); + cli.setSysOutPrintStream(sysOut); + cli.setSysErrPrintStream(sysErr); + sysOutStream.reset(); + int result = cli.run(new String[] { "-status", "a1" }); + assertEquals(0, result); + Assert.assertTrue(sysOutStream.toString() + .contains("Preemption : enabled")); + } finally { + // clean-up + if (yarnClient != null) { + yarnClient.stop(); + } + cluster.stop(); + cluster.close(); + } + } + + @Test + public void testGetQueueInfoPreemptionDisabled() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + ReservationSystemTestUtil.setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity." + + "ProportionalCapacityPreemptionPolicy"); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setBoolean( + "yarn.scheduler.capacity.root.a.a1.disable_preemption", true); + MiniYARNCluster cluster = + new MiniYARNCluster("testReservationAPIs", 2, 1, 1); + + YarnClient yarnClient = null; + try { + cluster.init(conf); + cluster.start(); + final Configuration yarnConf = cluster.getConfig(); + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConf); + yarnClient.start(); + + QueueCLI cli = new QueueCLI(); + cli.setClient(yarnClient); + cli.setSysOutPrintStream(sysOut); + cli.setSysErrPrintStream(sysErr); + sysOutStream.reset(); + int result = cli.run(new String[] { "-status", "a1" }); + assertEquals(0, result); + Assert.assertTrue(sysOutStream.toString() + .contains("Preemption : disabled")); + } finally { + // clean-up + if (yarnClient != null) { + yarnClient.stop(); + } + cluster.stop(); + cluster.close(); + } + } @Test public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { QueueCLI cli = createAndGetQueueCLI(); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, - null, null, QueueState.RUNNING, null, null, null); + null, null, QueueState.RUNNING, null, null, null, true); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); @@ -1364,6 +1458,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { pw.println("\tDefault Node Label expression : " + NodeLabel.DEFAULT_NODE_LABEL_PARTITION); pw.println("\tAccessible Node Labels : "); + pw.println("\tPreemption : " + "disabled"); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java index 90aecf40d8..605cab161c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java @@ -395,4 +395,17 @@ public void setQueueStatistics(QueueStatistics queueStatistics) { } builder.setQueueStatistics(convertToProtoFormat(queueStatistics)); } + + @Override + public Boolean getPreemptionDisabled() { + QueueInfoProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasPreemptionDisabled()) ? p + .getPreemptionDisabled() : null; + } + + @Override + public void setPreemptionDisabled(boolean preemptionDisabled) { + maybeInitBuilder(); + builder.setPreemptionDisabled(preemptionDisabled); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index db33672f49..b7f5ff739b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -488,7 +488,7 @@ public static void setup() throws Exception { // it is recursive(has sub queues) typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f, 1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"), - "x && y", null)); + "x && y", null, false)); generateByNewInstance(QueueStatistics.class); generateByNewInstance(QueueUserACLInfo.class); generateByNewInstance(YarnClusterMetrics.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index b40ac27211..0d70e90e33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -303,6 +303,7 @@ protected QueueInfo getQueueInfo() { queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setQueueStatistics(getQueueStatistics()); + queueInfo.setPreemptionDisabled(preemptionDisabled); return queueInfo; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 95d7129cfa..ddf3ccf207 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -149,7 +149,8 @@ private Queue createQueue(String name, Queue parent) { private Queue createQueue(String name, Queue parent, float capacity) { QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf); - QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null, null, QueueState.RUNNING, null, "", null); + QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null, + null, QueueState.RUNNING, null, "", null, false); ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); Queue queue = mock(Queue.class); when(queue.getMetrics()).thenReturn(metrics);