YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.

This commit is contained in:
Sunil G 2017-07-24 10:59:01 +05:30
parent 2054324d47
commit e315328428
13 changed files with 452 additions and 243 deletions

View File

@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
private RMContext rmContext;
private ResourceManager rm;
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@VisibleForTesting
final Object zkDisconnectLock = new Object();
ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
super(ActiveStandbyElectorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@Override
@ -140,7 +140,7 @@ public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToActive(req);
rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
@ -151,7 +151,7 @@ public void becomeStandby() {
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToStandby(req);
rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
LOG.error("RM could not transition to Standby", e);
}
@ -205,7 +205,7 @@ public void run() {
@SuppressWarnings(value = "unchecked")
@Override
public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle(
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}

View File

@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements
private static final Log LOG = LogFactory.getLog(AdminService.class);
private final RMContext rmContext;
private final ResourceManager rm;
private String rmId;
@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
public AdminService(ResourceManager rm, RMContext rmContext) {
public AdminService(ResourceManager rm) {
super(AdminService.class.getName());
this.rm = rm;
this.rmContext = rmContext;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
autoFailoverEnabled =
rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
rm.getRMContext().isHAEnabled()
&& HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@ -189,7 +188,7 @@ protected void startServer() throws Exception {
RMPolicyProvider.getInstance());
}
if (rmContext.isHAEnabled()) {
if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@ -265,7 +264,7 @@ private void checkHaStateChange(StateChangeRequestInfo req)
}
private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == rmContext.getHAServiceState();
return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
}
private void throwStandbyException() throws StandbyException {
@ -304,7 +303,7 @@ public synchronized void transitionToActive(
// call all refresh*s for active RM to get the updated configurations.
refreshAll();
} catch (Exception e) {
rmContext
rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
@ -363,7 +362,7 @@ public synchronized void transitionToStandby(
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState();
HAServiceState haState = rm.getRMContext().getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
@ -395,11 +394,12 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
}
private void refreshQueues() throws IOException, YarnException {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
rm.getRMContext().getScheduler().reinitialize(getConfig(),
this.rm.getRMContext());
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
rSystem.reinitialize(getConfig(), rm.getRMContext());
}
}
@ -418,14 +418,14 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
switch (request.getDecommissionType()) {
case NORMAL:
rmContext.getNodesListManager().refreshNodes(conf);
rm.getRMContext().getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
rmContext.getNodesListManager().refreshNodesGracefully(
rm.getRMContext().getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully();
rm.getRMContext().getNodesListManager().refreshNodesForcefully();
break;
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
@ -440,7 +440,7 @@ private void refreshNodes() throws IOException, YarnException {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getNodesListManager().refreshNodes(conf);
rm.getRMContext().getNodesListManager().refreshNodes(conf);
}
@Override
@ -559,10 +559,11 @@ private void refreshActiveServicesAcls() throws IOException, YarnException {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
policyProvider);
rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
}
@ -601,7 +602,7 @@ public UpdateNodeResourceResponse updateNodeResource(
// if any invalid nodes, throw exception instead of partially updating
// valid nodes.
for (NodeId nodeId : nodeIds) {
RMNode node = this.rmContext.getRMNodes().get(nodeId);
RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.error("Resource update get failed on all nodes due to change "
+ "resource on an unrecognized node: " + nodeId);
@ -619,14 +620,14 @@ public UpdateNodeResourceResponse updateNodeResource(
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey();
RMNode node = this.rmContext.getRMNodes().get(nodeId);
RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false;
} else {
// update resource to RMNode
this.rmContext.getDispatcher().getEventHandler()
this.rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")");
@ -661,7 +662,8 @@ public RefreshNodesResourcesResponse refreshNodesResources(
DynamicResourceConfiguration newConf;
InputStream drInputStream =
this.rmContext.getConfigurationProvider().getConfigurationInputStream(
this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
@ -679,7 +681,7 @@ public RefreshNodesResourcesResponse refreshNodesResources(
updateNodeResource(updateRequest);
}
// refresh dynamic resource in ResourceTrackerService
this.rmContext.getResourceTrackerService().
this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -692,7 +694,8 @@ public RefreshNodesResourcesResponse refreshNodesResources(
private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) {
InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
InputStream confFileInputStream =
this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(conf, confFileName);
if (confFileInputStream != null) {
conf.addResource(confFileInputStream);
@ -746,7 +749,7 @@ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLab
AddToClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
try {
rmContext.getNodeLabelManager()
rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(request.getNodeLabels());
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -769,7 +772,8 @@ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
try {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
rm.getRMContext().getNodeLabelManager()
.removeFromClusterNodeLabels(request.getNodeLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
return response;
@ -805,19 +809,20 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
boolean isKnown = false;
// both active and inactive nodes are recognized as known nodes
if (requestedNode.getPort() != 0) {
if (rmContext.getRMNodes().containsKey(requestedNode)
|| rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
.getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
isKnown = true;
}
} else {
for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
}
}
if (!isKnown) {
for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
.keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
@ -841,7 +846,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
}
}
try {
rmContext.getNodeLabelManager().replaceLabelsOnNode(
rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
request.getNodeToLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
@ -878,7 +883,7 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
checkRMStatus(user.getShortUserName(), operation, msg);
Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
.checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -914,6 +919,6 @@ private void refreshClusterMaxPriority() throws IOException, YarnException {
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
}
}

View File

@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
private RMContext rmContext;
private String latchPath;
private String rmId;
private ResourceManager rm;
public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
public CuratorBasedElectorService(ResourceManager rm) {
super(CuratorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@ -102,7 +100,8 @@ public String getZookeeperConnectionState() {
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
rmContext.getRMAdminService().transitionToActive(
rm.getRMContext().getRMAdminService()
.transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
@ -123,7 +122,8 @@ private void closeLeaderLatch() throws IOException {
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
rmContext.getRMAdminService().transitionToStandby(
rm.getRMContext().getRMAdminService()
.transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {

View File

@ -42,20 +42,20 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* The RMActiveServiceContext is the class that maintains all the
* RMActiveService contexts.This is expected to be used only by ResourceManager
* and RMContext.
* The RMActiveServiceContext is the class that maintains <b>Active</b> service
* context. Services that need to run only on the Active RM. This is expected to
* be used only by RMContext.
*/
@Private
@Unstable
@ -94,7 +94,6 @@ public class RMActiveServiceContext {
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMTimelineCollectorManager timelineCollectorManager;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
@ -107,6 +106,7 @@ public class RMActiveServiceContext {
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@ -372,19 +372,6 @@ public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return timelineCollectorManager;
}
@Private
@Unstable
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager collectorManager) {
this.timelineCollectorManager = collectorManager;
}
@Private
@Unstable
public long getEpoch() {
@ -483,4 +470,17 @@ public void setRMAppLifetimeMonitor(
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
@Private
@Unstable
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return this.queueLimitCalculator;
}
@Private
@Unstable
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
}

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -57,37 +56,39 @@
import com.google.common.annotations.VisibleForTesting;
/**
* RMContextImpl class holds two services context.
* <ul>
* <li>serviceContext : These services called as <b>Always On</b> services.
* Services that need to run always irrespective of the HA state of the RM.</li>
* <li>activeServiceCotext : Active services context. Services that need to run
* only on the Active RM.</li>
* </ul>
* <p>
* <b>Note:</b> If any new service to be added to context, add it to a right
* context as per above description.
*/
public class RMContextImpl implements RMContext {
private Dispatcher rmDispatcher;
private boolean isHAEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AdminService adminService;
private ConfigurationProvider configurationProvider;
/**
* RM service contexts which runs through out RM life span. These are created
* once during start of RM.
*/
private RMServiceContext serviceContext;
/**
* RM Active service context. This will be recreated for every transition from
* ACTIVE->STANDBY.
*/
private RMActiveServiceContext activeServiceContext;
private Configuration yarnConfiguration;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private EmbeddedElector elector;
private QueueLimitCalculator queueLimitCalculator;
private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
*/
public RMContextImpl() {
this.serviceContext = new RMServiceContext();
this.activeServiceContext = new RMActiveServiceContext();
}
@VisibleForTesting
@ -138,19 +139,154 @@ public RMContextImpl(Dispatcher rmDispatcher,
clientToAMTokenSecretManager, null);
}
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
/**
* RM service contexts which runs through out JVM life span. These are created
* once during start of RM.
* @return serviceContext of RM
*/
@Private
@Unstable
public RMServiceContext getServiceContext() {
return serviceContext;
}
/**
* <b>Note:</b> setting service context clears all services embedded with it.
* @param context rm service context
*/
@Private
@Unstable
public void setServiceContext(RMServiceContext context) {
this.serviceContext = context;
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
this.elector = elector;
public ResourceManager getResourceManager() {
return serviceContext.getResourceManager();
}
public void setResourceManager(ResourceManager rm) {
serviceContext.setResourceManager(rm);
}
@Override
public EmbeddedElector getLeaderElectorService() {
return this.elector;
return serviceContext.getLeaderElectorService();
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
serviceContext.setLeaderElectorService(elector);
}
@Override
public Dispatcher getDispatcher() {
return serviceContext.getDispatcher();
}
void setDispatcher(Dispatcher dispatcher) {
serviceContext.setDispatcher(dispatcher);
}
@Override
public AdminService getRMAdminService() {
return serviceContext.getRMAdminService();
}
void setRMAdminService(AdminService adminService) {
serviceContext.setRMAdminService(adminService);
}
@Override
public boolean isHAEnabled() {
return serviceContext.isHAEnabled();
}
void setHAEnabled(boolean isHAEnabled) {
serviceContext.setHAEnabled(isHAEnabled);
}
@Override
public HAServiceState getHAServiceState() {
return serviceContext.getHAServiceState();
}
void setHAServiceState(HAServiceState serviceState) {
serviceContext.setHAServiceState(serviceState);
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return serviceContext.getRMApplicationHistoryWriter();
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return serviceContext.getSystemMetricsPublisher();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher metricsPublisher) {
serviceContext.setSystemMetricsPublisher(metricsPublisher);
}
@Override
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return serviceContext.getRMTimelineCollectorManager();
}
@Override
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
serviceContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return serviceContext.getConfigurationProvider();
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
serviceContext.setConfigurationProvider(configurationProvider);
}
@Override
public Configuration getYarnConfiguration() {
return serviceContext.getYarnConfiguration();
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
serviceContext.setYarnConfiguration(yarnConfiguration);
}
public String getHAZookeeperConnectionState() {
return serviceContext.getHAZookeeperConnectionState();
}
// ==========================================================================
/**
* RM Active service context. This will be recreated for every transition from
* ACTIVE to STANDBY.
* @return activeServiceContext of active services
*/
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
@Override
@ -228,11 +364,6 @@ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return activeServiceContext.getClientToAMTokenSecretManager();
}
@Override
public AdminService getRMAdminService() {
return this.adminService;
}
@VisibleForTesting
public void setStateStore(RMStateStore store) {
activeServiceContext.setStateStore(store);
@ -253,24 +384,6 @@ public ResourceTrackerService getResourceTrackerService() {
return activeServiceContext.getResourceTrackerService();
}
void setHAEnabled(boolean isHAEnabled) {
this.isHAEnabled = isHAEnabled;
}
void setHAServiceState(HAServiceState serviceState) {
synchronized (haServiceStateLock) {
this.haServiceState = serviceState;
}
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
void setRMAdminService(AdminService adminService) {
this.adminService = adminService;
}
@Override
public void setClientRMService(ClientRMService clientRMService) {
activeServiceContext.setClientRMService(clientRMService);
@ -348,18 +461,6 @@ void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
@Override
public boolean isHAEnabled() {
return isHAEnabled;
}
@Override
public HAServiceState getHAServiceState() {
synchronized (haServiceStateLock) {
return haServiceState;
}
}
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@ -369,50 +470,6 @@ public boolean isWorkPreservingRecoveryEnabled() {
return activeServiceContext.isWorkPreservingRecoveryEnabled();
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return this.rmApplicationHistoryWriter;
}
@Override
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
activeServiceContext.setRMTimelineCollectorManager(
timelineCollectorManager);
}
@Override
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return activeServiceContext.getRMTimelineCollectorManager();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher metricsPublisher) {
this.systemMetricsPublisher = metricsPublisher;
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return this.systemMetricsPublisher;
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
@Override
public long getEpoch() {
@ -463,27 +520,6 @@ public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return activeServiceContext.getSystemCredentialsForApps();
}
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
@Override
public Configuration getYarnConfiguration() {
return this.yarnConfiguration;
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration=yarnConfiguration;
}
@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
@ -496,12 +532,12 @@ public void setQueuePlacementManager(PlacementManager placementMgr) {
@Override
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return this.queueLimitCalculator;
return activeServiceContext.getNodeManagerQueueLimitCalculator();
}
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
activeServiceContext.setContainerQueueLimitCalculator(limitCalculator);
}
@Override
@ -515,21 +551,5 @@ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic " +
"failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
@Override
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
// Note: Read java doc before adding any services over here.
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
/**
* RMServiceContext class maintains "Always On" services. Services that need to
* run always irrespective of the HA state of the RM. This is created during
* initialization of RMContextImpl.
* <p>
* <b>Note:</b> If any services to be added in this class, make sure service
* will be running always irrespective of the HA state of the RM
*/
@Private
@Unstable
public class RMServiceContext {
private Dispatcher rmDispatcher;
private boolean isHAEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AdminService adminService;
private ConfigurationProvider configurationProvider;
private Configuration yarnConfiguration;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private EmbeddedElector elector;
private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
private RMTimelineCollectorManager timelineCollectorManager;
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
public EmbeddedElector getLeaderElectorService() {
return this.elector;
}
public void setLeaderElectorService(EmbeddedElector embeddedElector) {
this.elector = embeddedElector;
}
public AdminService getRMAdminService() {
return this.adminService;
}
void setRMAdminService(AdminService service) {
this.adminService = service;
}
void setHAEnabled(boolean rmHAEnabled) {
this.isHAEnabled = rmHAEnabled;
}
public boolean isHAEnabled() {
return isHAEnabled;
}
public HAServiceState getHAServiceState() {
synchronized (haServiceStateLock) {
return haServiceState;
}
}
void setHAServiceState(HAServiceState serviceState) {
synchronized (haServiceStateLock) {
this.haServiceState = serviceState;
}
}
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return this.rmApplicationHistoryWriter;
}
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter applicationHistoryWriter) {
this.rmApplicationHistoryWriter = applicationHistoryWriter;
}
public void setSystemMetricsPublisher(
SystemMetricsPublisher metricsPublisher) {
this.systemMetricsPublisher = metricsPublisher;
}
public SystemMetricsPublisher getSystemMetricsPublisher() {
return this.systemMetricsPublisher;
}
public Configuration getYarnConfiguration() {
return this.yarnConfiguration;
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration = yarnConfiguration;
}
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return timelineCollectorManager;
}
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager collectorManager) {
this.timelineCollectorManager = collectorManager;
}
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic "
+ "failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
}

View File

@ -115,7 +115,6 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
@ -345,9 +344,9 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
elector = new CuratorBasedElectorService(rmContext, this);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(rmContext);
elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
@ -497,7 +496,7 @@ protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
}
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
return new RMTimelineCollectorManager(rmContext);
return new RMTimelineCollectorManager(this);
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
@ -508,7 +507,8 @@ protected SystemMetricsPublisher createSystemMetricsPublisher() {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is " +
"configured");
publisher = new TimelineServiceV2Publisher(rmContext);
publisher = new TimelineServiceV2Publisher(
rmContext.getRMTimelineCollectorManager());
} else {
// we're dealing with the v.1.x publisher
LOG.info("system metrics publisher with the timeline service V1 is " +
@ -560,7 +560,6 @@ public class RMActiveServices extends CompositeService {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
@ -573,9 +572,6 @@ public class RMActiveServices extends CompositeService {
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext);
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
@ -1149,7 +1145,7 @@ void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
resetDispatcher();
resetRMContext();
createAndInitActiveServices(true);
}
}
@ -1294,7 +1290,7 @@ protected ApplicationMasterService createApplicationMasterService() {
}
protected AdminService createAdminService() {
return new AdminService(this, rmContext);
return new AdminService(this);
}
protected RMSecretManagerService createRMSecretManagerService() {
@ -1417,17 +1413,24 @@ private Dispatcher setupDispatcher() {
return dispatcher;
}
private void resetDispatcher() {
private void resetRMContext() {
RMContextImpl rmContextImpl = new RMContextImpl();
// transfer service context to new RM service Context
rmContextImpl.setServiceContext(rmContext.getServiceContext());
// reset dispatcher
Dispatcher dispatcher = setupDispatcher();
((Service)dispatcher).init(this.conf);
((Service)dispatcher).start();
removeService((Service)rmDispatcher);
((Service) dispatcher).init(this.conf);
((Service) dispatcher).start();
removeService((Service) rmDispatcher);
// Need to stop previous rmDispatcher before assigning new dispatcher
// otherwise causes "AsyncDispatcher event handler" thread leak
((Service) rmDispatcher).stop();
rmDispatcher = dispatcher;
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
rmContextImpl.setDispatcher(dispatcher);
rmContext = rmContextImpl;
}
private void setSchedulerRecoveryStartAndWaitTime(RMState state,

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@ -75,9 +74,10 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
private RMTimelineCollectorManager rmTimelineCollectorManager;
private boolean publishContainerEvents;
public TimelineServiceV2Publisher(RMContext rmContext) {
public TimelineServiceV2Publisher(
RMTimelineCollectorManager timelineCollectorManager) {
super("TimelineserviceV2Publisher");
rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
rmTimelineCollectorManager = timelineCollectorManager;
}
@Override

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
@ -41,16 +41,16 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
private static final Log LOG =
LogFactory.getLog(RMTimelineCollectorManager.class);
private RMContext rmContext;
private ResourceManager rm;
public RMTimelineCollectorManager(RMContext rmContext) {
public RMTimelineCollectorManager(ResourceManager resourceManager) {
super(RMTimelineCollectorManager.class.getName());
this.rmContext = rmContext;
this.rm = resourceManager;
}
@Override
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
RMApp app = rmContext.getRMApps().get(appId);
RMApp app = rm.getRMContext().getRMApps().get(appId);
if (app == null) {
throw new YarnRuntimeException(
"Unable to get the timeline collector context info for a " +

View File

@ -1055,7 +1055,7 @@ protected void serviceStop() {
@Override
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
return new AdminService(this) {
@Override
protected void startServer() {
// override to not start rpc handler

View File

@ -122,13 +122,15 @@ private void testCallbackSynchronization(SyncTestType type)
throws IOException, InterruptedException {
AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class);
ResourceManager rm = mock(ResourceManager.class);
Configuration myConf = new Configuration(conf);
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rm.getRMContext()).thenReturn(rc);
when(rc.getRMAdminService()).thenReturn(as);
ActiveStandbyElectorBasedElectorService
ees = new ActiveStandbyElectorBasedElectorService(rc);
ActiveStandbyElectorBasedElectorService ees =
new ActiveStandbyElectorBasedElectorService(rm);
ees.init(myConf);
ees.enterNeutralMode();
@ -290,7 +292,7 @@ private class MockRMWithElector extends MockRM {
@Override
protected EmbeddedElector createEmbeddedElector() {
return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
return new ActiveStandbyElectorBasedElectorService(this) {
@Override
public void becomeActive() throws
ServiceFailedException {

View File

@ -71,6 +71,7 @@ public class TestRMHA {
private Log LOG = LogFactory.getLog(TestRMHA.class);
private Configuration configuration;
private MockRM rm = null;
private MockNM nm = null;
private RMApp app = null;
private RMAppAttempt attempt = null;
private static final String STATE_ERR =
@ -135,7 +136,7 @@ private void checkActiveRMFunctionality() throws Exception {
try {
rm.getNewAppId();
rm.registerNode("127.0.0.1:1", 2048);
nm = rm.registerNode("127.0.0.1:1", 2048);
app = rm.submitApp(1024);
attempt = app.getCurrentAppAttempt();
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
@ -551,6 +552,17 @@ public void testFailoverClearsRMContext() throws Exception {
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
Assert.assertNotNull("Node not registered", nm);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// race condition causes to register/node heartbeat node even after service
// is stopping/stopped. New RMContext is being created on every transition
// to standby, so metrics should be 0 which indicates new context reference
// has taken.
nm.registerNode();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@ -592,7 +604,7 @@ public void testTransitionedToActiveRefreshFail() throws Exception {
rm = new MockRM(configuration) {
@Override
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
return new AdminService(this) {
int counter = 0;
@Override
protected void setConfig(Configuration conf) {

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -98,10 +99,12 @@ public static void setup() throws Exception {
new Path(testRootDir.getAbsolutePath()), true);
}
ResourceManager rm = mock(ResourceManager.class);
RMContext rmContext = mock(RMContext.class);
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
when(rm.getRMContext()).thenReturn(rmContext);
rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
rmTimelineCollectorManager);
@ -113,7 +116,8 @@ public static void setup() throws Exception {
dispatcher.init(conf);
dispatcher.start();
metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
metricsPublisher =
new TimelineServiceV2Publisher(rmTimelineCollectorManager) {
@Override
protected Dispatcher getDispatcher() {
return dispatcher;
@ -162,7 +166,7 @@ private static Configuration getTimelineV2Conf() {
public void testSystemMetricPublisherInitialization() {
@SuppressWarnings("resource")
TimelineServiceV2Publisher publisher =
new TimelineServiceV2Publisher(mock(RMContext.class));
new TimelineServiceV2Publisher(mock(RMTimelineCollectorManager.class));
try {
Configuration conf = getTimelineV2Conf();
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
@ -174,7 +178,8 @@ public void testSystemMetricPublisherInitialization() {
publisher.stop();
publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
publisher = new TimelineServiceV2Publisher(
mock(RMTimelineCollectorManager.class));
conf = getTimelineV2Conf();
publisher.init(conf);
assertTrue("Expected to have registered event handlers and set ready to "