diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 4d4ad5d957..2ec03aafb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -47,7 +47,6 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.DecommissionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.HAUtil; @@ -319,15 +318,7 @@ public synchronized void transitionToActive( UserGroupInformation user = checkAccess("transitionToActive"); checkHaStateChange(reqInfo); - try { - rm.transitionToActive(); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - "", "RM", - "Exception transitioning to active"); - throw new ServiceFailedException( - "Error when transitioning to Active mode", e); - } + try { // call all refresh*s for active RM to get the updated configurations. refreshAll(); @@ -337,10 +328,22 @@ public synchronized void transitionToActive( .getDispatcher() .getEventHandler() .handle( - new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); + new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, + e)); throw new ServiceFailedException( - "Error on refreshAll during transistion to Active", e); + "Error on refreshAll during transition to Active", e); } + + try { + rm.transitionToActive(); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + "", "RM", + "Exception transitioning to active"); + throw new ServiceFailedException( + "Error when transitioning to Active mode", e); + } + RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RM"); } @@ -408,12 +411,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); - // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); - if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); - } + refreshQueues(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -422,6 +420,15 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } } + private void refreshQueues() throws IOException, YarnException { + rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + // refresh the reservation system + ReservationSystem rSystem = rmContext.getReservationSystem(); + if (rSystem != null) { + rSystem.reinitialize(getConfig(), rmContext); + } + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -454,6 +461,13 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) } } + private void refreshNodes() throws IOException, YarnException { + Configuration conf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + rmContext.getNodesListManager().refreshNodes(conf); + } + @Override public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) @@ -464,6 +478,16 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu checkRMStatus(user.getShortUserName(), operation, "refresh super-user-groups."); + refreshSuperUserGroupsConfiguration(); + RMAuditLogger.logSuccess(user.getShortUserName(), + operation, "AdminService"); + + return recordFactory.newRecordInstance( + RefreshSuperUserGroupsConfigurationResponse.class); + } + + private void refreshSuperUserGroupsConfiguration() + throws IOException, YarnException { // Accept hadoop common configs in core-site.xml as well as RM specific // configurations in yarn-site.xml Configuration conf = @@ -472,11 +496,6 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); RMServerUtils.processRMProxyUsersConf(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - RMAuditLogger.logSuccess(user.getShortUserName(), - operation, "AdminService"); - - return recordFactory.newRecordInstance( - RefreshSuperUserGroupsConfigurationResponse.class); } @Override @@ -488,10 +507,7 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( checkRMStatus(user.getShortUserName(), operation, "refresh user-groups."); - Groups.getUserToGroupsMappingService( - getConfiguration(new Configuration(false), - YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); - + refreshUserToGroupsMappings(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -499,6 +515,12 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsResponse.class); } + private void refreshUserToGroupsMappings() throws IOException, YarnException { + Groups.getUserToGroupsMappingService( + getConfiguration(new Configuration(false), + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); + } + @Override public RefreshAdminAclsResponse refreshAdminAcls( RefreshAdminAclsRequest request) throws YarnException, IOException { @@ -541,6 +563,14 @@ public RefreshServiceAclsResponse refreshServiceAcls( checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs."); + refreshServiceAcls(); + RMAuditLogger.logSuccess(user.getShortUserName(), operation, + "AdminService"); + + return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); + } + + private void refreshServiceAcls() throws IOException, YarnException { PolicyProvider policyProvider = RMPolicyProvider.getInstance(); Configuration conf = getConfiguration(new Configuration(false), @@ -552,11 +582,6 @@ public RefreshServiceAclsResponse refreshServiceAcls( conf, policyProvider); rmContext.getResourceTrackerService().refreshServiceAcls( conf, policyProvider); - - RMAuditLogger.logSuccess(user.getShortUserName(), operation, - "AdminService"); - - return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); } private synchronized void refreshServiceAcls(Configuration configuration, @@ -691,18 +716,17 @@ private synchronized Configuration getConfiguration(Configuration conf, @VisibleForTesting void refreshAll() throws ServiceFailedException { try { - refreshQueues(RefreshQueuesRequest.newInstance()); - refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL)); - refreshSuperUserGroupsConfiguration( - RefreshSuperUserGroupsConfigurationRequest.newInstance()); - refreshUserToGroupsMappings( - RefreshUserToGroupsMappingsRequest.newInstance()); + checkAcls("refreshAll"); + refreshQueues(); + refreshNodes(); + refreshSuperUserGroupsConfiguration(); + refreshUserToGroupsMappings(); if (getConfig().getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(RefreshServiceAclsRequest.newInstance()); + refreshServiceAcls(); } - refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance()); + refreshClusterMaxPriority(); } catch (Exception ex) { throw new ServiceFailedException(ex.getMessage()); } @@ -839,11 +863,7 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( checkRMStatus(user.getShortUserName(), operation, msg); try { - Configuration conf = - getConfiguration(new Configuration(false), - YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - - rmContext.getScheduler().setClusterMaxPriority(conf); + refreshClusterMaxPriority(); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -854,6 +874,14 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( } } + private void refreshClusterMaxPriority() throws IOException, YarnException { + Configuration conf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + + rmContext.getScheduler().setClusterMaxPriority(conf); + } + public String getHAZookeeperConnectionState() { if (!rmContext.isHAEnabled()) { return "ResourceManager HA is not enabled."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dab7312af4..87670c4f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; @@ -74,9 +75,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -84,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -4648,4 +4654,67 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception { long reservedId = reservedContainer1.getContainerId().getContainerId(); assertEquals(reservedId + 1, maxId); } + + @Test(timeout = 120000) + public void testRefreshQueuesWhenRMHA() throws Exception { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues + MockRM rm1 = new MockRM(conf, null); + rm1.init(conf); + rm1.start(); + rm1.getAdminService().transitionToStandby(requestInfo); + + // 2. add a new queue "test_queue" + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" 3"); + out.println(""); + out.println(""); + out.close(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 3. start a active RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.init(conf); + rm2.start(); + + MockNM nm = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm.registerNode(); + + rm2.getAdminService().transitionToActive(requestInfo); + + // 4. submit a app to the new added queue "test_queue" + RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue"); + RMAppAttempt attempt0 = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + assertEquals("root.test_queue", app.getQueue()); + + // 5. Transit rm1 to active, recover app + ((RMContextImpl)rm1.getRMContext()).setStateStore(memStore); + rm1.getAdminService().transitionToActive(requestInfo); + rm1.drainEvents(); + assertEquals(1, rm1.getRMContext().getRMApps().size()); + RMApp recoveredApp = + rm1.getRMContext().getRMApps().values().iterator().next(); + assertEquals("root.test_queue", recoveredApp.getQueue()); + + rm1.stop(); + rm2.stop(); + } }