diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d86d998c4f..76365a8b88 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -27,6 +27,8 @@ Release 2.3.0 - UNRELEASED IMPROVEMENTS YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza) + YARN-1098. Separate out RM services into Always On and Active (Karthik + Kambatla via bikas) OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index ee418c1937..8c0b195f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -107,9 +107,18 @@ public class ResourceManager extends CompositeService implements Recoverable { private static final Log LOG = LogFactory.getLog(ResourceManager.class); public static final long clusterTimeStamp = System.currentTimeMillis(); + /** + * "Active" services. Services that need to run only on the Active RM. + * These services are managed (initialized, started, stopped) by the + * {@link CompositeService} RMActiveServices. + * + * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is + * in Active state. + */ + protected RMActiveServices activeServices; protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM(); - + protected RMContainerTokenSecretManager containerTokenSecretManager; protected NMTokenSecretManagerInRM nmTokenSecretManager; @@ -135,6 +144,8 @@ public class ResourceManager extends CompositeService implements Recoverable { protected ResourceTrackerService resourceTracker; private boolean recoveryEnabled; + /** End of Active services */ + private Configuration conf; public ResourceManager() { @@ -147,137 +158,11 @@ public RMContext getRMContext() { @Override protected void serviceInit(Configuration conf) throws Exception { - validateConfigs(conf); - this.conf = conf; - this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); - - this.rmDispatcher = createDispatcher(); - addIfService(this.rmDispatcher); - - this.amRmTokenSecretManager = createAMRMTokenSecretManager(conf); - - this.containerAllocationExpirer = new ContainerAllocationExpirer( - this.rmDispatcher); - addService(this.containerAllocationExpirer); - - AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); - addService(amLivelinessMonitor); - - AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); - addService(amFinishingMonitor); - - this.containerTokenSecretManager = createContainerTokenSecretManager(conf); - this.nmTokenSecretManager = createNMTokenSecretManager(conf); - - boolean isRecoveryEnabled = conf.getBoolean( - YarnConfiguration.RECOVERY_ENABLED, - YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); - - RMStateStore rmStore = null; - if(isRecoveryEnabled) { - recoveryEnabled = true; - rmStore = RMStateStoreFactory.getStore(conf); - } else { - recoveryEnabled = false; - rmStore = new NullRMStateStore(); - } - - try { - rmStore.init(conf); - rmStore.setRMDispatcher(rmDispatcher); - } catch (Exception e) { - // the Exception from stateStore.init() needs to be handled for - // HA and we need to give up master status if we got fenced - LOG.error("Failed to init state store", e); - ExitUtil.terminate(1, e); - } - - if (UserGroupInformation.isSecurityEnabled()) { - this.delegationTokenRenewer = createDelegationTokenRenewer(); - } - - this.rmContext = - new RMContextImpl(this.rmDispatcher, rmStore, - this.containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager, - this.containerTokenSecretManager, this.nmTokenSecretManager, - this.clientToAMSecretManager); - - // Register event handler for NodesListManager - this.nodesListManager = new NodesListManager(this.rmContext); - this.rmDispatcher.register(NodesListManagerEventType.class, - this.nodesListManager); - addService(nodesListManager); - - // Initialize the scheduler - this.scheduler = createScheduler(); - this.schedulerDispatcher = createSchedulerEventDispatcher(); - addIfService(this.schedulerDispatcher); - this.rmDispatcher.register(SchedulerEventType.class, - this.schedulerDispatcher); - - // Register event handler for RmAppEvents - this.rmDispatcher.register(RMAppEventType.class, - new ApplicationEventDispatcher(this.rmContext)); - - // Register event handler for RmAppAttemptEvents - this.rmDispatcher.register(RMAppAttemptEventType.class, - new ApplicationAttemptEventDispatcher(this.rmContext)); - - // Register event handler for RmNodes - this.rmDispatcher.register(RMNodeEventType.class, - new NodeEventDispatcher(this.rmContext)); - - this.nmLivelinessMonitor = createNMLivelinessMonitor(); - addService(this.nmLivelinessMonitor); - - this.resourceTracker = createResourceTrackerService(); - addService(resourceTracker); - - DefaultMetricsSystem.initialize("ResourceManager"); - JvmMetrics.initSingleton("ResourceManager", null); - - try { - this.scheduler.reinitialize(conf, this.rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - - // creating monitors that handle preemption - createPolicyMonitors(); - - masterService = createApplicationMasterService(); - addService(masterService) ; - - this.applicationACLsManager = new ApplicationACLsManager(conf); - - this.rmAppManager = createRMAppManager(); - // Register event handler for RMAppManagerEvents - this.rmDispatcher.register(RMAppManagerEventType.class, - this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); - rmContext.setRMDelegationTokenSecretManager(this.rmDTSecretManager); - clientRM = createClientRMService(); - rmContext.setClientRMService(clientRM); - addService(clientRM); - - adminService = createAdminService(clientRM, masterService, resourceTracker); - addService(adminService); - - this.applicationMasterLauncher = createAMLauncher(); - this.rmDispatcher.register(AMLauncherEventType.class, - this.applicationMasterLauncher); - - addService(applicationMasterLauncher); - if (UserGroupInformation.isSecurityEnabled()) { - addService(delegationTokenRenewer); - delegationTokenRenewer.setRMContext(rmContext); - } - new RMNMInfo(this.rmContext, this.scheduler); - + activeServices = new RMActiveServices(); + addService(activeServices); super.serviceInit(conf); } @@ -378,6 +263,217 @@ protected static void validateConfigs(Configuration conf) { } } + /** + * RMActiveServices handles all the Active services in the RM. + */ + @Private + class RMActiveServices extends CompositeService { + RMActiveServices() { + super("RMActiveServices"); + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + rmDispatcher = createDispatcher(); + addIfService(rmDispatcher); + + amRmTokenSecretManager = createAMRMTokenSecretManager(conf); + + containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); + addService(containerAllocationExpirer); + + AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); + addService(amLivelinessMonitor); + + AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); + addService(amFinishingMonitor); + + containerTokenSecretManager = createContainerTokenSecretManager(conf); + nmTokenSecretManager = createNMTokenSecretManager(conf); + + boolean isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + + RMStateStore rmStore = null; + if(isRecoveryEnabled) { + recoveryEnabled = true; + rmStore = RMStateStoreFactory.getStore(conf); + } else { + recoveryEnabled = false; + rmStore = new NullRMStateStore(); + } + + try { + rmStore.init(conf); + rmStore.setRMDispatcher(rmDispatcher); + } catch (Exception e) { + // the Exception from stateStore.init() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to init state store", e); + ExitUtil.terminate(1, e); + } + + if (UserGroupInformation.isSecurityEnabled()) { + delegationTokenRenewer = createDelegationTokenRenewer(); + } + + rmContext = new RMContextImpl( + rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor, + amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager, + containerTokenSecretManager, nmTokenSecretManager, + clientToAMSecretManager); + + // Register event handler for NodesListManager + nodesListManager = new NodesListManager(rmContext); + rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); + addService(nodesListManager); + + // Initialize the scheduler + scheduler = createScheduler(); + schedulerDispatcher = createSchedulerEventDispatcher(); + addIfService(schedulerDispatcher); + rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); + + // Register event handler for RmAppEvents + rmDispatcher.register(RMAppEventType.class, + new ApplicationEventDispatcher(rmContext)); + + // Register event handler for RmAppAttemptEvents + rmDispatcher.register(RMAppAttemptEventType.class, + new ApplicationAttemptEventDispatcher(rmContext)); + + // Register event handler for RmNodes + rmDispatcher.register( + RMNodeEventType.class, new NodeEventDispatcher(rmContext)); + + nmLivelinessMonitor = createNMLivelinessMonitor(); + addService(nmLivelinessMonitor); + + resourceTracker = createResourceTrackerService(); + addService(resourceTracker); + + DefaultMetricsSystem.initialize("ResourceManager"); + JvmMetrics.initSingleton("ResourceManager", null); + + try { + scheduler.reinitialize(conf, rmContext); + } catch (IOException ioe) { + throw new RuntimeException("Failed to initialize scheduler", ioe); + } + + // creating monitors that handle preemption + createPolicyMonitors(); + + masterService = createApplicationMasterService(); + addService(masterService) ; + + applicationACLsManager = new ApplicationACLsManager(conf); + + rmAppManager = createRMAppManager(); + // Register event handler for RMAppManagerEvents + rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); + rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext); + rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager); + clientRM = createClientRMService(); + rmContext.setClientRMService(clientRM); + addService(clientRM); + + adminService = createAdminService(clientRM, masterService, resourceTracker); + addService(adminService); + + applicationMasterLauncher = createAMLauncher(); + rmDispatcher.register(AMLauncherEventType.class, + applicationMasterLauncher); + + addService(applicationMasterLauncher); + if (UserGroupInformation.isSecurityEnabled()) { + addService(delegationTokenRenewer); + delegationTokenRenewer.setRMContext(rmContext); + } + + new RMNMInfo(rmContext, scheduler); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + amRmTokenSecretManager.start(); + containerTokenSecretManager.start(); + nmTokenSecretManager.start(); + + RMStateStore rmStore = rmContext.getStateStore(); + // The state store needs to start irrespective of recoveryEnabled as apps + // need events to move to further states. + rmStore.start(); + + if(recoveryEnabled) { + try { + RMState state = rmStore.loadState(); + recover(state); + } catch (Exception e) { + // the Exception from loadState() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to load/recover state", e); + ExitUtil.terminate(1, e); + } + } + + startWepApp(); + try { + rmDTSecretManager.startThreads(); + } catch(IOException ie) { + throw new YarnRuntimeException("Failed to start secret manager threads", ie); + } + + if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS); + hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname; + int port = webApp.port(); + String resolvedAddress = hostname + ":" + port; + conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress); + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (webApp != null) { + webApp.stop(); + } + if (rmDTSecretManager != null) { + rmDTSecretManager.stopThreads(); + } + + if (amRmTokenSecretManager != null) { + amRmTokenSecretManager.stop(); + } + if (containerTokenSecretManager != null) { + containerTokenSecretManager.stop(); + } + if(nmTokenSecretManager != null) { + nmTokenSecretManager.stop(); + } + + DefaultMetricsSystem.shutdown(); + + if (rmContext != null) { + RMStateStore store = rmContext.getStateStore(); + try { + store.close(); + } catch (Exception e) { + LOG.error("Error closing store.", e); + } + } + super.serviceStop(); + } + } + @Private public static class SchedulerEventDispatcher extends AbstractService implements EventHandler { @@ -620,54 +716,7 @@ protected void serviceStart() throws Exception { throw new YarnRuntimeException("Failed to login", ie); } - this.amRmTokenSecretManager.start(); - this.containerTokenSecretManager.start(); - this.nmTokenSecretManager.start(); - - RMStateStore rmStore = rmContext.getStateStore(); - // The state store needs to start irrespective of recoveryEnabled as apps - // need events to move to further states. - rmStore.start(); - - if(recoveryEnabled) { - try { - RMState state = rmStore.loadState(); - recover(state); - } catch (Exception e) { - // the Exception from loadState() needs to be handled for - // HA and we need to give up master status if we got fenced - LOG.error("Failed to load/recover state", e); - ExitUtil.terminate(1, e); - } - } - - startWepApp(); - try { - rmDTSecretManager.startThreads(); - } catch(IOException ie) { - throw new YarnRuntimeException("Failed to start secret manager threads", ie); - } - - if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS); - hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname; - int port = webApp.port(); - String resolvedAddress = hostname + ":" + port; - conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress); - } - super.serviceStart(); - - /*synchronized(shutdown) { - try { - while(!shutdown.get()) { - shutdown.wait(); - } - } catch(InterruptedException ie) { - LOG.info("Interrupted while waiting", ie); - } - }*/ } protected void doSecureLogin() throws IOException { @@ -677,39 +726,6 @@ protected void doSecureLogin() throws IOException { @Override protected void serviceStop() throws Exception { - if (webApp != null) { - webApp.stop(); - } - if (rmDTSecretManager != null) { - rmDTSecretManager.stopThreads(); - } - - if (amRmTokenSecretManager != null) { - this.amRmTokenSecretManager.stop(); - } - if (containerTokenSecretManager != null) { - this.containerTokenSecretManager.stop(); - } - if(nmTokenSecretManager != null) { - nmTokenSecretManager.stop(); - } - - /*synchronized(shutdown) { - shutdown.set(true); - shutdown.notifyAll(); - }*/ - - DefaultMetricsSystem.shutdown(); - - if (rmContext != null) { - RMStateStore store = rmContext.getStateStore(); - try { - store.close(); - } catch (Exception e) { - LOG.error("Error closing store.", e); - } - } - super.serviceStop(); }