From 6608b75829992e84d265dec84b6cb52f041b454a Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 10 Jan 2014 20:15:00 +0000 Subject: [PATCH] YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557248 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/service/CompositeService.java | 2 +- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/util/TestCompositeService.java | 29 ++++++- .../resourcemanager/ResourceManager.java | 28 ++++++- .../yarn/server/resourcemanager/TestRMHA.java | 80 +++++++++++++++++++ 5 files changed, 136 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java index 383a7a8414..123ca52ba9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java @@ -95,7 +95,7 @@ protected boolean addIfService(Object object) { protected synchronized boolean removeService(Service service) { synchronized (serviceList) { - return serviceList.add(service); + return serviceList.remove(service); } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fa7591458c..8e6aef4730 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -313,6 +313,9 @@ Release 2.4.0 - UNRELEASED YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure caused by non-English system locale. (Tsuyoshi OZAWA via jianhe) + YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong + via kasha) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java index 34b77cd92b..3dbdc135ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java @@ -337,7 +337,34 @@ public void serviceInit(Configuration conf) { assertEquals("Incorrect number of services", 1, testService.getServices().size()); } - + + @Test + public void testRemoveService() { + CompositeService testService = new CompositeService("TestService") { + @Override + public void serviceInit(Configuration conf) { + Integer notAService = new Integer(0); + assertFalse("Added an integer as a service", + addIfService(notAService)); + + Service service1 = new AbstractService("Service1") {}; + addIfService(service1); + + Service service2 = new AbstractService("Service2") {}; + addIfService(service2); + + Service service3 = new AbstractService("Service3") {}; + addIfService(service3); + + removeService(service1); + } + }; + + testService.init(new Configuration()); + assertEquals("Incorrect number of services", + 2, testService.getServices().size()); + } + public static class CompositeServiceAddingAChild extends CompositeService{ Service child; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 16c7ac7eed..83d7f9df83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; @@ -180,13 +181,11 @@ protected void serviceInit(Configuration conf) throws Exception { this.conf = conf; this.rmContext = new RMContextImpl(); - rmDispatcher = createDispatcher(); + // register the handlers for all AlwaysOn services using setupDispatcher(). + rmDispatcher = setupDispatcher(); addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); - rmDispatcher.register(RMFatalEventType.class, - new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); - adminService = createAdminService(); addService(adminService); rmContext.setRMAdminService(adminService); @@ -832,6 +831,7 @@ synchronized void transitionToStandby(boolean initialize) HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); if (initialize) { + resetDispatcher(); createAndInitActiveServices(); } } @@ -994,4 +994,24 @@ private static void setHttpPolicy(Configuration conf) { YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT))); } + + /** + * Register the handlers for alwaysOn services + */ + private Dispatcher setupDispatcher() { + Dispatcher dispatcher = createDispatcher(); + dispatcher.register(RMFatalEventType.class, + new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); + return dispatcher; + } + + private void resetDispatcher() { + Dispatcher dispatcher = setupDispatcher(); + ((Service)dispatcher).init(this.conf); + ((Service)dispatcher).start(); + removeService((Service)rmDispatcher); + rmDispatcher = dispatcher; + addIfService(rmDispatcher); + rmContext.setDispatcher(rmDispatcher); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index dc98b7cf38..9d1a46776c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -26,8 +26,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Before; import org.junit.Test; @@ -222,4 +225,81 @@ public void testTransitionsWhenAutomaticFailoverEnabled() throws IOException { checkMonitorHealth(); checkActiveRMFunctionality(); } + + @Test + public void testRMDispatcherForHA() throws IOException { + String errorMessageForEventHandler = + "Expect to get the same number of handlers"; + String errorMessageForService = "Expect to get the same number of services"; + Configuration conf = new YarnConfiguration(configuration); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return new MyCountingDispatcher(); + } + }; + rm.init(conf); + int expectedEventHandlerCount = + ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) + .getEventHandlerCount(); + int expectedServiceCount = rm.getServices().size(); + assertTrue(expectedEventHandlerCount != 0); + + StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + assertEquals(STATE_ERR, HAServiceState.INITIALIZING, + rm.adminService.getServiceStatus().getState()); + assertFalse("RM is ready to become active before being started", + rm.adminService.getServiceStatus().isReadyToBecomeActive()); + rm.start(); + + //call transitions to standby and active a couple of times + rm.adminService.transitionToStandby(requestInfo); + rm.adminService.transitionToActive(requestInfo); + rm.adminService.transitionToStandby(requestInfo); + rm.adminService.transitionToActive(requestInfo); + rm.adminService.transitionToStandby(requestInfo); + + rm.adminService.transitionToActive(requestInfo); + assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, + ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) + .getEventHandlerCount()); + assertEquals(errorMessageForService, expectedServiceCount, + rm.getServices().size()); + + rm.adminService.transitionToStandby(requestInfo); + assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, + ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) + .getEventHandlerCount()); + assertEquals(errorMessageForService, expectedServiceCount, + rm.getServices().size()); + + rm.stop(); + } + + @SuppressWarnings("rawtypes") + class MyCountingDispatcher extends AbstractService implements Dispatcher { + + private int eventHandlerCount; + + public MyCountingDispatcher() { + super("MyCountingDispatcher"); + this.eventHandlerCount = 0; + } + + @Override + public EventHandler getEventHandler() { + return null; + } + + @Override + public void register(Class eventType, EventHandler handler) { + this.eventHandlerCount ++; + } + + public int getEventHandlerCount() { + return this.eventHandlerCount; + } + } }