From 94e2e78ab751d1da775d0acf6e5ee6c0694ed746 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 11 Dec 2013 15:13:46 +0000 Subject: [PATCH] YARN-1481. Move internal services logic from AdminService to ResourceManager. (vinodkv via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550167 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/AdminService.java | 77 +++------------- .../server/resourcemanager/RMContext.java | 7 +- .../server/resourcemanager/RMContextImpl.java | 30 ++++++- .../resourcemanager/ResourceManager.java | 89 ++++++++++++++----- 5 files changed, 120 insertions(+), 86 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c0d60d51cb..7edd4bba46 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -154,6 +154,9 @@ Release 2.4.0 - UNRELEASED YARN-1378. Implemented a cleaner of old finished applications from the RM state-store. (Jian He via vinodkv) + YARN-1481. Move internal services logic from AdminService to ResourceManager. + (vinodkv via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index e78f002f88..18ffee595e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,7 +41,6 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import com.google.protobuf.BlockingService; + public class AdminService extends AbstractService implements HAServiceProtocol, ResourceManagerAdministrationProtocol { @@ -73,10 +72,6 @@ public class AdminService extends AbstractService implements private final RMContext rmContext; private final ResourceManager rm; - @VisibleForTesting - protected HAServiceProtocol.HAServiceState - haState = HAServiceProtocol.HAServiceState.INITIALIZING; - boolean haEnabled; private Server server; private InetSocketAddress masterServiceAddress; @@ -93,13 +88,6 @@ public AdminService(ResourceManager rm, RMContext rmContext) { @Override public synchronized void serviceInit(Configuration conf) throws Exception { - haEnabled = HAUtil.isHAEnabled(conf); - if (haEnabled) { - HAUtil.verifyAndSetConfiguration(conf); - rm.setConf(conf); - } - rm.createAndInitActiveServices(); - masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, @@ -112,11 +100,6 @@ public synchronized void serviceInit(Configuration conf) throws Exception { @Override protected synchronized void serviceStart() throws Exception { - if (haEnabled) { - transitionToStandby(true); - } else { - transitionToActive(); - } startServer(); super.serviceStart(); } @@ -124,8 +107,6 @@ protected synchronized void serviceStart() throws Exception { @Override protected synchronized void serviceStop() throws Exception { stopServer(); - transitionToStandby(false); - haState = HAServiceState.STOPPING; super.serviceStop(); } @@ -145,7 +126,7 @@ protected void startServer() throws Exception { refreshServiceAcls(conf, new RMPolicyProvider()); } - if (haEnabled) { + if (rmContext.isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -182,39 +163,27 @@ private UserGroupInformation checkAcls(String method) throws YarnException { } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == haState; + return HAServiceState.ACTIVE == rmContext.getHAServiceState(); } @Override public synchronized void monitorHealth() throws IOException { checkAccess("monitorHealth"); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { + if (isRMActive() && !rm.areActiveServicesRunning()) { throw new HealthCheckFailedException( "Active ResourceManager services are not running!"); } } - synchronized void transitionToActive() throws Exception { - if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { - LOG.info("Already in active state"); - return; - } - - LOG.info("Transitioning to active"); - rm.startActiveServices(); - haState = HAServiceProtocol.HAServiceState.ACTIVE; - LOG.info("Transitioned to active"); - } - @Override - public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) - throws IOException { + public synchronized void transitionToActive( + HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { UserGroupInformation user = checkAccess("transitionToActive"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { - transitionToActive(); + rm.transitionToActive(); RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RMHAProtocolService"); } catch (Exception e) { @@ -226,32 +195,14 @@ public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequest } } - synchronized void transitionToStandby(boolean initialize) - throws Exception { - if (haState == HAServiceProtocol.HAServiceState.STANDBY) { - LOG.info("Already in standby state"); - return; - } - - LOG.info("Transitioning to standby"); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { - rm.stopActiveServices(); - if (initialize) { - rm.createAndInitActiveServices(); - } - } - haState = HAServiceProtocol.HAServiceState.STANDBY; - LOG.info("Transitioned to standby"); - } - @Override - public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo) - throws IOException { + public synchronized void transitionToStandby( + HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { UserGroupInformation user = checkAccess("transitionToStandby"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { - transitionToStandby(true); + rm.transitionToStandby(true); RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToStandby", "RMHAProtocolService"); } catch (Exception e) { @@ -266,15 +217,15 @@ public synchronized void transitionToStandby(HAServiceProtocol.StateChangeReques @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); + HAServiceState haState = rmContext.getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState == - HAServiceProtocol.HAServiceState.STANDBY) { + if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); } else { ret.setNotReadyToBecomeActive("State is " + haState); } return ret; - } + } @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index a93fcb13d7..1ddb1b48a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; @@ -42,7 +43,11 @@ public interface RMContext { Dispatcher getDispatcher(); - + + boolean isHAEnabled(); + + HAServiceState getHAServiceState(); + RMStateStore getStateStore(); ConcurrentMap getRMApps(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index f98b49efc8..ec90b4a27d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import com.google.common.annotations.VisibleForTesting; @@ -54,6 +56,10 @@ public class RMContextImpl implements RMContext { private final ConcurrentMap inactiveNodes = new ConcurrentHashMap(); + private boolean isHAEnabled; + private HAServiceState haServiceState = + HAServiceProtocol.HAServiceState.INITIALIZING; + private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; @@ -211,6 +217,16 @@ public ResourceTrackerService getResourceTrackerService() { return resourceTrackerService; } + void setHAEnabled(boolean isHAEnabled) { + this.isHAEnabled = isHAEnabled; + } + + void setHAServiceState(HAServiceState haServiceState) { + synchronized (haServiceState) { + this.haServiceState = haServiceState; + } + } + void setDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } @@ -290,4 +306,16 @@ void setResourceTrackerService( ResourceTrackerService resourceTrackerService) { this.resourceTrackerService = resourceTrackerService; } + + @Override + public boolean isHAEnabled() { + return isHAEnabled; + } + + @Override + public HAServiceState getHAServiceState() { + synchronized (haServiceState) { + return haServiceState; + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 597d18c112..e44207796e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig.Policy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -188,6 +191,12 @@ protected void serviceInit(Configuration conf) throws Exception { addService(adminService); rmContext.setRMAdminService(adminService); + this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf)); + if (this.rmContext.isHAEnabled()) { + HAUtil.verifyAndSetConfiguration(conf); + } + createAndInitActiveServices(); + super.serviceInit(conf); } @@ -217,9 +226,8 @@ protected EventHandler createSchedulerEventDispatcher() { } protected RMStateStoreOperationFailedEventDispatcher - createRMStateStoreOperationFailedEventDispatcher() { - return new RMStateStoreOperationFailedEventDispatcher( - rmContext.getRMAdminService()); + createRMStateStoreOperationFailedEventDispatcher() { + return new RMStateStoreOperationFailedEventDispatcher(rmContext, this); } protected Dispatcher createDispatcher() { @@ -655,11 +663,14 @@ public void handle(SchedulerEvent event) { @Private public static class RMStateStoreOperationFailedEventDispatcher implements EventHandler { - private final AdminService adminService; - public RMStateStoreOperationFailedEventDispatcher( - AdminService adminService) { - this.adminService = adminService; + private final RMContext rmContext; + private final ResourceManager rm; + + public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext, + ResourceManager resourceManager) { + this.rmContext = rmContext; + this.rm = resourceManager; } @Override @@ -671,16 +682,14 @@ public void handle(RMStateStoreOperationFailedEvent event) { } if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) { LOG.info("RMStateStore has been fenced"); - synchronized(adminService) { - if (adminService.haEnabled) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - adminService.transitionToStandby(true); - return; - } catch (Exception e) { - LOG.error("Failed to transition RM to Standby mode."); - } + if (rmContext.isHAEnabled()) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + rm.transitionToStandby(true); + return; + } catch (Exception e) { + LOG.error("Failed to transition RM to Standby mode."); } } } @@ -826,10 +835,6 @@ protected void startWepApp() { webApp = builder.start(new RMWebApp(this)); } - void setConf(Configuration configuration) { - conf = configuration; - } - /** * Helper method to create and init {@link #activeServices}. This creates an * instance of {@link RMActiveServices} and initializes it. @@ -870,6 +875,39 @@ protected boolean areActiveServicesRunning() { return activeServices != null && activeServices.isInState(STATE.STARTED); } + synchronized void transitionToActive() throws Exception { + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.ACTIVE) { + LOG.info("Already in active state"); + return; + } + + LOG.info("Transitioning to active state"); + startActiveServices(); + rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); + LOG.info("Transitioned to active state"); + } + + synchronized void transitionToStandby(boolean initialize) + throws Exception { + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.STANDBY) { + LOG.info("Already in standby state"); + return; + } + + LOG.info("Transitioning to standby state"); + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.ACTIVE) { + stopActiveServices(); + if (initialize) { + createAndInitActiveServices(); + } + } + rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); + LOG.info("Transitioned to standby state"); + } + @Override protected void serviceStart() throws Exception { try { @@ -877,6 +915,13 @@ protected void serviceStart() throws Exception { } catch(IOException ie) { throw new YarnRuntimeException("Failed to login", ie); } + + if (this.rmContext.isHAEnabled()) { + transitionToStandby(true); + } else { + transitionToActive(); + } + super.serviceStart(); } @@ -888,6 +933,8 @@ protected void doSecureLogin() throws IOException { @Override protected void serviceStop() throws Exception { super.serviceStop(); + transitionToStandby(false); + rmContext.setHAServiceState(HAServiceState.STOPPING); } protected ResourceTrackerService createResourceTrackerService() {