YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557248 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7354547a02
commit
6608b75829
@ -95,7 +95,7 @@ protected boolean addIfService(Object object) {
|
|||||||
|
|
||||||
protected synchronized boolean removeService(Service service) {
|
protected synchronized boolean removeService(Service service) {
|
||||||
synchronized (serviceList) {
|
synchronized (serviceList) {
|
||||||
return serviceList.add(service);
|
return serviceList.remove(service);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,6 +313,9 @@ Release 2.4.0 - UNRELEASED
|
|||||||
YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure
|
YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure
|
||||||
caused by non-English system locale. (Tsuyoshi OZAWA via jianhe)
|
caused by non-English system locale. (Tsuyoshi OZAWA via jianhe)
|
||||||
|
|
||||||
|
YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong
|
||||||
|
via kasha)
|
||||||
|
|
||||||
Release 2.3.0 - UNRELEASED
|
Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -338,6 +338,33 @@ public void serviceInit(Configuration conf) {
|
|||||||
1, testService.getServices().size());
|
1, testService.getServices().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveService() {
|
||||||
|
CompositeService testService = new CompositeService("TestService") {
|
||||||
|
@Override
|
||||||
|
public void serviceInit(Configuration conf) {
|
||||||
|
Integer notAService = new Integer(0);
|
||||||
|
assertFalse("Added an integer as a service",
|
||||||
|
addIfService(notAService));
|
||||||
|
|
||||||
|
Service service1 = new AbstractService("Service1") {};
|
||||||
|
addIfService(service1);
|
||||||
|
|
||||||
|
Service service2 = new AbstractService("Service2") {};
|
||||||
|
addIfService(service2);
|
||||||
|
|
||||||
|
Service service3 = new AbstractService("Service3") {};
|
||||||
|
addIfService(service3);
|
||||||
|
|
||||||
|
removeService(service1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
testService.init(new Configuration());
|
||||||
|
assertEquals("Incorrect number of services",
|
||||||
|
2, testService.getServices().size());
|
||||||
|
}
|
||||||
|
|
||||||
public static class CompositeServiceAddingAChild extends CompositeService{
|
public static class CompositeServiceAddingAChild extends CompositeService{
|
||||||
Service child;
|
Service child;
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
@ -180,13 +181,11 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.rmContext = new RMContextImpl();
|
this.rmContext = new RMContextImpl();
|
||||||
|
|
||||||
rmDispatcher = createDispatcher();
|
// register the handlers for all AlwaysOn services using setupDispatcher().
|
||||||
|
rmDispatcher = setupDispatcher();
|
||||||
addIfService(rmDispatcher);
|
addIfService(rmDispatcher);
|
||||||
rmContext.setDispatcher(rmDispatcher);
|
rmContext.setDispatcher(rmDispatcher);
|
||||||
|
|
||||||
rmDispatcher.register(RMFatalEventType.class,
|
|
||||||
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
|
|
||||||
|
|
||||||
adminService = createAdminService();
|
adminService = createAdminService();
|
||||||
addService(adminService);
|
addService(adminService);
|
||||||
rmContext.setRMAdminService(adminService);
|
rmContext.setRMAdminService(adminService);
|
||||||
@ -832,6 +831,7 @@ synchronized void transitionToStandby(boolean initialize)
|
|||||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||||
stopActiveServices();
|
stopActiveServices();
|
||||||
if (initialize) {
|
if (initialize) {
|
||||||
|
resetDispatcher();
|
||||||
createAndInitActiveServices();
|
createAndInitActiveServices();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -994,4 +994,24 @@ private static void setHttpPolicy(Configuration conf) {
|
|||||||
YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||||
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
|
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the handlers for alwaysOn services
|
||||||
|
*/
|
||||||
|
private Dispatcher setupDispatcher() {
|
||||||
|
Dispatcher dispatcher = createDispatcher();
|
||||||
|
dispatcher.register(RMFatalEventType.class,
|
||||||
|
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetDispatcher() {
|
||||||
|
Dispatcher dispatcher = setupDispatcher();
|
||||||
|
((Service)dispatcher).init(this.conf);
|
||||||
|
((Service)dispatcher).start();
|
||||||
|
removeService((Service)rmDispatcher);
|
||||||
|
rmDispatcher = dispatcher;
|
||||||
|
addIfService(rmDispatcher);
|
||||||
|
rmContext.setDispatcher(rmDispatcher);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,8 +26,11 @@
|
|||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -222,4 +225,81 @@ public void testTransitionsWhenAutomaticFailoverEnabled() throws IOException {
|
|||||||
checkMonitorHealth();
|
checkMonitorHealth();
|
||||||
checkActiveRMFunctionality();
|
checkActiveRMFunctionality();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMDispatcherForHA() throws IOException {
|
||||||
|
String errorMessageForEventHandler =
|
||||||
|
"Expect to get the same number of handlers";
|
||||||
|
String errorMessageForService = "Expect to get the same number of services";
|
||||||
|
Configuration conf = new YarnConfiguration(configuration);
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return new MyCountingDispatcher();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.init(conf);
|
||||||
|
int expectedEventHandlerCount =
|
||||||
|
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||||
|
.getEventHandlerCount();
|
||||||
|
int expectedServiceCount = rm.getServices().size();
|
||||||
|
assertTrue(expectedEventHandlerCount != 0);
|
||||||
|
|
||||||
|
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
|
||||||
|
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
|
||||||
|
rm.adminService.getServiceStatus().getState());
|
||||||
|
assertFalse("RM is ready to become active before being started",
|
||||||
|
rm.adminService.getServiceStatus().isReadyToBecomeActive());
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
//call transitions to standby and active a couple of times
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||||
|
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||||
|
.getEventHandlerCount());
|
||||||
|
assertEquals(errorMessageForService, expectedServiceCount,
|
||||||
|
rm.getServices().size());
|
||||||
|
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||||
|
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||||
|
.getEventHandlerCount());
|
||||||
|
assertEquals(errorMessageForService, expectedServiceCount,
|
||||||
|
rm.getServices().size());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
class MyCountingDispatcher extends AbstractService implements Dispatcher {
|
||||||
|
|
||||||
|
private int eventHandlerCount;
|
||||||
|
|
||||||
|
public MyCountingDispatcher() {
|
||||||
|
super("MyCountingDispatcher");
|
||||||
|
this.eventHandlerCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventHandler getEventHandler() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(Class<? extends Enum> eventType, EventHandler handler) {
|
||||||
|
this.eventHandlerCount ++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getEventHandlerCount() {
|
||||||
|
return this.eventHandlerCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user