YARN-1481. Move internal services logic from AdminService to ResourceManager. (vinodkv via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
327a995561
commit
94e2e78ab7
@ -154,6 +154,9 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1378. Implemented a cleaner of old finished applications from the RM
|
||||
state-store. (Jian He via vinodkv)
|
||||
|
||||
YARN-1481. Move internal services logic from AdminService to ResourceManager.
|
||||
(vinodkv via kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -21,8 +21,6 @@
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -43,7 +41,6 @@
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -66,6 +63,8 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
public class AdminService extends AbstractService implements
|
||||
HAServiceProtocol, ResourceManagerAdministrationProtocol {
|
||||
|
||||
@ -73,10 +72,6 @@ public class AdminService extends AbstractService implements
|
||||
|
||||
private final RMContext rmContext;
|
||||
private final ResourceManager rm;
|
||||
@VisibleForTesting
|
||||
protected HAServiceProtocol.HAServiceState
|
||||
haState = HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
boolean haEnabled;
|
||||
|
||||
private Server server;
|
||||
private InetSocketAddress masterServiceAddress;
|
||||
@ -93,13 +88,6 @@ public AdminService(ResourceManager rm, RMContext rmContext) {
|
||||
|
||||
@Override
|
||||
public synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
haEnabled = HAUtil.isHAEnabled(conf);
|
||||
if (haEnabled) {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
rm.setConf(conf);
|
||||
}
|
||||
rm.createAndInitActiveServices();
|
||||
|
||||
masterServiceAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
@ -112,11 +100,6 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
|
||||
@Override
|
||||
protected synchronized void serviceStart() throws Exception {
|
||||
if (haEnabled) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
startServer();
|
||||
super.serviceStart();
|
||||
}
|
||||
@ -124,8 +107,6 @@ protected synchronized void serviceStart() throws Exception {
|
||||
@Override
|
||||
protected synchronized void serviceStop() throws Exception {
|
||||
stopServer();
|
||||
transitionToStandby(false);
|
||||
haState = HAServiceState.STOPPING;
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@ -145,7 +126,7 @@ protected void startServer() throws Exception {
|
||||
refreshServiceAcls(conf, new RMPolicyProvider());
|
||||
}
|
||||
|
||||
if (haEnabled) {
|
||||
if (rmContext.isHAEnabled()) {
|
||||
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
@ -182,39 +163,27 @@ private UserGroupInformation checkAcls(String method) throws YarnException {
|
||||
}
|
||||
|
||||
private synchronized boolean isRMActive() {
|
||||
return HAServiceState.ACTIVE == haState;
|
||||
return HAServiceState.ACTIVE == rmContext.getHAServiceState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void monitorHealth()
|
||||
throws IOException {
|
||||
checkAccess("monitorHealth");
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
|
||||
if (isRMActive() && !rm.areActiveServicesRunning()) {
|
||||
throw new HealthCheckFailedException(
|
||||
"Active ResourceManager services are not running!");
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void transitionToActive() throws Exception {
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
LOG.info("Already in active state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to active");
|
||||
rm.startActiveServices();
|
||||
haState = HAServiceProtocol.HAServiceState.ACTIVE;
|
||||
LOG.info("Transitioned to active");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
public synchronized void transitionToActive(
|
||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToActive");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToActive();
|
||||
rm.transitionToActive();
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToActive", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
@ -226,32 +195,14 @@ public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequest
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void transitionToStandby(boolean initialize)
|
||||
throws Exception {
|
||||
if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
LOG.info("Already in standby state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to standby");
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
rm.stopActiveServices();
|
||||
if (initialize) {
|
||||
rm.createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
haState = HAServiceProtocol.HAServiceState.STANDBY;
|
||||
LOG.info("Transitioned to standby");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
public synchronized void transitionToStandby(
|
||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToStandby");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToStandby(true);
|
||||
rm.transitionToStandby(true);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToStandby", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
@ -266,15 +217,15 @@ public synchronized void transitionToStandby(HAServiceProtocol.StateChangeReques
|
||||
@Override
|
||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||
checkAccess("getServiceState");
|
||||
HAServiceState haState = rmContext.getHAServiceState();
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
|
||||
HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
} else {
|
||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -42,7 +43,11 @@
|
||||
public interface RMContext {
|
||||
|
||||
Dispatcher getDispatcher();
|
||||
|
||||
|
||||
boolean isHAEnabled();
|
||||
|
||||
HAServiceState getHAServiceState();
|
||||
|
||||
RMStateStore getStateStore();
|
||||
|
||||
ConcurrentMap<ApplicationId, RMApp> getRMApps();
|
||||
|
@ -21,6 +21,8 @@
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -35,8 +37,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -54,6 +56,10 @@ public class RMContextImpl implements RMContext {
|
||||
private final ConcurrentMap<String, RMNode> inactiveNodes
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private boolean isHAEnabled;
|
||||
private HAServiceState haServiceState =
|
||||
HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private RMStateStore stateStore = null;
|
||||
@ -211,6 +217,16 @@ public ResourceTrackerService getResourceTrackerService() {
|
||||
return resourceTrackerService;
|
||||
}
|
||||
|
||||
void setHAEnabled(boolean isHAEnabled) {
|
||||
this.isHAEnabled = isHAEnabled;
|
||||
}
|
||||
|
||||
void setHAServiceState(HAServiceState haServiceState) {
|
||||
synchronized (haServiceState) {
|
||||
this.haServiceState = haServiceState;
|
||||
}
|
||||
}
|
||||
|
||||
void setDispatcher(Dispatcher dispatcher) {
|
||||
this.rmDispatcher = dispatcher;
|
||||
}
|
||||
@ -290,4 +306,16 @@ void setResourceTrackerService(
|
||||
ResourceTrackerService resourceTrackerService) {
|
||||
this.resourceTrackerService = resourceTrackerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHAEnabled() {
|
||||
return isHAEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HAServiceState getHAServiceState() {
|
||||
synchronized (haServiceState) {
|
||||
return haServiceState;
|
||||
}
|
||||
}
|
||||
}
|
@ -27,6 +27,8 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.http.HttpConfig.Policy;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -43,6 +45,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -188,6 +191,12 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
addService(adminService);
|
||||
rmContext.setRMAdminService(adminService);
|
||||
|
||||
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
|
||||
if (this.rmContext.isHAEnabled()) {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
}
|
||||
createAndInitActiveServices();
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@ -217,9 +226,8 @@ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||
}
|
||||
|
||||
protected RMStateStoreOperationFailedEventDispatcher
|
||||
createRMStateStoreOperationFailedEventDispatcher() {
|
||||
return new RMStateStoreOperationFailedEventDispatcher(
|
||||
rmContext.getRMAdminService());
|
||||
createRMStateStoreOperationFailedEventDispatcher() {
|
||||
return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
|
||||
}
|
||||
|
||||
protected Dispatcher createDispatcher() {
|
||||
@ -655,11 +663,14 @@ public void handle(SchedulerEvent event) {
|
||||
@Private
|
||||
public static class RMStateStoreOperationFailedEventDispatcher implements
|
||||
EventHandler<RMStateStoreOperationFailedEvent> {
|
||||
private final AdminService adminService;
|
||||
|
||||
public RMStateStoreOperationFailedEventDispatcher(
|
||||
AdminService adminService) {
|
||||
this.adminService = adminService;
|
||||
private final RMContext rmContext;
|
||||
private final ResourceManager rm;
|
||||
|
||||
public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
|
||||
ResourceManager resourceManager) {
|
||||
this.rmContext = rmContext;
|
||||
this.rm = resourceManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -671,16 +682,14 @@ public void handle(RMStateStoreOperationFailedEvent event) {
|
||||
}
|
||||
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
|
||||
LOG.info("RMStateStore has been fenced");
|
||||
synchronized(adminService) {
|
||||
if (adminService.haEnabled) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
adminService.transitionToStandby(true);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transition RM to Standby mode.");
|
||||
}
|
||||
if (rmContext.isHAEnabled()) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
rm.transitionToStandby(true);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transition RM to Standby mode.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -826,10 +835,6 @@ protected void startWepApp() {
|
||||
webApp = builder.start(new RMWebApp(this));
|
||||
}
|
||||
|
||||
void setConf(Configuration configuration) {
|
||||
conf = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create and init {@link #activeServices}. This creates an
|
||||
* instance of {@link RMActiveServices} and initializes it.
|
||||
@ -870,6 +875,39 @@ protected boolean areActiveServicesRunning() {
|
||||
return activeServices != null && activeServices.isInState(STATE.STARTED);
|
||||
}
|
||||
|
||||
synchronized void transitionToActive() throws Exception {
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
LOG.info("Already in active state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to active state");
|
||||
startActiveServices();
|
||||
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
|
||||
LOG.info("Transitioned to active state");
|
||||
}
|
||||
|
||||
synchronized void transitionToStandby(boolean initialize)
|
||||
throws Exception {
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
LOG.info("Already in standby state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to standby state");
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
stopActiveServices();
|
||||
if (initialize) {
|
||||
createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
|
||||
LOG.info("Transitioned to standby state");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
try {
|
||||
@ -877,6 +915,13 @@ protected void serviceStart() throws Exception {
|
||||
} catch(IOException ie) {
|
||||
throw new YarnRuntimeException("Failed to login", ie);
|
||||
}
|
||||
|
||||
if (this.rmContext.isHAEnabled()) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@ -888,6 +933,8 @@ protected void doSecureLogin() throws IOException {
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
transitionToStandby(false);
|
||||
rmContext.setHAServiceState(HAServiceState.STOPPING);
|
||||
}
|
||||
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
|
Loading…
Reference in New Issue
Block a user