diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 651d0e9d39..67b676bea5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1244,4 +1244,19 @@ public abstract class AbstractCSQueue implements CSQueue { public Map getUserWeights() { return userWeights; } + + public void recoverDrainingState() { + try { + this.writeLock.lock(); + if (getState() == QueueState.STOPPED) { + updateQueueState(QueueState.DRAINING); + } + LOG.info("Recover draining state for queue " + this.getQueuePath()); + if (getParent() != null && getParent().getState() == QueueState.STOPPED) { + ((AbstractCSQueue) getParent()).recoverDrainingState(); + } + } finally { + this.writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1d6c104a43..162d3bb99c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -808,6 +809,12 @@ public class CapacityScheduler extends throw new QueueInvalidException(queueErrorMsg); } } + // When recovering apps in this queue but queue is in STOPPED state, + // that means its previous state was DRAINING. So we auto transit + // the state to DRAINING for recovery. + if (queue.getState() == QueueState.STOPPED) { + ((LeafQueue) queue).recoverDrainingState(); + } // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java index 9f2933efac..0a39e99b38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java @@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -197,4 +202,59 @@ public class TestQueueState { .thenCallRealMethod(); return application; } + + @Test (timeout = 30000) + public void testRecoverDrainingStateAfterRMRestart() throws Exception { + // init conf + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + newConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + newConf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + false); + newConf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + newConf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); + newConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{Q1}); + newConf.setQueues(Q1_PATH, new String[]{Q2}); + newConf.setCapacity(Q1_PATH, 100); + newConf.setCapacity(Q2_PATH, 100); + + // init state store + MemoryRMStateStore newMemStore = new MemoryRMStateStore(); + newMemStore.init(newConf); + // init RM & NMs & Nodes + MockRM rm = new MockRM(newConf, newMemStore); + rm.start(); + MockNM nm = rm.registerNode("h1:1234", 204800); + + // submit an app, AM is running on nm1 + RMApp app = rm.submitApp(1024, "appname", "appuser", null, Q2); + MockRM.launchAM(app, rm, nm); + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + // update queue state to STOPPED + newConf.setState(Q1_PATH, QueueState.STOPPED); + CapacityScheduler capacityScheduler = + (CapacityScheduler) rm.getRMContext().getScheduler(); + capacityScheduler.reinitialize(newConf, rm.getRMContext()); + // current queue state should be DRAINING + Assert.assertEquals(QueueState.DRAINING, + capacityScheduler.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.DRAINING, + capacityScheduler.getQueue(Q1).getState()); + + // RM restart + rm = new MockRM(newConf, newMemStore); + rm.start(); + rm.registerNode("h1:1234", 204800); + + // queue state should be DRAINING after app recovered + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + capacityScheduler = (CapacityScheduler) rm.getRMContext().getScheduler(); + Assert.assertEquals(QueueState.DRAINING, + capacityScheduler.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.DRAINING, + capacityScheduler.getQueue(Q1).getState()); + + // close rm + rm.close(); + } }