diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 8879362e1b..bc05c623e2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -2414,10 +2414,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MyResourceManager rm1 = new MyResourceManager(conf, memStore); + MyResourceManager rm1 = new MyResourceManager(conf); rm1.start(); // Submit the application @@ -2504,7 +2501,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() assertBlacklistAdditionsAndRemovals(0, 0, rm1); // Phase-2 start 2nd RM is up - MyResourceManager rm2 = new MyResourceManager(conf, memStore); + MyResourceManager rm2 = new MyResourceManager(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); allocator.updateSchedulerProxy(rm2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5a215e5706..e96780747c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -102,8 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; - - import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; @@ -174,14 +172,6 @@ public MockRM(Configuration conf, RMStateStore store, disableDrainEventsImplicitly = false; } - public class MockRMMemoryStateStore extends MemoryRMStateStore { - @SuppressWarnings("rawtypes") - @Override - protected EventHandler getRMStateStoreEventHandler() { - return rmStateStoreEventHandler; - } - } - public class MockRMNullStateStore extends NullRMStateStore { @SuppressWarnings("rawtypes") @Override @@ -1294,4 +1284,8 @@ protected void serviceInit(Configuration conf) throws Exception { ((AsyncDispatcher) getRmDispatcher()).disableExitOnDispatchException(); } } + + public RMStateStore getRMStateStore() { + return getRMContext().getStateStore(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 422b7eb88a..ebca7a354f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -153,7 +153,6 @@ public void testAppCleanup() throws Exception { rm.stop(); } - @SuppressWarnings("resource") @Test public void testContainerCleanup() throws Exception { @@ -291,11 +290,8 @@ private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @Test (timeout = 60000) public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -308,7 +304,7 @@ public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); // nm1 register to rm2, and do a heartbeat @@ -327,11 +323,9 @@ public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { @Test(timeout = 60000) public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService()); @@ -357,7 +351,7 @@ public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { } // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); // nm1/nm2 register to rm2, and do a heartbeat @@ -383,16 +377,12 @@ public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { rm2.stop(); } - @SuppressWarnings("resource") @Test (timeout = 60000) public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -405,7 +395,7 @@ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); // nm1 register to rm2, and do a heartbeat @@ -426,11 +416,9 @@ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws @Test (timeout = 60000) public void testAppCleanupWhenNMReconnects() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -466,11 +454,9 @@ public void testAppCleanupWhenNMReconnects() throws Exception { @Test(timeout = 60000) public void testProcessingNMContainerStatusesOnNMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); int nmMemory = 8192; int amMemory = 1024; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java index 3db00a242d..ba9de6c8d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java @@ -138,10 +138,8 @@ public void testUsageWithMultipleContainersAndRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MockRM rm0 = new MockRM(conf, memStore); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MockRM rm0 = new MockRM(conf); rm0.start(); MockNM nm = new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService()); @@ -229,7 +227,7 @@ public void testUsageWithMultipleContainersAndRMRestart() throws Exception { vcoreSeconds, metricsBefore.getVcoreSeconds()); // create new RM to represent RM restart. Load up the state store. - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf, rm0.getRMStateStore()); rm1.start(); RMApp app0After = rm1.getRMContext().getRMApps().get(app0.getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 75ef5c775b..526621004c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -413,11 +413,9 @@ private void makeAMContainerExit(MockRM rm, ContainerId amContainer, } private MockRM startRM(YarnConfiguration conf) { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MockRM rm = new MockRM(conf, memStore); - + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MockRM rm = new MockRM(conf); rm.start(); return rm; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index ec6b1e64f5..b5293a5281 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -415,7 +415,7 @@ public void testFailoverWhenTransitionToActiveThrowException() configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration); - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MemoryRMStateStore memStore = new MockRMMemoryStateStore() { int count = 0; @Override @@ -465,7 +465,7 @@ public void testTransitionedToStandbyShouldNotHang() throws Exception { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration); - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MemoryRMStateStore memStore = new MockRMMemoryStateStore() { @Override public void updateApplicationState(ApplicationStateData appState) { notifyStoreOperationFailed(new StoreFencedException()); @@ -530,12 +530,10 @@ public void testFailoverClearsRMContext() throws Exception { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); Configuration conf = new YarnConfiguration(configuration); - - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); // 1. start RM - rm = new MockRM(conf, memStore); + rm = new MockRM(conf); rm.init(conf); rm.start(); @@ -565,7 +563,7 @@ public void testFailoverClearsRMContext() throws Exception { verifyClusterMetrics(0, 0, 0, 0, 0, 0); // 3. Create new RM - rm = new MockRM(conf, memStore) { + rm = new MockRM(conf, rm.getRMStateStore()) { @Override protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 955b4b61cd..f9d0eae29e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -178,24 +178,23 @@ private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) { return rm; } - @SuppressWarnings("rawtypes") + private MockRM createMockRM(YarnConfiguration config) { + MockRM rm = new MockRM(config); + rms.add(rm); + return rm; + } + @Test (timeout=180000) public void testRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + // PHASE 1: create RM and get state + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - - - // PHASE 1: create state in an RM - - // start RM - MockRM rm1 = createMockRM(conf, memStore); - + memStore.getState().getApplicationState(); + // start like normal because state is empty rm1.start(); @@ -451,14 +450,12 @@ public void testRMRestart() throws Exception { public void testRMRestartAppRunningAMFailed() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - Map rmAppState = - rmState.getApplicationState(); - // start RM - MockRM rm1 = createMockRM(conf, memStore); + // Create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + Map rmAppState = + memStore.getState().getApplicationState(); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -508,14 +505,13 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { // be started immediately. YarnConfiguration conf = new YarnConfiguration(this.conf); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - + memStore.getState().getApplicationState(); // start RM - final MockRM rm1 = createMockRM(conf, memStore); rm1.start(); AbstractYarnScheduler ys = (AbstractYarnScheduler)rm1.getResourceScheduler(); @@ -674,7 +670,7 @@ public Boolean get() { @Test (timeout = 60000) public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MemoryRMStateStore memStore = new MockRMMemoryStateStore() { int count = 0; @Override @@ -727,14 +723,12 @@ public void updateApplicationStateInternal(ApplicationId appId, @Test (timeout = 60000) public void testRMRestartFailedApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - + memStore.getState().getApplicationState(); // start RM - MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -775,14 +769,12 @@ public void testRMRestartFailedApp() throws Exception { public void testRMRestartKilledApp() throws Exception{ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - + memStore.getState().getApplicationState(); // start RM - MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -823,7 +815,7 @@ public void testRMRestartKilledApp() throws Exception{ @Test (timeout = 60000) public void testRMRestartKilledAppWithNoAttempts() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MemoryRMStateStore memStore = new MockRMMemoryStateStore() { @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, @@ -865,14 +857,13 @@ public synchronized void updateApplicationAttemptStateInternal( public void testRMRestartSucceededApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + // PHASE 1: create RM and get state + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); + memStore.getState().getApplicationState(); - // start RM - MockRM rm1 = createMockRM(conf, memStore); + // start like normal because state is empty rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -913,11 +904,8 @@ public void testRMRestartSucceededApp() throws Exception { @Test (timeout = 60000) public void testRMRestartGetApplicationList() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - // start RM - MockRM rm1 = new MockRM(conf, memStore) { + MockRM rm1 = new MockRM(conf) { @Override protected SystemMetricsPublisher createSystemMetricsPublisher() { return spy(super.createSystemMetricsPublisher()); @@ -956,7 +944,7 @@ protected SystemMetricsPublisher createSystemMetricsPublisher() { .appCreated(any(RMApp.class), anyLong()); // restart rm - MockRM rm2 = new MockRM(conf, memStore) { + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) { @Override protected RMAppManager createRMAppManager() { return spy(super.createRMAppManager()); @@ -1077,13 +1065,12 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - MockRM rm1 = createMockRM(conf, memStore); + memStore.getState().getApplicationState(); + // start RM rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1146,16 +1133,15 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { public void testRMRestartTimelineCollectorContext() throws Exception { conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - Map rmAppState = - rmState.getApplicationState(); + MockRM rm1 = null; MockRM rm2 = null; try { - rm1 = createMockRM(conf, memStore); + rm1 = createMockRM(conf); rm1.start(); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + Map rmAppState = + memStore.getState().getApplicationState(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -1212,13 +1198,12 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() "kerberos"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - + // create RM + MockRM rm1 = new TestSecurityMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - MockRM rm1 = new TestSecurityMockRM(conf, memStore); + memStore.getState().getApplicationState(); + // start RM rm1.start(); HashSet> tokenSet = @@ -1307,13 +1292,12 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { "kerberos"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - + // create RM + MockRM rm1 = new TestSecurityMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - MockRM rm1 = new TestSecurityMockRM(conf, memStore); + memStore.getState().getApplicationState(); + // start RM rm1.start(); MockNM nm1 = new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); @@ -1388,8 +1372,10 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { "kerberos"); conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); + + MockRM rm1 = new TestSecurityMockRM(conf); + rm1.start(); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); RMState rmState = memStore.getState(); Map rmAppState = @@ -1399,10 +1385,6 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { Set rmDTMasterKeyState = rmState.getRMDTSecretManagerState().getMasterKeyState(); - MockRM rm1 = new TestSecurityMockRM(conf, memStore); - - rm1.start(); - // create an empty credential Credentials ts = new Credentials(); @@ -1537,10 +1519,8 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() "kerberos"); conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new TestSecurityMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf); rm1.start(); GetDelegationTokenRequest request1 = @@ -1553,7 +1533,7 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr); // start new RM - MockRM rm2 = new TestSecurityMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, rm1.getRMStateStore()); rm2.start(); // submit an app with the old delegation token got from previous RM. @@ -1631,14 +1611,13 @@ protected void handleStoreEvent(RMStateStoreEvent event) { @Test (timeout = 60000) public void testFinishedAppRemovalAfterRMRestart() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); - memStore.init(conf); - RMState rmState = memStore.getState(); // start RM - MockRM rm1 = createMockRM(conf, memStore); + MockRM rm1 = createMockRM(conf); rm1.start(); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + RMState rmState = memStore.getState(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -1676,7 +1655,7 @@ public void testFinishedAppRemovalAfterRMRestart() throws Exception { // This is to test RM does not get hang on shutdown. @Test (timeout = 10000) public void testRMShutdown() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MemoryRMStateStore memStore = new MockRMMemoryStateStore() { @Override public synchronized void checkVersion() throws Exception { @@ -1742,10 +1721,7 @@ public void testAppFailedOnSubmissionSavedInStateStore() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MockRM rm1 = new TestSecurityMockRM(conf, memStore) { + MockRM rm1 = new TestSecurityMockRM(conf) { class TestDelegationTokenRenewer extends DelegationTokenRenewer { public void addApplicationAsync(ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, String user, Configuration appConf) { @@ -1758,6 +1734,7 @@ protected DelegationTokenRenewer createDelegationTokenRenewer() { } }; rm1.start(); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); RMApp app1 = null; try { app1 = rm1.submitApp(200, "name", "user", @@ -1781,7 +1758,7 @@ protected DelegationTokenRenewer createDelegationTokenRenewer() { @Test (timeout = 20000) public void testAppRecoveredInOrderOnRMRestart() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); + MemoryRMStateStore memStore = new MockRMMemoryStateStore(); memStore.init(conf); for (int i = 10; i > 0; i--) { @@ -1836,12 +1813,8 @@ protected void recoverApplication(ApplicationStateData appState, public void testQueueMetricsOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - // PHASE 1: create state in an RM // start RM - MockRM rm1 = createMockRM(conf, memStore); + MockRM rm1 = createMockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1879,7 +1852,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state - MockRM rm2 = createMockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, rm1.getRMStateStore()); QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics(); resetQueueMetrics(qm2); assertQueueMetrics(qm2, 0, 0, 0, 0); @@ -1960,7 +1933,6 @@ private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted, @Test (timeout = 60000) public void testDecomissionedNMsMetricsOnRMRestart() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile.getAbsolutePath()); writeToHostsFile(""); @@ -2039,11 +2011,9 @@ public void testSynchronouslyRenewDTOnRecovery() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - MockRM rm1 = createMockRM(conf, memStore); + MockRM rm1 = createMockRM(conf); rm1.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -2051,7 +2021,7 @@ public void testSynchronouslyRenewDTOnRecovery() throws Exception { RMApp app0 = rm1.submitApp(200); final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); - MockRM rm2 = new MockRM(conf, memStore) { + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) { @Override protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, @@ -2158,6 +2128,10 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } + public TestSecurityMockRM(Configuration conf) { + super(conf); + } + @Override public void init(Configuration conf) { // reset localServiceAddress. @@ -2208,10 +2182,8 @@ public void testRMRestartRecoveringNodeLabelManager() throws Exception { conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, nodeLabelFsStoreDirURI); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - MockRM rm1 = new MockRM(conf, memStore) { + MockRM rm1 = new MockRM(conf) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); @@ -2261,7 +2233,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size()); Assert.assertTrue(nodeLabels.get(n1).equals(toSet("y"))); - MockRM rm2 = new MockRM(conf, memStore) { + MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); @@ -2290,14 +2262,12 @@ public void testRMRestartFailAppAttempt() throws Exception { int maxAttempt = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = - rmState.getApplicationState(); - + memStore.getState().getApplicationState(); // start RM - MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -2365,10 +2335,8 @@ public void testRMRestartNodeMapping() throws Exception { conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, nodeLabelFsStoreDirURI); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - MockRM rm1 = new MockRM(conf, memStore) { + MockRM rm1 = new MockRM(conf) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); @@ -2396,7 +2364,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x"))); MockRM rm2 = null; for (int i = 0; i < 2; i++) { - rm2 = new MockRM(conf, memStore) { + rm2 = new MockRM(conf, rm1.getRMStateStore()) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); @@ -2419,15 +2387,12 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test(timeout = 120000) public void testRMRestartAfterPreemption() throws Exception { - Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); if (!getSchedulerType().equals(SchedulerType.CAPACITY)) { return; } - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); @@ -2466,7 +2431,7 @@ public void testRMRestartAfterPreemption() throws Exception { MockRM rm2 = null; // start RM2 try { - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); Assert.assertTrue("RM start successfully", true); } catch (Exception e) { @@ -2480,11 +2445,10 @@ public void testRMRestartAfterPreemption() throws Exception { @Test(timeout = 60000) public void testRMRestartOnMissingAttempts() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - + // create RM + MockRM rm1 = createMockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); // start RM - MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -2540,13 +2504,10 @@ private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm) @Test(timeout = 60000) public void testRMRestartAfterNodeLabelDisabled() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); MockRM rm1 = new MockRM( - TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) { + TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); @@ -2580,7 +2541,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // restart rm with node label disabled conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false); MockRM rm2 = new MockRM( - TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) { + TestUtils.getConfigurationWithDefaultQueueLabels(conf), + rm1.getRMStateStore()) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index c4cf2560f8..d02822791b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -148,9 +148,7 @@ public void testSchedulerRecovery() throws Exception { int containerMemory = 1024; Resource containerResource = Resource.newInstance(containerMemory, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -162,7 +160,7 @@ public void testSchedulerRecovery() throws Exception { rm1.clearQueueMetrics(app1); // Re-start RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app @@ -296,9 +294,7 @@ public void testDynamicQueueRecovery() throws Exception { int containerMemory = 1024; Resource containerResource = Resource.newInstance(containerMemory, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(schedulerConf); - rm1 = new MockRM(schedulerConf, memStore); + rm1 = new MockRM(schedulerConf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -316,7 +312,7 @@ public void testDynamicQueueRecovery() throws Exception { rm1.clearQueueMetrics(app1); // 3. Fail over (restart) RM. - rm2 = new MockRM(schedulerConf, memStore); + rm2 = new MockRM(schedulerConf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // 4. Validate app is recovered post failover. @@ -570,9 +566,7 @@ private void setupQueueConfigurationChildOfB(CapacitySchedulerConfiguration conf public void testRMRestartWithRemovedQueue() throws Exception{ conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, ""); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -585,7 +579,7 @@ public void testRMRestartWithRemovedQueue() throws Exception{ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST}); final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST; csConf.setCapacity(noQueue, 100); - rm2 = new MockRM(csConf,memStore); + rm2 = new MockRM(csConf, rm1.getRMStateStore()); rm2.start(); UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2"); @@ -622,9 +616,7 @@ public void testCapacitySchedulerRecovery() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); setupQueueConfiguration(csConf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(csConf); - rm1 = new MockRM(csConf, memStore); + rm1 = new MockRM(csConf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -648,7 +640,7 @@ public void testCapacitySchedulerRecovery() throws Exception { csConf.set(PREFIX + "root.Default.QueueB.state", "STOPPED"); // Re-start RM - rm2 = new MockRM(csConf, memStore); + rm2 = new MockRM(csConf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm2.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -783,9 +775,7 @@ public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); setupQueueConfiguration(csConf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(csConf); - rm1 = new MockRM(csConf, memStore); + rm1 = new MockRM(csConf); rm1.start(); MockNM nm = new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); @@ -798,7 +788,7 @@ public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception { getYarnApplicationState(), YarnApplicationState.RUNNING); // Take a copy of state store so that it can be reset to this state. - RMState state = memStore.loadState(); + RMState state = rm1.getRMStateStore().loadState(); // Change scheduler config with child queues added to QueueB. csConf = new CapacitySchedulerConfiguration(conf); @@ -806,7 +796,8 @@ public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception { String diags = "Application killed on recovery as it was submitted to " + "queue QueueB which is no longer a leaf queue after restart."; - verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, memStore, state); + verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, + (MemoryRMStateStore) rm1.getRMStateStore(), state); } //Test behavior of an app if queue is removed during recovery. Test case does @@ -829,9 +820,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); setupQueueConfiguration(csConf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(csConf); - rm1 = new MockRM(csConf, memStore); + rm1 = new MockRM(csConf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -860,7 +849,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { rm1.clearQueueMetrics(app2); // Take a copy of state store so that it can be reset to this state. - RMState state = memStore.loadState(); + RMState state = rm1.getRMStateStore().loadState(); // Set new configuration with QueueB removed. csConf = new CapacitySchedulerConfiguration(conf); @@ -868,7 +857,8 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { String diags = "Application killed on recovery as it was submitted to " + "queue QueueB which no longer exists after restart."; - verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, memStore, state); + verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, + (MemoryRMStateStore) rm1.getRMStateStore(), state); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, @@ -883,10 +873,8 @@ private void checkParentQueue(ParentQueue parentQueue, int numContainers, // should not recover the containers that belong to the failed AM. @Test(timeout = 20000) public void testAMfailedBetweenRMRestart() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -894,7 +882,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -937,9 +925,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { // recover containers for completed apps. @Test(timeout = 20000) public void testContainersNotRecoveredForCompletedApps() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -948,7 +934,7 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); NMContainerStatus runningContainer = @@ -975,11 +961,9 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception { @Test (timeout = 600000) public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -993,7 +977,7 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { am0.registerAppAttempt(); // start new RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); @@ -1008,9 +992,7 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { @Test (timeout = 30000) public void testAMContainerStatusWithRMRestart() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -1025,7 +1007,7 @@ public void testAMContainerStatusWithRMRestart() throws Exception { attempt0.getMasterContainer().getId()).isAMContainer()); // Re-start RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -1044,9 +1026,7 @@ public void testAMContainerStatusWithRMRestart() throws Exception { @Test (timeout = 20000) public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { // start RM - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1056,7 +1036,7 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { RMApp app0 = rm1.submitApp(200); MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // scheduler app/attempt is immediately available after RM is re-started. @@ -1077,9 +1057,7 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { // container should not be recovered. @Test (timeout = 50000) public void testReleasedContainerNotRecovered() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); rm1.start(); @@ -1089,7 +1067,7 @@ public void testReleasedContainerNotRecovered() throws Exception { // Re-start RM conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000); - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -1175,9 +1153,7 @@ public void testNewContainersNotAllocatedDuringSchedulerRecovery() throws Exception { conf.setLong( YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -1186,7 +1162,7 @@ public void testNewContainersNotAllocatedDuringSchedulerRecovery() MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); // Restart RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.registerNode(); @@ -1229,11 +1205,8 @@ public void testNewContainersNotAllocatedDuringSchedulerRecovery() public void testRetriedFinishApplicationMasterRequest() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - // start RM - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1253,7 +1226,7 @@ public void testRetriedFinishApplicationMasterRequest() // start new RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); @@ -1266,9 +1239,7 @@ public void testAppFailedToRenewTokenOnRecovery() throws Exception { "kerberos"); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new TestSecurityMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -1276,7 +1247,7 @@ public void testAppFailedToRenewTokenOnRecovery() throws Exception { RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - MockRM rm2 = new TestSecurityMockRM(conf, memStore) { + MockRM rm2 = new TestSecurityMockRM(conf, rm1.getRMStateStore()) { protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer() { @Override @@ -1313,9 +1284,7 @@ public void addApplicationSync(ApplicationId applicationId, */ @Test (timeout = 30000) public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{ - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -1328,16 +1297,14 @@ public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100); - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); rm2.start(); } @Test(timeout = 20000) public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = @@ -1370,7 +1337,7 @@ public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Ex MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1); // rm failover - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -1439,11 +1406,9 @@ public void testAppStateSavedButAttemptStateNotSaved() throws Exception { @Test(timeout = 600000) public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); // start RM - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1471,7 +1436,7 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { Assert.assertFalse(conts.isEmpty()); // start new RM - rm2 = new MockRM(conf, memStore); + rm2 = new MockRM(conf, rm1.getRMStateStore()); rm2.start(); rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); @@ -1521,7 +1486,7 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { recoveredApp.getFinalApplicationStatus()); // Restart RM once more to check UAM is not re-run - MockRM rm3 = new MockRM(conf, memStore); + MockRM rm3 = new MockRM(conf, rm1.getRMStateStore()); rm3.start(); recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index f178884376..528afacea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -381,9 +380,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); @@ -405,7 +402,9 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); ApplicationStateData appState = - memStore.getState().getApplicationState().get(app1.getApplicationId()); + ((MemoryRMStateStore) rm1.getRMStateStore()).getState() + .getApplicationState().get(app1.getApplicationId()); + // AM should be restarted even though max-am-attempt is 1. MockAM am2 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); @@ -508,9 +507,7 @@ public void testMaxAttemptOneMeansOne() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); @@ -548,10 +545,9 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); @@ -630,10 +626,9 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); // explicitly set max-am-retry count as 2. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); CapacityScheduler scheduler = (CapacityScheduler) rm1.getResourceScheduler(); @@ -706,10 +701,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); // explicitly set max-am-retry count as 2. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index fdc47b9e56..f7e76bb077 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -164,9 +164,8 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -235,8 +234,6 @@ public void testUpdateApplicationTimeoutForStateStoreUpdateFail() throws Exception { MockRM rm1 = null; try { - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - MemoryRMStateStore memStore = new MemoryRMStateStore() { private int count = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index f1adb5eb04..60b9e4bc95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -382,10 +381,8 @@ public void testMaxAllocationAfterUpdateNodeResource() throws IOException { @Test(timeout = 10000) public void testReleasedContainerIfAppAttemptisNull() throws Exception { YarnConfiguration conf=getConf(); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + MockRM rm1 = new MockRM(conf); try { rm1.start(); MockNM nm1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index fd17bd91a3..cad0151cf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -42,8 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -405,16 +402,11 @@ public void testRMRestartWithChangeInPriority() throws Exception { YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - RMState rmState = memStore.getState(); - Map rmAppState = rmState - .getApplicationState(); - // PHASE 1: create state in an RM // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, @@ -611,10 +603,8 @@ public void testOrderOfActivatingThePriorityApplicationOnRMRestart() YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); MockNM nm1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 41a7ce8186..0642cd937b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -106,7 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -2965,9 +2964,7 @@ public void testSchedulerKeyGarbageCollection() throws Exception { new YarnConfiguration(new CapacitySchedulerConfiguration()); conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm = new MockRM(conf, memStore); + MockRM rm = new MockRM(conf); rm.start(); HashMap nodes = new HashMap<>(); @@ -3129,10 +3126,7 @@ public void testHierarchyQueuesCurrentLimits() throws Exception { new YarnConfiguration( setupQueueConfiguration(new CapacitySchedulerConfiguration())); conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); - - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); @@ -3212,9 +3206,7 @@ public void testParentQueueMaxCapsAreRespected() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); @@ -3259,9 +3251,7 @@ public void testQueueHierarchyPendingResourceUpdate() throws Exception { mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm = new MockRM(conf, memStore) { + MockRM rm = new MockRM(conf) { protected RMNodeLabelsManager createNodeLabelManager() { return mgr; } @@ -3668,9 +3658,8 @@ public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges() final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - MockRM rm = new MockRM(conf, memStore) { + + MockRM rm = new MockRM(conf) { protected RMNodeLabelsManager createNodeLabelManager() { return mgr; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java index 0386aabf9f..36ee68e80c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java @@ -133,21 +133,17 @@ public void testWorkPreservingRestartForNodeLabel() throws Exception { mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), NodeId.newInstance("h2", 0), toSet("y"))); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf); - + // inject node label manager MockRM rm1 = - new MockRM(conf, - memStore) { + new MockRM(conf) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; } }; - + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.getRMContext().setNodeLabelManager(mgr); rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 22332879f6..0d54c3322c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5112,12 +5112,10 @@ public void testRefreshQueuesWhenRMHA() throws Exception { out.println(""); out.close(); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(conf); - + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); // 3. start a active RM - MockRM rm2 = new MockRM(conf, memStore); - rm2.init(conf); + MockRM rm2 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm2.getRMStateStore(); rm2.start(); MockNM nm = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 0190db68dc..9fb9d422a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -1006,9 +1006,8 @@ public void testRMRestartWithExpiredToken() throws Exception { Credentials credentials = new Credentials(); credentials.addToken(userText1, originalToken); - MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(yarnConf); - MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore); + MockRM rm1 = new TestSecurityMockRM(yarnConf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); RMApp app = rm1.submitApp(200, "name", "user", new HashMap(), false, "default", 1, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 80310a503a..06c642afef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRMMemoryStateStore; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; @@ -61,10 +62,10 @@ public void setup() { rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); testConf = new YarnConfiguration(); + testConf + .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); UserGroupInformation.setLoginUser(null); UserGroupInformation.setConfiguration(testConf); - testConf.set(YarnConfiguration.RM_STORE, - MemoryRMStateStore.class.getName()); } // Test the DT mast key in the state-store when the mast key is being rolled. @@ -73,7 +74,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { Configuration conf = new Configuration(testConf); conf.set("hadoop.security.authentication", "kerberos"); UserGroupInformation.setConfiguration(conf); - MemoryRMStateStore memStore = new MemoryRMStateStore(); + MemoryRMStateStore memStore = new MockRMMemoryStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); @@ -127,7 +128,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { // Test all expired keys are removed from state-store. @Test(timeout = 15000) public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore(); + MemoryRMStateStore memStore = new MockRMMemoryStateStore(); memStore.init(testConf); RMState rmState = memStore.getState();