YARN-65. Reduce RM app memory footprint once app has completed. Contributed by Manikandan R.
This commit is contained in:
parent
d08b8c801a
commit
06e5a7b5cf
@ -1459,7 +1459,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
app.rmContext.getSystemMetricsPublisher()
|
app.rmContext.getSystemMetricsPublisher()
|
||||||
.appFinished(app, finalState, app.finishTime);
|
.appFinished(app, finalState, app.finishTime);
|
||||||
// set the memory free
|
// set the memory free
|
||||||
app.submissionContext.getAMContainerSpec().setTokensConf(null);
|
app.clearUnusedFields();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2021,4 +2021,13 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
public void setApplicationPriority(Priority applicationPriority) {
|
public void setApplicationPriority(Priority applicationPriority) {
|
||||||
this.applicationPriority = applicationPriority;
|
this.applicationPriority = applicationPriority;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear Unused fields to free memory.
|
||||||
|
* @param app
|
||||||
|
*/
|
||||||
|
private void clearUnusedFields() {
|
||||||
|
this.submissionContext.setAMContainerSpec(null);
|
||||||
|
this.submissionContext.setLogAggregationContext(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,130 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper for MemoryRMStateStore will make sure the event.
|
||||||
|
*/
|
||||||
|
public class MockMemoryRMStateStore extends MemoryRMStateStore {
|
||||||
|
|
||||||
|
private Map<ApplicationId, ApplicationSubmissionContext> appSubCtxtCopy =
|
||||||
|
new HashMap<ApplicationId, ApplicationSubmissionContext>();
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Override
|
||||||
|
protected EventHandler getRMStateStoreEventHandler() {
|
||||||
|
return rmStateStoreEventHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized RMState loadState() throws Exception {
|
||||||
|
|
||||||
|
RMState cloneState = super.loadState();
|
||||||
|
|
||||||
|
for(Entry<ApplicationId, ApplicationStateData> state :
|
||||||
|
cloneState.getApplicationState().entrySet()) {
|
||||||
|
ApplicationStateData oldStateData = state.getValue();
|
||||||
|
oldStateData.setApplicationSubmissionContext(
|
||||||
|
this.appSubCtxtCopy.get(state.getKey()));
|
||||||
|
cloneState.getApplicationState().put(state.getKey(), oldStateData);
|
||||||
|
}
|
||||||
|
return cloneState;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void storeApplicationStateInternal(
|
||||||
|
ApplicationId appId, ApplicationStateData appState)
|
||||||
|
throws Exception {
|
||||||
|
// Clone Application Submission Context
|
||||||
|
this.cloneAppSubmissionContext(appState);
|
||||||
|
super.storeApplicationStateInternal(appId, appState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void updateApplicationStateInternal(
|
||||||
|
ApplicationId appId, ApplicationStateData appState)
|
||||||
|
throws Exception {
|
||||||
|
// Clone Application Submission Context
|
||||||
|
this.cloneAppSubmissionContext(appState);
|
||||||
|
super.updateApplicationStateInternal(appId, appState);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clone Application Submission Context and Store in Map for
|
||||||
|
* later use.
|
||||||
|
*
|
||||||
|
* @param appState
|
||||||
|
*/
|
||||||
|
private void cloneAppSubmissionContext(ApplicationStateData appState) {
|
||||||
|
ApplicationSubmissionContext oldAppSubCtxt =
|
||||||
|
appState.getApplicationSubmissionContext();
|
||||||
|
ApplicationSubmissionContext context =
|
||||||
|
ApplicationSubmissionContext.newInstance(
|
||||||
|
oldAppSubCtxt.getApplicationId(),
|
||||||
|
oldAppSubCtxt.getApplicationName(),
|
||||||
|
oldAppSubCtxt.getQueue(),
|
||||||
|
oldAppSubCtxt.getPriority(),
|
||||||
|
oldAppSubCtxt.getAMContainerSpec(),
|
||||||
|
oldAppSubCtxt.getUnmanagedAM(),
|
||||||
|
oldAppSubCtxt.getCancelTokensWhenComplete(),
|
||||||
|
oldAppSubCtxt.getMaxAppAttempts(),
|
||||||
|
oldAppSubCtxt.getResource()
|
||||||
|
);
|
||||||
|
context.setAttemptFailuresValidityInterval(
|
||||||
|
oldAppSubCtxt.getAttemptFailuresValidityInterval());
|
||||||
|
context.setKeepContainersAcrossApplicationAttempts(
|
||||||
|
oldAppSubCtxt.getKeepContainersAcrossApplicationAttempts());
|
||||||
|
context.setAMContainerResourceRequests(
|
||||||
|
oldAppSubCtxt.getAMContainerResourceRequests());
|
||||||
|
context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext());
|
||||||
|
context.setApplicationType(oldAppSubCtxt.getApplicationType());
|
||||||
|
this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Traverse each app state and replace cloned app sub context
|
||||||
|
* into the state.
|
||||||
|
*
|
||||||
|
* @param actualState
|
||||||
|
* @return actualState
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public RMState reloadStateWithClonedAppSubCtxt(RMState actualState) {
|
||||||
|
for(Entry<ApplicationId, ApplicationStateData> state :
|
||||||
|
actualState.getApplicationState().entrySet()) {
|
||||||
|
ApplicationStateData oldStateData = state.getValue();
|
||||||
|
oldStateData.setApplicationSubmissionContext(
|
||||||
|
this.appSubCtxtCopy.get(state.getKey()));
|
||||||
|
actualState.getApplicationState().put(state.getKey(),
|
||||||
|
oldStateData);
|
||||||
|
}
|
||||||
|
return actualState;
|
||||||
|
}
|
||||||
|
}
|
@ -164,7 +164,7 @@ public class MockRM extends ResourceManager {
|
|||||||
} else {
|
} else {
|
||||||
Class storeClass = getRMContext().getStateStore().getClass();
|
Class storeClass = getRMContext().getStateStore().getClass();
|
||||||
if (storeClass.equals(MemoryRMStateStore.class)) {
|
if (storeClass.equals(MemoryRMStateStore.class)) {
|
||||||
MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore();
|
MockMemoryRMStateStore mockStateStore = new MockMemoryRMStateStore();
|
||||||
mockStateStore.init(conf);
|
mockStateStore.init(conf);
|
||||||
setRMStateStore(mockStateStore);
|
setRMStateStore(mockStateStore);
|
||||||
} else if (storeClass.equals(NullRMStateStore.class)) {
|
} else if (storeClass.equals(NullRMStateStore.class)) {
|
||||||
|
@ -1,32 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test helper for MemoryRMStateStore will make sure the event.
|
|
||||||
*/
|
|
||||||
public class MockRMMemoryStateStore extends MemoryRMStateStore {
|
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
@Override
|
|
||||||
protected EventHandler getRMStateStoreEventHandler() {
|
|
||||||
return rmStateStoreEventHandler;
|
|
||||||
}
|
|
||||||
}
|
|
@ -293,6 +293,8 @@ public class TestApplicationCleanup {
|
|||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -304,7 +306,7 @@ public class TestApplicationCleanup {
|
|||||||
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore());
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
// nm1 register to rm2, and do a heartbeat
|
// nm1 register to rm2, and do a heartbeat
|
||||||
|
@ -143,6 +143,8 @@ public class TestContainerResourceUsage {
|
|||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
MockRM rm0 = new MockRM(conf);
|
MockRM rm0 = new MockRM(conf);
|
||||||
rm0.start();
|
rm0.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm0.getRMStateStore();
|
||||||
MockNM nm =
|
MockNM nm =
|
||||||
new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService());
|
||||||
nm.registerNode();
|
nm.registerNode();
|
||||||
@ -229,7 +231,7 @@ public class TestContainerResourceUsage {
|
|||||||
vcoreSeconds, metricsBefore.getVcoreSeconds());
|
vcoreSeconds, metricsBefore.getVcoreSeconds());
|
||||||
|
|
||||||
// create new RM to represent RM restart. Load up the state store.
|
// create new RM to represent RM restart. Load up the state store.
|
||||||
MockRM rm1 = new MockRM(conf, rm0.getRMStateStore());
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
RMApp app0After =
|
RMApp app0After =
|
||||||
rm1.getRMContext().getRMApps().get(app0.getApplicationId());
|
rm1.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
|
@ -415,7 +415,7 @@ public class TestRMHA {
|
|||||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
Configuration conf = new YarnConfiguration(configuration);
|
Configuration conf = new YarnConfiguration(configuration);
|
||||||
|
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -465,7 +465,7 @@ public class TestRMHA {
|
|||||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
Configuration conf = new YarnConfiguration(configuration);
|
Configuration conf = new YarnConfiguration(configuration);
|
||||||
|
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationState(ApplicationStateData appState) {
|
public void updateApplicationState(ApplicationStateData appState) {
|
||||||
notifyStoreOperationFailed(new StoreFencedException());
|
notifyStoreOperationFailed(new StoreFencedException());
|
||||||
|
@ -199,7 +199,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
|
|
||||||
// PHASE 1: create RM and get state
|
// PHASE 1: create RM and get state
|
||||||
MockRM rm1 = createMockRM(conf);
|
MockRM rm1 = createMockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
memStore.getState().getApplicationState();
|
memStore.getState().getApplicationState();
|
||||||
|
|
||||||
@ -679,7 +680,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -734,7 +735,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
// create RM
|
// create RM
|
||||||
MockRM rm1 = createMockRM(conf);
|
MockRM rm1 = createMockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
memStore.getState().getApplicationState();
|
memStore.getState().getApplicationState();
|
||||||
// start RM
|
// start RM
|
||||||
@ -780,7 +782,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
// create RM
|
// create RM
|
||||||
MockRM rm1 = createMockRM(conf);
|
MockRM rm1 = createMockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
memStore.getState().getApplicationState();
|
memStore.getState().getApplicationState();
|
||||||
// start RM
|
// start RM
|
||||||
@ -824,18 +827,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationAttemptStateInternal(
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateData attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptState) throws Exception {
|
||||||
// ignore attempt saving request.
|
// ignore attempt saving request.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationAttemptStateInternal(
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateData attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptState) throws Exception {
|
||||||
// ignore attempt saving request.
|
// ignore attempt saving request.
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -868,7 +871,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
// PHASE 1: create RM and get state
|
// PHASE 1: create RM and get state
|
||||||
MockRM rm1 = createMockRM(conf);
|
MockRM rm1 = createMockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
memStore.getState().getApplicationState();
|
memStore.getState().getApplicationState();
|
||||||
|
|
||||||
@ -926,6 +930,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
|
||||||
// a succeeded app.
|
// a succeeded app.
|
||||||
RMApp app0 = rm1.submitApp(200, "name", "user", null,
|
RMApp app0 = rm1.submitApp(200, "name", "user", null,
|
||||||
false, "default", 1, null, "myType");
|
false, "default", 1, null, "myType");
|
||||||
@ -953,7 +960,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
.appCreated(any(RMApp.class), anyLong());
|
.appCreated(any(RMApp.class), anyLong());
|
||||||
// restart rm
|
// restart rm
|
||||||
|
|
||||||
MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
@Override
|
@Override
|
||||||
protected RMAppManager createRMAppManager() {
|
protected RMAppManager createRMAppManager() {
|
||||||
return spy(super.createRMAppManager());
|
return spy(super.createRMAppManager());
|
||||||
@ -1625,7 +1632,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = createMockRM(conf);
|
MockRM rm1 = createMockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
RMState rmState = memStore.getState();
|
RMState rmState = memStore.getState();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
@ -1664,7 +1672,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
// This is to test RM does not get hang on shutdown.
|
// This is to test RM does not get hang on shutdown.
|
||||||
@Test (timeout = 10000)
|
@Test (timeout = 10000)
|
||||||
public void testRMShutdown() throws Exception {
|
public void testRMShutdown() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void checkVersion()
|
public synchronized void checkVersion()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
@ -1743,7 +1751,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
RMApp app1 = null;
|
RMApp app1 = null;
|
||||||
try {
|
try {
|
||||||
app1 = rm1.submitApp(200, "name", "user",
|
app1 = rm1.submitApp(200, "name", "user",
|
||||||
@ -1767,7 +1776,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
|
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
|
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
for (int i = 10; i > 0; i--) {
|
for (int i = 10; i > 0; i--) {
|
||||||
@ -2405,6 +2414,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
@ -2441,7 +2452,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
MockRM rm2 = null;
|
MockRM rm2 = null;
|
||||||
// start RM2
|
// start RM2
|
||||||
try {
|
try {
|
||||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
Assert.assertTrue("RM start successfully", true);
|
Assert.assertTrue("RM start successfully", true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -2542,6 +2553,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
|
||||||
// add node label "x" and set node to label mapping
|
// add node label "x" and set node to label mapping
|
||||||
Set<String> clusterNodeLabels = new HashSet<String>();
|
Set<String> clusterNodeLabels = new HashSet<String>();
|
||||||
@ -2568,7 +2581,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
|
||||||
MockRM rm2 = new MockRM(
|
MockRM rm2 = new MockRM(
|
||||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf),
|
TestUtils.getConfigurationWithDefaultQueueLabels(conf),
|
||||||
rm1.getRMStateStore()) {
|
memStore) {
|
||||||
@Override
|
@Override
|
||||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
@ -572,6 +572,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
|
||||||
rm1 = new MockRM(conf);
|
rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -583,7 +585,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
|
||||||
final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
|
final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
|
||||||
csConf.setCapacity(noQueue, 100);
|
csConf.setCapacity(noQueue, 100);
|
||||||
rm2 = new MockRM(csConf, rm1.getRMStateStore());
|
rm2 = new MockRM(csConf, memStore);
|
||||||
|
|
||||||
rm2.start();
|
rm2.start();
|
||||||
UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
|
UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
|
||||||
@ -721,11 +723,15 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
|
|
||||||
private void verifyAppRecoveryWithWrongQueueConfig(
|
private void verifyAppRecoveryWithWrongQueueConfig(
|
||||||
CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics,
|
CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics,
|
||||||
MemoryRMStateStore memStore, RMState state) throws Exception {
|
MockMemoryRMStateStore memStore, RMState state) throws Exception {
|
||||||
// Restart RM with fail-fast as false. App should be killed.
|
// Restart RM with fail-fast as false. App should be killed.
|
||||||
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false);
|
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false);
|
||||||
rm2 = new MockRM(csConf, memStore);
|
rm2 = new MockRM(csConf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore2 =
|
||||||
|
(MockMemoryRMStateStore) rm2.getRMStateStore();
|
||||||
|
|
||||||
// Wait for app to be killed.
|
// Wait for app to be killed.
|
||||||
rm2.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
rm2.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
||||||
ApplicationReport report = rm2.getApplicationReport(app.getApplicationId());
|
ApplicationReport report = rm2.getApplicationReport(app.getApplicationId());
|
||||||
@ -734,24 +740,27 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED);
|
assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED);
|
||||||
assertEquals(report.getDiagnostics(), diagnostics);
|
assertEquals(report.getDiagnostics(), diagnostics);
|
||||||
|
|
||||||
|
//Reload previous state with cloned app sub context object
|
||||||
|
RMState newState = memStore2.reloadStateWithClonedAppSubCtxt(state);
|
||||||
|
|
||||||
// Remove updated app info(app being KILLED) from state store and reinstate
|
// Remove updated app info(app being KILLED) from state store and reinstate
|
||||||
// state store to previous state i.e. which indicates app is RUNNING.
|
// state store to previous state i.e. which indicates app is RUNNING.
|
||||||
// This is to simulate app recovery with fail fast config as true.
|
// This is to simulate app recovery with fail fast config as true.
|
||||||
for(Map.Entry<ApplicationId, ApplicationStateData> entry :
|
for(Map.Entry<ApplicationId, ApplicationStateData> entry :
|
||||||
state.getApplicationState().entrySet()) {
|
newState.getApplicationState().entrySet()) {
|
||||||
ApplicationStateData appState = mock(ApplicationStateData.class);
|
ApplicationStateData appState = mock(ApplicationStateData.class);
|
||||||
ApplicationSubmissionContext ctxt =
|
ApplicationSubmissionContext ctxt =
|
||||||
mock(ApplicationSubmissionContext.class);
|
mock(ApplicationSubmissionContext.class);
|
||||||
when(appState.getApplicationSubmissionContext()).thenReturn(ctxt);
|
when(appState.getApplicationSubmissionContext()).thenReturn(ctxt);
|
||||||
when(ctxt.getApplicationId()).thenReturn(entry.getKey());
|
when(ctxt.getApplicationId()).thenReturn(entry.getKey());
|
||||||
memStore.removeApplicationStateInternal(appState);
|
memStore2.removeApplicationStateInternal(appState);
|
||||||
memStore.storeApplicationStateInternal(
|
memStore2.storeApplicationStateInternal(
|
||||||
entry.getKey(), entry.getValue());
|
entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now restart RM with fail-fast as true. QueueException should be thrown.
|
// Now restart RM with fail-fast as true. QueueException should be thrown.
|
||||||
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
|
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
|
||||||
MockRM rm = new MockRM(csConf, memStore);
|
MockRM rm = new MockRM(csConf, memStore2);
|
||||||
try {
|
try {
|
||||||
rm.start();
|
rm.start();
|
||||||
Assert.fail("QueueException must have been thrown");
|
Assert.fail("QueueException must have been thrown");
|
||||||
@ -781,6 +790,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
setupQueueConfiguration(csConf);
|
setupQueueConfiguration(csConf);
|
||||||
rm1 = new MockRM(csConf);
|
rm1 = new MockRM(csConf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm =
|
MockNM nm =
|
||||||
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
||||||
nm.registerNode();
|
nm.registerNode();
|
||||||
@ -801,7 +813,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
String diags = "Application killed on recovery as it was submitted to " +
|
String diags = "Application killed on recovery as it was submitted to " +
|
||||||
"queue QueueB which is no longer a leaf queue after restart.";
|
"queue QueueB which is no longer a leaf queue after restart.";
|
||||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags,
|
verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags,
|
||||||
(MemoryRMStateStore) rm1.getRMStateStore(), state);
|
memStore, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
//Test behavior of an app if queue is removed during recovery. Test case does
|
//Test behavior of an app if queue is removed during recovery. Test case does
|
||||||
@ -826,6 +838,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
setupQueueConfiguration(csConf);
|
setupQueueConfiguration(csConf);
|
||||||
rm1 = new MockRM(csConf);
|
rm1 = new MockRM(csConf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
MockNM nm2 =
|
MockNM nm2 =
|
||||||
@ -853,7 +867,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
rm1.clearQueueMetrics(app2);
|
rm1.clearQueueMetrics(app2);
|
||||||
|
|
||||||
// Take a copy of state store so that it can be reset to this state.
|
// Take a copy of state store so that it can be reset to this state.
|
||||||
RMState state = rm1.getRMStateStore().loadState();
|
RMState state = memStore.loadState();
|
||||||
|
|
||||||
// Set new configuration with QueueB removed.
|
// Set new configuration with QueueB removed.
|
||||||
csConf = new CapacitySchedulerConfiguration(conf);
|
csConf = new CapacitySchedulerConfiguration(conf);
|
||||||
@ -862,7 +876,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
String diags = "Application killed on recovery as it was submitted to " +
|
String diags = "Application killed on recovery as it was submitted to " +
|
||||||
"queue QueueB which no longer exists after restart.";
|
"queue QueueB which no longer exists after restart.";
|
||||||
verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags,
|
verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags,
|
||||||
(MemoryRMStateStore) rm1.getRMStateStore(), state);
|
memStore, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||||
@ -931,6 +945,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
public void testContainersNotRecoveredForCompletedApps() throws Exception {
|
public void testContainersNotRecoveredForCompletedApps() throws Exception {
|
||||||
rm1 = new MockRM(conf);
|
rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -938,7 +954,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
||||||
|
|
||||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
NMContainerStatus runningContainer =
|
NMContainerStatus runningContainer =
|
||||||
@ -1212,6 +1228,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
// start RM
|
// start RM
|
||||||
rm1 = new MockRM(conf);
|
rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -1230,7 +1249,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
|
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||||
@ -1370,7 +1389,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
// RM should start correctly.
|
// RM should start correctly.
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
public void testAppStateSavedButAttemptStateNotSaved() throws Exception {
|
public void testAppStateSavedButAttemptStateNotSaved() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() {
|
||||||
@Override public synchronized void updateApplicationAttemptStateInternal(
|
@Override public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateData attemptState) {
|
ApplicationAttemptStateData attemptState) {
|
||||||
@ -1414,6 +1433,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
// start RM
|
// start RM
|
||||||
rm1 = new MockRM(conf);
|
rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -1438,8 +1459,10 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
MockMemoryRMStateStore memStore2 =
|
||||||
|
(MockMemoryRMStateStore) rm2.getRMStateStore();
|
||||||
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||||
|
|
||||||
@ -1488,7 +1511,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||||||
recoveredApp.getFinalApplicationStatus());
|
recoveredApp.getFinalApplicationStatus());
|
||||||
|
|
||||||
// Restart RM once more to check UAM is not re-run
|
// Restart RM once more to check UAM is not re-run
|
||||||
MockRM rm3 = new MockRM(conf, rm1.getRMStateStore());
|
MockRM rm3 = new MockRM(conf, memStore2);
|
||||||
rm3.start();
|
rm3.start();
|
||||||
recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
|
recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
|
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
|
||||||
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
@ -702,8 +703,11 @@ public class TestAMRestart {
|
|||||||
// explicitly set max-am-retry count as 2.
|
// explicitly set max-am-retry count as 2.
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -735,7 +739,6 @@ public class TestAMRestart {
|
|||||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
|
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
|
||||||
app1.setSystemClock(clock);
|
app1.setSystemClock(clock);
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
// Fail attempt1 normally
|
// Fail attempt1 normally
|
||||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
|
||||||
1, ContainerState.COMPLETE);
|
1, ContainerState.COMPLETE);
|
||||||
@ -771,8 +774,12 @@ public class TestAMRestart {
|
|||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
|
MockMemoryRMStateStore memStore1 =
|
||||||
|
(MockMemoryRMStateStore) rm2.getRMStateStore();
|
||||||
ApplicationStateData app1State =
|
ApplicationStateData app1State =
|
||||||
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
memStore1.getState().getApplicationState().
|
||||||
|
get(app1.getApplicationId());
|
||||||
Assert.assertEquals(1, app1State.getFirstAttemptId());
|
Assert.assertEquals(1, app1State.getFirstAttemptId());
|
||||||
|
|
||||||
// re-register the NM
|
// re-register the NM
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
@ -28,17 +29,23 @@ import static org.mockito.Mockito.spy;
|
|||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
@ -46,14 +53,22 @@ import org.apache.hadoop.security.SecurityUtil;
|
|||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.yarn.MockApps;
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
@ -251,7 +266,113 @@ public class TestRMAppTransitions {
|
|||||||
rmDispatcher.start();
|
rmDispatcher.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) {
|
private ByteBuffer getTokens() throws IOException {
|
||||||
|
Credentials ts = new Credentials();
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
ts.writeTokenStorageToStream(dob);
|
||||||
|
ByteBuffer securityTokens =
|
||||||
|
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
return securityTokens;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteBuffer getTokensConf() throws IOException {
|
||||||
|
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
Configuration appConf = new Configuration(false);
|
||||||
|
appConf.clear();
|
||||||
|
appConf.set("dfs.nameservices", "mycluster1,mycluster2");
|
||||||
|
appConf.set("dfs.namenode.rpc-address.mycluster2.nn1",
|
||||||
|
"123.0.0.1");
|
||||||
|
appConf.set("dfs.namenode.rpc-address.mycluster3.nn2",
|
||||||
|
"123.0.0.2");
|
||||||
|
appConf.write(dob);
|
||||||
|
ByteBuffer tokenConf =
|
||||||
|
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
return tokenConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, LocalResource> getLocalResources()
|
||||||
|
throws UnsupportedFileSystemException {
|
||||||
|
FileContext localFS = FileContext.getLocalFSFileContext();
|
||||||
|
File tmpDir = new File("target");
|
||||||
|
File scriptFile = new File(tmpDir, "scriptFile.sh");
|
||||||
|
URL resourceURL =
|
||||||
|
URL.fromPath(localFS
|
||||||
|
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||||
|
LocalResource localRes =
|
||||||
|
Records.newRecord(LocalResource.class);
|
||||||
|
localRes.setResource(resourceURL);
|
||||||
|
localRes.setSize(-1);
|
||||||
|
localRes.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
|
localRes.setType(LocalResourceType.FILE);
|
||||||
|
localRes.setTimestamp(scriptFile.lastModified());
|
||||||
|
String destinationFile = "dest_file";
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new HashMap<String, LocalResource>();
|
||||||
|
localResources.put(destinationFile, localRes);
|
||||||
|
return localResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> getEnvironment() {
|
||||||
|
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||||
|
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||||
|
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
|
||||||
|
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
|
||||||
|
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
|
||||||
|
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
|
||||||
|
userSetEnv.put(Environment.USER.key(), "user_set_" +
|
||||||
|
Environment.USER.key());
|
||||||
|
userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
|
||||||
|
userSetEnv.put(Environment.PWD.name(), "user_set_PWD");
|
||||||
|
userSetEnv.put(Environment.HOME.name(), "user_set_HOME");
|
||||||
|
return userSetEnv;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerRetryContext getContainerRetryContext() {
|
||||||
|
ContainerRetryContext containerRetryContext = ContainerRetryContext
|
||||||
|
.newInstance(
|
||||||
|
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
|
||||||
|
new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0);
|
||||||
|
return containerRetryContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, ByteBuffer> getServiceData() {
|
||||||
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
||||||
|
String serviceName = "non_exist_auxService";
|
||||||
|
serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes()));
|
||||||
|
return serviceData;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerLaunchContext prepareContainerLaunchContext()
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
ContainerLaunchContext clc =
|
||||||
|
Records.newRecord(ContainerLaunchContext.class);
|
||||||
|
clc.setCommands(Arrays.asList("/bin/sleep 5"));
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
clc.setTokens(getTokens());
|
||||||
|
clc.setTokensConf(getTokensConf());
|
||||||
|
}
|
||||||
|
clc.setLocalResources(getLocalResources());
|
||||||
|
clc.setEnvironment(getEnvironment());
|
||||||
|
clc.setContainerRetryContext(getContainerRetryContext());
|
||||||
|
clc.setServiceData(getServiceData());
|
||||||
|
return clc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private LogAggregationContext getLogAggregationContext() {
|
||||||
|
LogAggregationContext logAggregationContext =
|
||||||
|
LogAggregationContext.newInstance(
|
||||||
|
"includePattern", "excludePattern",
|
||||||
|
"rolledLogsIncludePattern",
|
||||||
|
"rolledLogsExcludePattern",
|
||||||
|
"policyClass",
|
||||||
|
"policyParameters");
|
||||||
|
return logAggregationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RMApp createNewTestApp(ApplicationSubmissionContext
|
||||||
|
submissionContext) throws IOException {
|
||||||
ApplicationId applicationId = MockApps.newAppID(appId++);
|
ApplicationId applicationId = MockApps.newAppID(appId++);
|
||||||
String user = MockApps.newUserName();
|
String user = MockApps.newUserName();
|
||||||
String name = MockApps.newAppName();
|
String name = MockApps.newAppName();
|
||||||
@ -270,7 +391,9 @@ public class TestRMAppTransitions {
|
|||||||
// but applicationId is still set for safety
|
// but applicationId is still set for safety
|
||||||
submissionContext.setApplicationId(applicationId);
|
submissionContext.setApplicationId(applicationId);
|
||||||
submissionContext.setPriority(Priority.newInstance(0));
|
submissionContext.setPriority(Priority.newInstance(0));
|
||||||
submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
|
submissionContext.setAMContainerSpec(prepareContainerLaunchContext());
|
||||||
|
submissionContext.setLogAggregationContext(getLogAggregationContext());
|
||||||
|
|
||||||
RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
|
RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
|
||||||
user, queue, submissionContext, scheduler, masterService,
|
user, queue, submissionContext, scheduler, masterService,
|
||||||
System.currentTimeMillis(), "YARN", null,
|
System.currentTimeMillis(), "YARN", null,
|
||||||
@ -405,6 +528,7 @@ public class TestRMAppTransitions {
|
|||||||
// verify sendATSCreateEvent() is get called during
|
// verify sendATSCreateEvent() is get called during
|
||||||
// AddApplicationToSchedulerTransition.
|
// AddApplicationToSchedulerTransition.
|
||||||
verify(publisher).appCreated(eq(application), anyLong());
|
verify(publisher).appCreated(eq(application), anyLong());
|
||||||
|
verifyRMAppFieldsForNonFinalTransitions(application);
|
||||||
return application;
|
return application;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -422,6 +546,7 @@ public class TestRMAppTransitions {
|
|||||||
application.handle(event);
|
application.handle(event);
|
||||||
assertStartTimeSet(application);
|
assertStartTimeSet(application);
|
||||||
assertAppState(RMAppState.SUBMITTED, application);
|
assertAppState(RMAppState.SUBMITTED, application);
|
||||||
|
verifyRMAppFieldsForNonFinalTransitions(application);
|
||||||
return application;
|
return application;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -530,6 +655,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application,
|
assertFailed(application,
|
||||||
".*Unmanaged application.*Failing the application.*");
|
".*Unmanaged application.*Failing the application.*");
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -539,6 +665,7 @@ public class TestRMAppTransitions {
|
|||||||
RMApp application = testCreateAppFinished(null, diagMsg);
|
RMApp application = testCreateAppFinished(null, diagMsg);
|
||||||
Assert.assertTrue("Finished application missing diagnostics",
|
Assert.assertTrue("Finished application missing diagnostics",
|
||||||
application.getDiagnostics().indexOf(diagMsg) != -1);
|
application.getDiagnostics().indexOf(diagMsg) != -1);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -546,15 +673,7 @@ public class TestRMAppTransitions {
|
|||||||
LOG.info("--- START: testAppRecoverPath ---");
|
LOG.info("--- START: testAppRecoverPath ---");
|
||||||
ApplicationSubmissionContext sub =
|
ApplicationSubmissionContext sub =
|
||||||
Records.newRecord(ApplicationSubmissionContext.class);
|
Records.newRecord(ApplicationSubmissionContext.class);
|
||||||
ContainerLaunchContext clc =
|
sub.setAMContainerSpec(prepareContainerLaunchContext());
|
||||||
Records.newRecord(ContainerLaunchContext.class);
|
|
||||||
Credentials credentials = new Credentials();
|
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
|
||||||
credentials.writeTokenStorageToStream(dob);
|
|
||||||
ByteBuffer securityTokens =
|
|
||||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
||||||
clc.setTokens(securityTokens);
|
|
||||||
sub.setAMContainerSpec(clc);
|
|
||||||
testCreateAppSubmittedRecovery(sub);
|
testCreateAppSubmittedRecovery(sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -577,6 +696,7 @@ public class TestRMAppTransitions {
|
|||||||
assertAppFinalStateNotSaved(application);
|
assertAppFinalStateNotSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.KILLED);
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -594,6 +714,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -611,6 +732,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
rmContext.getStateStore().removeApplication(application);
|
rmContext.getStateStore().removeApplication(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -633,6 +755,7 @@ public class TestRMAppTransitions {
|
|||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
verifyApplicationFinished(RMAppState.KILLED);
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -650,6 +773,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -684,6 +808,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -706,6 +831,7 @@ public class TestRMAppTransitions {
|
|||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.KILLED);
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -769,6 +895,7 @@ public class TestRMAppTransitions {
|
|||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.KILLED);
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -816,6 +943,7 @@ public class TestRMAppTransitions {
|
|||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
verifyApplicationFinished(RMAppState.KILLED);
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -873,6 +1001,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -914,6 +1043,7 @@ public class TestRMAppTransitions {
|
|||||||
assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
|
assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
|
||||||
Assert.assertTrue("Finished app missing diagnostics", application
|
Assert.assertTrue("Finished app missing diagnostics", application
|
||||||
.getDiagnostics().indexOf(diagMsg) != -1);
|
.getDiagnostics().indexOf(diagMsg) != -1);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -933,6 +1063,7 @@ public class TestRMAppTransitions {
|
|||||||
Assert.assertEquals("application diagnostics is not correct",
|
Assert.assertEquals("application diagnostics is not correct",
|
||||||
"", diag.toString());
|
"", diag.toString());
|
||||||
verifyApplicationFinished(RMAppState.FINISHED);
|
verifyApplicationFinished(RMAppState.FINISHED);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -962,6 +1093,7 @@ public class TestRMAppTransitions {
|
|||||||
|
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.FAILED, application);
|
assertAppState(RMAppState.FAILED, application);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -1016,6 +1148,7 @@ public class TestRMAppTransitions {
|
|||||||
|
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
@ -1061,11 +1194,12 @@ public class TestRMAppTransitions {
|
|||||||
RMAppState finalState = appState.getState();
|
RMAppState finalState = appState.getState();
|
||||||
Assert.assertEquals("Application is not in finalState.", finalState,
|
Assert.assertEquals("Application is not in finalState.", finalState,
|
||||||
application.getState());
|
application.getState());
|
||||||
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createRMStateForApplications(
|
public void createRMStateForApplications(
|
||||||
Map<ApplicationId, ApplicationStateData> applicationState,
|
Map<ApplicationId, ApplicationStateData> applicationState,
|
||||||
RMAppState rmAppState) {
|
RMAppState rmAppState) throws IOException {
|
||||||
RMApp app = createNewTestApp(null);
|
RMApp app = createNewTestApp(null);
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
|
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
|
||||||
@ -1075,7 +1209,7 @@ public class TestRMAppTransitions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAppReport() {
|
public void testGetAppReport() throws IOException {
|
||||||
RMApp app = createNewTestApp(null);
|
RMApp app = createNewTestApp(null);
|
||||||
assertAppState(RMAppState.NEW, app);
|
assertAppState(RMAppState.NEW, app);
|
||||||
ApplicationReport report = app.createAndGetApplicationReport(null, true);
|
ApplicationReport report = app.createAndGetApplicationReport(null, true);
|
||||||
@ -1109,4 +1243,41 @@ public class TestRMAppTransitions {
|
|||||||
Assert.assertEquals(finalState, appRemovedEvent.getFinalState());
|
Assert.assertEquals(finalState, appRemovedEvent.getFinalState());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyRMAppFieldsForNonFinalTransitions(RMApp application)
|
||||||
|
throws IOException {
|
||||||
|
assertEquals(Arrays.asList("/bin/sleep 5"),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getCommands());
|
||||||
|
assertEquals(getLocalResources(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getLocalResources());
|
||||||
|
if(UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
assertEquals(getTokens(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getTokens());
|
||||||
|
assertEquals(getTokensConf(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getTokensConf());
|
||||||
|
}
|
||||||
|
assertEquals(getEnvironment(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getEnvironment());
|
||||||
|
assertEquals(getContainerRetryContext(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getContainerRetryContext());
|
||||||
|
assertEquals(getServiceData(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getServiceData());
|
||||||
|
assertEquals(getLogAggregationContext(),
|
||||||
|
application.getApplicationSubmissionContext().
|
||||||
|
getLogAggregationContext());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyRMAppFieldsForFinalTransitions(RMApp application) {
|
||||||
|
assertEquals(null, application.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec());
|
||||||
|
assertEquals(null, application.getApplicationSubmissionContext().
|
||||||
|
getLogAggregationContext());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMMemoryStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||||
@ -78,7 +78,7 @@ public class TestRMDelegationTokens {
|
|||||||
UserGroupInformation.getLoginUser()
|
UserGroupInformation.getLoginUser()
|
||||||
.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
||||||
|
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
RMState rmState = memStore.getState();
|
RMState rmState = memStore.getState();
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ public class TestRMDelegationTokens {
|
|||||||
// Test all expired keys are removed from state-store.
|
// Test all expired keys are removed from state-store.
|
||||||
@Test(timeout = 15000)
|
@Test(timeout = 15000)
|
||||||
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
|
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore();
|
||||||
memStore.init(testConf);
|
memStore.init(testConf);
|
||||||
RMState rmState = memStore.getState();
|
RMState rmState = memStore.getState();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user