diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 9aeaec679b..7c918a5362 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -88,4 +88,10 @@ public interface CapacitySchedulerContext { * @return Max Cluster level App priority. */ Priority getMaxClusterLevelAppPriority(); + + /** + * Returns if configuration is mutable. + * @return if configuration is mutable + */ + boolean isConfigurationMutable(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 1ceb6fb3ea..48c289f0cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueState; @@ -170,8 +171,14 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) CSQueue newRoot = parseQueue(this.csContext, newConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP); - // Ensure queue hiearchy in the new XML file is proper. - validateQueueHierarchy(queues, newQueues); + // When failing over, if using configuration store, don't validate queue + // hierarchy since queues can be removed without being STOPPED. + if (!csContext.isConfigurationMutable() || + csContext.getRMContext().getHAServiceState() + != HAServiceProtocol.HAServiceState.STANDBY) { + // Ensure queue hiearchy in the new XML file is proper. + validateQueueHierarchy(queues, newQueues); + } // Add new queues and delete OldQeueus only after validation. updateQueues(queues, newQueues); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index 355f741803..0cf5e6fa7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -38,18 +38,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * Tests {@link ZKConfigurationStore}. @@ -303,6 +306,105 @@ public void testFailoverReadsFromUpdatedStore() throws Exception { rm2.close(); } + /** + * When failing over, if RM1 stopped and removed a queue that RM2 has in + * memory, failing over to RM2 should not throw an exception. + * @throws Exception + */ + @Test + public void testFailoverAfterRemoveQueue() throws Exception { + HAServiceProtocol.StateChangeRequestInfo req = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234); + ResourceManager rm1 = new MockRM(conf1); + rm1.start(); + rm1.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm1.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); + + Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678); + ResourceManager rm2 = new MockRM(conf2); + rm2.start(); + assertEquals("RM should be Standby", + HAServiceProtocol.HAServiceState.STANDBY, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + UserGroupInformation user = UserGroupInformation + .createUserForTesting(TEST_USER, new String[0]); + MutableConfigurationProvider confProvider = ((MutableConfScheduler) + rm1.getResourceScheduler()).getMutableConfProvider(); + // Add root.a + SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo(); + Map addParams = new HashMap<>(); + addParams.put("capacity", "100"); + QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams); + schedConfUpdateInfo.getAddQueueInfo().add(addInfo); + // Stop root.default + Map stopParams = new HashMap<>(); + stopParams.put("state", "STOPPED"); + stopParams.put("capacity", "0"); + QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams); + schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo); + confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); + confProvider.confirmPendingMutation(true); + assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("yarn.scheduler.capacity.root.queues").split + (",")).contains("a")); + + // Remove root.default + schedConfUpdateInfo.getUpdateQueueInfo().clear(); + schedConfUpdateInfo.getAddQueueInfo().clear(); + schedConfUpdateInfo.getRemoveQueueInfo().add("root.default"); + confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); + confProvider.confirmPendingMutation(true); + assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("yarn.scheduler.capacity.root.queues")); + + // Start RM2 and verifies it starts with updated configuration + rm2.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm2.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) { + if (HAServiceProtocol.HAServiceState.ACTIVE == + rm1.getRMContext().getRMAdminService().getServiceStatus() + .getState()) { + Thread.sleep(100); + } + } + assertEquals("RM should have been fenced", + HAServiceProtocol.HAServiceState.STANDBY, + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + assertEquals("a", ((MutableCSConfigurationProvider) ( + (CapacityScheduler) rm2.getResourceScheduler()) + .getMutableConfProvider()).getConfStore().retrieve() + .get("yarn.scheduler.capacity.root.queues")); + assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler()) + .getConfiguration().get("yarn.scheduler.capacity.root.queues")); + // Transition to standby will set RM's HA status and then reinitialize in + // a separate thread. Despite asserting for STANDBY state, it's + // possible for reinitialization to be unfinished. Wait here for it to + // finish, otherwise closing rm1 will close zkManager and the unfinished + // reinitialization will throw an exception. + Thread.sleep(10000); + rm1.close(); + rm2.close(); + } + @Override public YarnConfigurationStore createConfStore() { return new ZKConfigurationStore();