YARN-5548. Use MockRMMemoryStateStore to reduce test failures (Bibin A Chundatt via Varun Saxena)
This commit is contained in:
parent
27a1a5fde9
commit
f66fd11e51
@ -2414,10 +2414,7 @@ public class TestRMContainerAllocator {
|
||||
conf.setInt(
|
||||
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
|
||||
MyResourceManager rm1 = new MyResourceManager(conf);
|
||||
rm1.start();
|
||||
|
||||
// Submit the application
|
||||
@ -2504,7 +2501,7 @@ public class TestRMContainerAllocator {
|
||||
assertBlacklistAdditionsAndRemovals(0, 0, rm1);
|
||||
|
||||
// Phase-2 start 2nd RM is up
|
||||
MyResourceManager rm2 = new MyResourceManager(conf, memStore);
|
||||
MyResourceManager rm2 = new MyResourceManager(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
allocator.updateSchedulerProxy(rm2);
|
||||
|
@ -102,8 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.log4j.Level;
|
||||
@ -174,14 +172,6 @@ public class MockRM extends ResourceManager {
|
||||
disableDrainEventsImplicitly = false;
|
||||
}
|
||||
|
||||
public class MockRMMemoryStateStore extends MemoryRMStateStore {
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
protected EventHandler getRMStateStoreEventHandler() {
|
||||
return rmStateStoreEventHandler;
|
||||
}
|
||||
}
|
||||
|
||||
public class MockRMNullStateStore extends NullRMStateStore {
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
@ -1294,4 +1284,8 @@ public class MockRM extends ResourceManager {
|
||||
((AsyncDispatcher) getRmDispatcher()).disableExitOnDispatchException();
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStore getRMStateStore() {
|
||||
return getRMContext().getStateStore();
|
||||
}
|
||||
}
|
||||
|
@ -153,7 +153,6 @@ public class TestApplicationCleanup {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test
|
||||
public void testContainerCleanup() throws Exception {
|
||||
|
||||
@ -291,11 +290,8 @@ public class TestApplicationCleanup {
|
||||
@Test (timeout = 60000)
|
||||
public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -308,7 +304,7 @@ public class TestApplicationCleanup {
|
||||
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
// start new RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
// nm1 register to rm2, and do a heartbeat
|
||||
@ -327,11 +323,9 @@ public class TestApplicationCleanup {
|
||||
@Test(timeout = 60000)
|
||||
public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
|
||||
@ -357,7 +351,7 @@ public class TestApplicationCleanup {
|
||||
}
|
||||
|
||||
// start new RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
// nm1/nm2 register to rm2, and do a heartbeat
|
||||
@ -383,16 +377,12 @@ public class TestApplicationCleanup {
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test (timeout = 60000)
|
||||
public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
|
||||
Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -405,7 +395,7 @@ public class TestApplicationCleanup {
|
||||
rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
// start new RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
// nm1 register to rm2, and do a heartbeat
|
||||
@ -426,11 +416,9 @@ public class TestApplicationCleanup {
|
||||
@Test (timeout = 60000)
|
||||
public void testAppCleanupWhenNMReconnects() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -466,11 +454,9 @@ public class TestApplicationCleanup {
|
||||
@Test(timeout = 60000)
|
||||
public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
int nmMemory = 8192;
|
||||
int amMemory = 1024;
|
||||
|
@ -138,10 +138,8 @@ public class TestContainerResourceUsage {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm0 = new MockRM(conf, memStore);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MockRM rm0 = new MockRM(conf);
|
||||
rm0.start();
|
||||
MockNM nm =
|
||||
new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService());
|
||||
@ -229,7 +227,7 @@ public class TestContainerResourceUsage {
|
||||
vcoreSeconds, metricsBefore.getVcoreSeconds());
|
||||
|
||||
// create new RM to represent RM restart. Load up the state store.
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf, rm0.getRMStateStore());
|
||||
rm1.start();
|
||||
RMApp app0After =
|
||||
rm1.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||
|
@ -413,11 +413,9 @@ public class TestNodeBlacklistingOnAMFailures {
|
||||
}
|
||||
|
||||
private MockRM startRM(YarnConfiguration conf) {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm = new MockRM(conf, memStore);
|
||||
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
return rm;
|
||||
}
|
||||
|
@ -415,7 +415,7 @@ public class TestRMHA {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
@ -465,7 +465,7 @@ public class TestRMHA {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
||||
@Override
|
||||
public void updateApplicationState(ApplicationStateData appState) {
|
||||
notifyStoreOperationFailed(new StoreFencedException());
|
||||
@ -530,12 +530,10 @@ public class TestRMHA {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
// 1. start RM
|
||||
rm = new MockRM(conf, memStore);
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
@ -565,7 +563,7 @@ public class TestRMHA {
|
||||
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
||||
|
||||
// 3. Create new RM
|
||||
rm = new MockRM(conf, memStore) {
|
||||
rm = new MockRM(conf, rm.getRMStateStore()) {
|
||||
@Override
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
return new ResourceTrackerService(this.rmContext,
|
||||
|
@ -178,24 +178,23 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
return rm;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private MockRM createMockRM(YarnConfiguration config) {
|
||||
MockRM rm = new MockRM(config);
|
||||
rms.add(rm);
|
||||
return rm;
|
||||
}
|
||||
|
||||
@Test (timeout=180000)
|
||||
public void testRMRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
// PHASE 1: create RM and get state
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
|
||||
// PHASE 1: create state in an RM
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
|
||||
memStore.getState().getApplicationState();
|
||||
|
||||
// start like normal because state is empty
|
||||
rm1.start();
|
||||
|
||||
@ -451,14 +450,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
public void testRMRestartAppRunningAMFailed() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
// Create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
memStore.getState().getApplicationState();
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -508,14 +505,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
// be started immediately.
|
||||
YarnConfiguration conf = new YarnConfiguration(this.conf);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
final MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
AbstractYarnScheduler ys =
|
||||
(AbstractYarnScheduler)rm1.getResourceScheduler();
|
||||
@ -674,7 +670,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
@Test (timeout = 60000)
|
||||
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
@ -727,14 +723,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
@Test (timeout = 60000)
|
||||
public void testRMRestartFailedApp() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -775,14 +769,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
public void testRMRestartKilledApp() throws Exception{
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -823,7 +815,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
||||
@Override
|
||||
public synchronized void storeApplicationAttemptStateInternal(
|
||||
ApplicationAttemptId attemptId,
|
||||
@ -865,14 +857,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
public void testRMRestartSucceededApp() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
// PHASE 1: create RM and get state
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
memStore.getState().getApplicationState();
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
// start like normal because state is empty
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -913,11 +904,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
@Test (timeout = 60000)
|
||||
public void testRMRestartGetApplicationList() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore) {
|
||||
MockRM rm1 = new MockRM(conf) {
|
||||
@Override
|
||||
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
||||
return spy(super.createSystemMetricsPublisher());
|
||||
@ -956,7 +944,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
.appCreated(any(RMApp.class), anyLong());
|
||||
// restart rm
|
||||
|
||||
MockRM rm2 = new MockRM(conf, memStore) {
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
||||
@Override
|
||||
protected RMAppManager createRMAppManager() {
|
||||
return spy(super.createRMAppManager());
|
||||
@ -1077,13 +1065,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -1146,16 +1133,15 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
public void testRMRestartTimelineCollectorContext() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
MockRM rm1 = null;
|
||||
MockRM rm2 = null;
|
||||
try {
|
||||
rm1 = createMockRM(conf, memStore);
|
||||
rm1 = createMockRM(conf);
|
||||
rm1.start();
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
memStore.getState().getApplicationState();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
@ -1212,13 +1198,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
// create RM
|
||||
MockRM rm1 = new TestSecurityMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
rm1.start();
|
||||
|
||||
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
|
||||
@ -1307,13 +1292,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
// create RM
|
||||
MockRM rm1 = new TestSecurityMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService());
|
||||
@ -1388,8 +1372,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
"kerberos");
|
||||
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(conf);
|
||||
rm1.start();
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
@ -1399,10 +1385,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
Set<DelegationKey> rmDTMasterKeyState =
|
||||
rmState.getRMDTSecretManagerState().getMasterKeyState();
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
|
||||
rm1.start();
|
||||
|
||||
// create an empty credential
|
||||
Credentials ts = new Credentials();
|
||||
|
||||
@ -1537,10 +1519,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
"kerberos");
|
||||
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
MockRM rm1 = new TestSecurityMockRM(conf);
|
||||
rm1.start();
|
||||
|
||||
GetDelegationTokenRequest request1 =
|
||||
@ -1553,7 +1533,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
|
||||
|
||||
// start new RM
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
// submit an app with the old delegation token got from previous RM.
|
||||
@ -1631,14 +1611,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
rm1.start();
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
RMState rmState = memStore.getState();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
@ -1676,7 +1655,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
// This is to test RM does not get hang on shutdown.
|
||||
@Test (timeout = 10000)
|
||||
public void testRMShutdown() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
||||
@Override
|
||||
public synchronized void checkVersion()
|
||||
throws Exception {
|
||||
@ -1742,10 +1721,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
|
||||
MockRM rm1 = new TestSecurityMockRM(conf) {
|
||||
class TestDelegationTokenRenewer extends DelegationTokenRenewer {
|
||||
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
|
||||
boolean shouldCancelAtEnd, String user, Configuration appConf) {
|
||||
@ -1758,6 +1734,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
}
|
||||
};
|
||||
rm1.start();
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
RMApp app1 = null;
|
||||
try {
|
||||
app1 = rm1.submitApp(200, "name", "user",
|
||||
@ -1781,7 +1758,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test (timeout = 20000)
|
||||
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
for (int i = 10; i > 0; i--) {
|
||||
@ -1836,12 +1813,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
public void testQueueMetricsOnRMRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// PHASE 1: create state in an RM
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -1879,7 +1852,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
// PHASE 2: create new RM and start from old state
|
||||
// create new RM to represent restart and recover state
|
||||
MockRM rm2 = createMockRM(conf, memStore);
|
||||
MockRM rm2 = createMockRM(conf, rm1.getRMStateStore());
|
||||
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
|
||||
resetQueueMetrics(qm2);
|
||||
assertQueueMetrics(qm2, 0, 0, 0, 0);
|
||||
@ -1960,7 +1933,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
hostFile.getAbsolutePath());
|
||||
writeToHostsFile("");
|
||||
@ -2039,11 +2011,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
rm1.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -2051,7 +2021,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||
|
||||
MockRM rm2 = new MockRM(conf, memStore) {
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
||||
@Override
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
return new ResourceTrackerService(this.rmContext,
|
||||
@ -2158,6 +2128,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
super(conf, store);
|
||||
}
|
||||
|
||||
public TestSecurityMockRM(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
// reset localServiceAddress.
|
||||
@ -2208,10 +2182,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||
nodeLabelFsStoreDirURI);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
MockRM rm1 = new MockRM(conf, memStore) {
|
||||
MockRM rm1 = new MockRM(conf) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
@ -2261,7 +2233,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
||||
Assert.assertTrue(nodeLabels.get(n1).equals(toSet("y")));
|
||||
|
||||
MockRM rm2 = new MockRM(conf, memStore) {
|
||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
@ -2290,14 +2262,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
int maxAttempt =
|
||||
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
memStore.getState().getApplicationState();
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -2365,10 +2335,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||
nodeLabelFsStoreDirURI);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
MockRM rm1 = new MockRM(conf, memStore) {
|
||||
MockRM rm1 = new MockRM(conf) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
@ -2396,7 +2364,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x")));
|
||||
MockRM rm2 = null;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
rm2 = new MockRM(conf, memStore) {
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
@ -2419,15 +2387,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testRMRestartAfterPreemption() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
|
||||
return;
|
||||
}
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
|
||||
@ -2466,7 +2431,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
MockRM rm2 = null;
|
||||
// start RM2
|
||||
try {
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
Assert.assertTrue("RM start successfully", true);
|
||||
} catch (Exception e) {
|
||||
@ -2480,11 +2445,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
@Test(timeout = 60000)
|
||||
public void testRMRestartOnMissingAttempts() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// create RM
|
||||
MockRM rm1 = createMockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -2540,13 +2504,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRMRestartAfterNodeLabelDisabled() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
|
||||
MockRM rm1 = new MockRM(
|
||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
@ -2580,7 +2541,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||
// restart rm with node label disabled
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
|
||||
MockRM rm2 = new MockRM(
|
||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf),
|
||||
rm1.getRMStateStore()) {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||
|
@ -148,9 +148,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
int containerMemory = 1024;
|
||||
Resource containerResource = Resource.newInstance(containerMemory, 1);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -162,7 +160,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
rm1.clearQueueMetrics(app1);
|
||||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// recover app
|
||||
@ -296,9 +294,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
int containerMemory = 1024;
|
||||
Resource containerResource = Resource.newInstance(containerMemory, 1);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(schedulerConf);
|
||||
rm1 = new MockRM(schedulerConf, memStore);
|
||||
rm1 = new MockRM(schedulerConf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -316,7 +312,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
rm1.clearQueueMetrics(app1);
|
||||
|
||||
// 3. Fail over (restart) RM.
|
||||
rm2 = new MockRM(schedulerConf, memStore);
|
||||
rm2 = new MockRM(schedulerConf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// 4. Validate app is recovered post failover.
|
||||
@ -570,9 +566,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
public void testRMRestartWithRemovedQueue() throws Exception{
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -585,7 +579,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
|
||||
final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
|
||||
csConf.setCapacity(noQueue, 100);
|
||||
rm2 = new MockRM(csConf,memStore);
|
||||
rm2 = new MockRM(csConf, rm1.getRMStateStore());
|
||||
|
||||
rm2.start();
|
||||
UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
|
||||
@ -622,9 +616,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(csConf);
|
||||
rm1 = new MockRM(csConf, memStore);
|
||||
rm1 = new MockRM(csConf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -648,7 +640,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
csConf.set(PREFIX + "root.Default.QueueB.state", "STOPPED");
|
||||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(csConf, memStore);
|
||||
rm2 = new MockRM(csConf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
@ -783,9 +775,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(csConf);
|
||||
rm1 = new MockRM(csConf, memStore);
|
||||
rm1 = new MockRM(csConf);
|
||||
rm1.start();
|
||||
MockNM nm =
|
||||
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
||||
@ -798,7 +788,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
getYarnApplicationState(), YarnApplicationState.RUNNING);
|
||||
|
||||
// Take a copy of state store so that it can be reset to this state.
|
||||
RMState state = memStore.loadState();
|
||||
RMState state = rm1.getRMStateStore().loadState();
|
||||
|
||||
// Change scheduler config with child queues added to QueueB.
|
||||
csConf = new CapacitySchedulerConfiguration(conf);
|
||||
@ -806,7 +796,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
|
||||
String diags = "Application killed on recovery as it was submitted to " +
|
||||
"queue QueueB which is no longer a leaf queue after restart.";
|
||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, memStore, state);
|
||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags,
|
||||
(MemoryRMStateStore) rm1.getRMStateStore(), state);
|
||||
}
|
||||
|
||||
//Test behavior of an app if queue is removed during recovery. Test case does
|
||||
@ -829,9 +820,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(csConf);
|
||||
rm1 = new MockRM(csConf, memStore);
|
||||
rm1 = new MockRM(csConf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -860,7 +849,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
rm1.clearQueueMetrics(app2);
|
||||
|
||||
// Take a copy of state store so that it can be reset to this state.
|
||||
RMState state = memStore.loadState();
|
||||
RMState state = rm1.getRMStateStore().loadState();
|
||||
|
||||
// Set new configuration with QueueB removed.
|
||||
csConf = new CapacitySchedulerConfiguration(conf);
|
||||
@ -868,7 +857,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
|
||||
String diags = "Application killed on recovery as it was submitted to " +
|
||||
"queue QueueB which no longer exists after restart.";
|
||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, memStore, state);
|
||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags,
|
||||
(MemoryRMStateStore) rm1.getRMStateStore(), state);
|
||||
}
|
||||
|
||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||
@ -883,10 +873,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
// should not recover the containers that belong to the failed AM.
|
||||
@Test(timeout = 20000)
|
||||
public void testAMfailedBetweenRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -894,7 +882,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
@ -937,9 +925,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
// recover containers for completed apps.
|
||||
@Test(timeout = 20000)
|
||||
public void testContainersNotRecoveredForCompletedApps() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -948,7 +934,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
NMContainerStatus runningContainer =
|
||||
@ -975,11 +961,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
@Test (timeout = 600000)
|
||||
public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -993,7 +977,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
am0.registerAppAttempt();
|
||||
|
||||
// start new RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
@ -1008,9 +992,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAMContainerStatusWithRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -1025,7 +1007,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
attempt0.getMasterContainer().getId()).isAMContainer());
|
||||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
@ -1044,9 +1026,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
@Test (timeout = 20000)
|
||||
public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
|
||||
// start RM
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -1056,7 +1036,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// scheduler app/attempt is immediately available after RM is re-started.
|
||||
@ -1077,9 +1057,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
// container should not be recovered.
|
||||
@Test (timeout = 50000)
|
||||
public void testReleasedContainerNotRecovered() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
rm1.start();
|
||||
@ -1089,7 +1067,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
|
||||
// Re-start RM
|
||||
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
@ -1175,9 +1153,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
throws Exception {
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -1186,7 +1162,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Restart RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
@ -1229,11 +1205,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
public void testRetriedFinishApplicationMasterRequest()
|
||||
throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -1253,7 +1226,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
|
||||
|
||||
// start new RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
@ -1266,9 +1239,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
"kerberos");
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
MockRM rm1 = new TestSecurityMockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -1276,7 +1247,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, memStore) {
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, rm1.getRMStateStore()) {
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
@ -1313,9 +1284,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -1328,16 +1297,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50);
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
rm2.start();
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
|
||||
MockNM nm1 =
|
||||
@ -1370,7 +1337,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||
|
||||
// rm failover
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
@ -1439,11 +1406,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
@Test(timeout = 600000)
|
||||
public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
@ -1471,7 +1436,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
Assert.assertFalse(conts.isEmpty());
|
||||
|
||||
// start new RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
@ -1521,7 +1486,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||
recoveredApp.getFinalApplicationStatus());
|
||||
|
||||
// Restart RM once more to check UAM is not re-run
|
||||
MockRM rm3 = new MockRM(conf, memStore);
|
||||
MockRM rm3 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm3.start();
|
||||
recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
|
||||
|
@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
@ -381,9 +380,7 @@ public class TestAMRestart {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
@ -405,7 +402,9 @@ public class TestAMRestart {
|
||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
ApplicationStateData appState =
|
||||
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||
((MemoryRMStateStore) rm1.getRMStateStore()).getState()
|
||||
.getApplicationState().get(app1.getApplicationId());
|
||||
|
||||
// AM should be restarted even though max-am-attempt is 1.
|
||||
MockAM am2 =
|
||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||
@ -508,9 +507,7 @@ public class TestAMRestart {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
@ -548,10 +545,9 @@ public class TestAMRestart {
|
||||
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
@ -630,10 +626,9 @@ public class TestAMRestart {
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
// explicitly set max-am-retry count as 2.
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
CapacityScheduler scheduler =
|
||||
(CapacityScheduler) rm1.getResourceScheduler();
|
||||
@ -706,10 +701,8 @@ public class TestAMRestart {
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
// explicitly set max-am-retry count as 2.
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
|
@ -164,9 +164,8 @@ public class TestApplicationLifetimeMonitor {
|
||||
true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
@ -235,8 +234,6 @@ public class TestApplicationLifetimeMonitor {
|
||||
throws Exception {
|
||||
MockRM rm1 = null;
|
||||
try {
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
private int count = 0;
|
||||
|
||||
|
@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
@ -382,10 +381,8 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||
@Test(timeout = 10000)
|
||||
public void testReleasedContainerIfAppAttemptisNull() throws Exception {
|
||||
YarnConfiguration conf=getConf();
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
try {
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
|
@ -26,7 +26,6 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -42,8 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
@ -405,16 +402,11 @@ public class TestApplicationPriority {
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState = rmState
|
||||
.getApplicationState();
|
||||
|
||||
// PHASE 1: create state in an RM
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
|
||||
MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
|
||||
@ -611,10 +603,8 @@ public class TestApplicationPriority {
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
|
||||
MockNM nm1 =
|
||||
|
@ -106,7 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||
@ -2965,9 +2964,7 @@ public class TestCapacityScheduler {
|
||||
new YarnConfiguration(new CapacitySchedulerConfiguration());
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm = new MockRM(conf, memStore);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
HashMap<NodeId, MockNM> nodes = new HashMap<>();
|
||||
@ -3129,10 +3126,7 @@ public class TestCapacityScheduler {
|
||||
new YarnConfiguration(
|
||||
setupQueueConfiguration(new CapacitySchedulerConfiguration()));
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService());
|
||||
@ -3212,9 +3206,7 @@ public class TestCapacityScheduler {
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
|
||||
@ -3259,9 +3251,7 @@ public class TestCapacityScheduler {
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm = new MockRM(conf, memStore) {
|
||||
MockRM rm = new MockRM(conf) {
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
@ -3668,9 +3658,8 @@ public class TestCapacityScheduler {
|
||||
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||
mgr.init(conf);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm = new MockRM(conf, memStore) {
|
||||
|
||||
MockRM rm = new MockRM(conf) {
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
|
@ -133,21 +133,17 @@ public class TestWorkPreservingRMRestartForNodeLabel {
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
|
||||
NodeId.newInstance("h2", 0), toSet("y")));
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf);
|
||||
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm1 =
|
||||
new MockRM(conf,
|
||||
memStore) {
|
||||
new MockRM(conf) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
|
||||
|
@ -5112,12 +5112,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
// 3. start a active RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
rm2.init(conf);
|
||||
MockRM rm2 = new MockRM(conf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm2.getRMStateStore();
|
||||
rm2.start();
|
||||
|
||||
MockNM nm =
|
||||
|
@ -1006,9 +1006,8 @@ public class TestDelegationTokenRenewer {
|
||||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, originalToken);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(yarnConf);
|
||||
MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore);
|
||||
MockRM rm1 = new TestSecurityMockRM(yarnConf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
RMApp app = rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
||||
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMMemoryStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
@ -61,10 +62,10 @@ public class TestRMDelegationTokens {
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
ExitUtil.disableSystemExit();
|
||||
testConf = new YarnConfiguration();
|
||||
testConf
|
||||
.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
UserGroupInformation.setLoginUser(null);
|
||||
UserGroupInformation.setConfiguration(testConf);
|
||||
testConf.set(YarnConfiguration.RM_STORE,
|
||||
MemoryRMStateStore.class.getName());
|
||||
}
|
||||
|
||||
// Test the DT mast key in the state-store when the mast key is being rolled.
|
||||
@ -73,7 +74,7 @@ public class TestRMDelegationTokens {
|
||||
Configuration conf = new Configuration(testConf);
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
@ -127,7 +128,7 @@ public class TestRMDelegationTokens {
|
||||
// Test all expired keys are removed from state-store.
|
||||
@Test(timeout = 15000)
|
||||
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
||||
memStore.init(testConf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user