YARN-2010. Handle app-recovery failures gracefully. (Jian He and Karthik Kambatla via kasha)
This commit is contained in:
parent
d78191a716
commit
b2cd269802
@ -856,6 +856,9 @@ Release 2.6.0 - UNRELEASED
|
||||
of races between the launch and the stop-container call and when root
|
||||
processes crash. (Billie Rinaldi via vinodkv)
|
||||
|
||||
YARN-2010. Handle app-recovery failures gracefully.
|
||||
(Jian He and Karthik Kambatla via kasha)
|
||||
|
||||
Release 2.5.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
@ -274,12 +275,11 @@ protected void submitApplication(
|
||||
ApplicationId appId = submissionContext.getApplicationId();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Credentials credentials = null;
|
||||
try {
|
||||
credentials = parseCredentials(submissionContext);
|
||||
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
|
||||
credentials, submissionContext.getCancelTokensWhenComplete(),
|
||||
application.getUser());
|
||||
parseCredentials(submissionContext),
|
||||
submissionContext.getCancelTokensWhenComplete(),
|
||||
application.getUser());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to parse credentials.", e);
|
||||
// Sending APP_REJECTED is fine, since we assume that the
|
||||
@ -299,10 +299,8 @@ protected void submitApplication(
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void
|
||||
recoverApplication(ApplicationState appState, RMState rmState)
|
||||
throws Exception {
|
||||
protected void recoverApplication(ApplicationState appState, RMState rmState)
|
||||
throws Exception {
|
||||
ApplicationSubmissionContext appContext =
|
||||
appState.getApplicationSubmissionContext();
|
||||
ApplicationId appId = appState.getAppId();
|
||||
@ -311,33 +309,7 @@ protected void submitApplication(
|
||||
RMAppImpl application =
|
||||
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
||||
appState.getUser());
|
||||
application.recover(rmState);
|
||||
if (isApplicationInFinalState(appState.getState())) {
|
||||
// We are synchronously moving the application into final state so that
|
||||
// momentarily client will not see this application in NEW state. Also
|
||||
// for finished applications we will avoid renewing tokens.
|
||||
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||
return;
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Credentials credentials = null;
|
||||
try {
|
||||
credentials = parseCredentials(appContext);
|
||||
// synchronously renew delegation token on recovery.
|
||||
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
|
||||
credentials, appContext.getCancelTokensWhenComplete(),
|
||||
application.getUser());
|
||||
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to parse and renew delegation tokens.", e);
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppRejectedEvent(appId, e.getMessage()));
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||
}
|
||||
application.handle(new RMAppRecoverEvent(appId, rmState));
|
||||
}
|
||||
|
||||
private RMAppImpl createAndPopulateNewRMApp(
|
||||
@ -416,18 +388,9 @@ private ResourceRequest validateAndCreateResourceRequest(
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean isApplicationInFinalState(RMAppState rmAppState) {
|
||||
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|
||||
|| rmAppState == RMAppState.KILLED) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected Credentials parseCredentials(ApplicationSubmissionContext application)
|
||||
throws IOException {
|
||||
protected Credentials parseCredentials(
|
||||
ApplicationSubmissionContext application) throws IOException {
|
||||
Credentials credentials = new Credentials();
|
||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||
ByteBuffer tokens = application.getAMContainerSpec().getTokens();
|
||||
|
@ -18,8 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
@ -36,6 +38,8 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
@ -825,6 +829,15 @@ private static final class RMAppRecoveredTransition implements
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
|
||||
RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
|
||||
try {
|
||||
app.recover(recoverEvent.getRMState());
|
||||
} catch (Exception e) {
|
||||
String msg = app.applicationId + " failed to recover. " + e.getMessage();
|
||||
failToRecoverApp(app, event, msg, e);
|
||||
return RMAppState.FINAL_SAVING;
|
||||
}
|
||||
|
||||
// The app has completed.
|
||||
if (app.recoveredFinalState != null) {
|
||||
app.recoverAppAttempts();
|
||||
@ -832,6 +845,20 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
return app.recoveredFinalState;
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
// synchronously renew delegation token on recovery.
|
||||
try {
|
||||
app.rmContext.getDelegationTokenRenewer().addApplicationSync(
|
||||
app.getApplicationId(), app.parseCredentials(),
|
||||
app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
|
||||
} catch (Exception e) {
|
||||
String msg = "Failed to renew delegation token on recovery for "
|
||||
+ app.applicationId + e.getMessage();
|
||||
failToRecoverApp(app, event, msg, e);
|
||||
return RMAppState.FINAL_SAVING;
|
||||
}
|
||||
}
|
||||
|
||||
// No existent attempts means the attempt associated with this app was not
|
||||
// started or started but not yet saved.
|
||||
if (app.attempts.isEmpty()) {
|
||||
@ -865,6 +892,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
// Thus we return ACCECPTED state on recovery.
|
||||
return RMAppState.ACCEPTED;
|
||||
}
|
||||
|
||||
private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg,
|
||||
Exception e) {
|
||||
app.diagnostics.append(msg);
|
||||
LOG.error(msg, e);
|
||||
app.rememberTargetTransitionsAndStoreState(event, new FinalTransition(
|
||||
RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AddApplicationToSchedulerTransition extends
|
||||
@ -1296,4 +1331,16 @@ public void setSystemClock(Clock clock) {
|
||||
public ReservationId getReservationId() {
|
||||
return submissionContext.getReservationID();
|
||||
}
|
||||
|
||||
protected Credentials parseCredentials() throws IOException {
|
||||
Credentials credentials = new Credentials();
|
||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||
ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens();
|
||||
if (tokens != null) {
|
||||
dibb.reset(tokens);
|
||||
credentials.readTokenStorageStream(dibb);
|
||||
tokens.rewind();
|
||||
}
|
||||
return credentials;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
|
||||
public class RMAppRecoverEvent extends RMAppEvent {
|
||||
|
||||
private final RMState state;
|
||||
|
||||
public RMAppRecoverEvent(ApplicationId appId, RMState state) {
|
||||
super(appId, RMAppEventType.RECOVER);
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public RMState getRMState() {
|
||||
return state;
|
||||
}
|
||||
}
|
@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey(
|
||||
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
|
||||
clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
|
||||
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
|
||||
if (clientTokenMasterKeyBytes != null) {
|
||||
clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
|
||||
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
|
||||
}
|
||||
}
|
||||
|
||||
this.amrmToken =
|
||||
|
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
@Private
|
||||
public class QueueNotFoundException extends YarnRuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 187239430L;
|
||||
|
||||
public QueueNotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -80,6 +80,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
|
||||
@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId,
|
||||
//During a restart, this indicates a queue was removed, which is
|
||||
//not presently supported
|
||||
if (isAppRecovering) {
|
||||
//throwing RuntimeException because some other exceptions are caught
|
||||
//(including YarnRuntimeException) and we want this to force an exit
|
||||
String queueErrorMsg = "Queue named " + queueName
|
||||
String queueErrorMsg = "Queue named " + queueName
|
||||
+ " missing during application recovery."
|
||||
+ " Queue removal during recovery is not presently supported by the"
|
||||
+ " capacity scheduler, please restart with all queues configured"
|
||||
+ " which were present before shutdown/restart.";
|
||||
LOG.fatal(queueErrorMsg);
|
||||
throw new RuntimeException(queueErrorMsg);
|
||||
throw new QueueNotFoundException(queueErrorMsg);
|
||||
}
|
||||
String message = "Application " + applicationId +
|
||||
" submitted by user " + user + " to unknown queue: " + queueName;
|
||||
|
@ -37,6 +37,7 @@
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
@ -61,6 +62,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
@ -570,10 +572,10 @@ public void testCapacitySchedulerRecovery() throws Exception {
|
||||
// submission
|
||||
//2. Remove one of the queues, restart the RM
|
||||
//3. Verify that the expected exception was thrown
|
||||
@Test (timeout = 30000)
|
||||
@Test (timeout = 30000, expected = QueueNotFoundException.class)
|
||||
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
||||
if (!schedulerClass.equals(CapacityScheduler.class)) {
|
||||
return;
|
||||
throw new QueueNotFoundException("Dummy");
|
||||
}
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
@ -614,17 +616,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfigurationOnlyA(csConf);
|
||||
rm2 = new MockRM(csConf, memStore);
|
||||
boolean runtimeThrown = false;
|
||||
try {
|
||||
rm2.start();
|
||||
} catch (RuntimeException e) {
|
||||
//we're catching it because we want to verify the message
|
||||
//and we don't want to set it as an expected exception for the
|
||||
//test because we only want it to happen here
|
||||
assertTrue(e.getMessage().contains(B + " missing"));
|
||||
runtimeThrown = true;
|
||||
}
|
||||
assertTrue(runtimeThrown);
|
||||
rm2.start();
|
||||
}
|
||||
|
||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||
|
@ -28,6 +28,7 @@
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
@ -35,6 +36,8 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
@ -43,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
@ -73,9 +77,11 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -199,10 +205,11 @@ public void setUp() throws Exception {
|
||||
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||
store = mock(RMStateStore.class);
|
||||
writer = mock(RMApplicationHistoryWriter.class);
|
||||
DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class);
|
||||
RMContext realRMContext =
|
||||
new RMContextImpl(rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, new AMRMTokenSecretManager(conf, this.rmContext),
|
||||
renewer, new AMRMTokenSecretManager(conf, this.rmContext),
|
||||
new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(),
|
||||
@ -387,8 +394,12 @@ protected RMApp testCreateAppSubmittedRecovery(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = createNewTestApp(submissionContext);
|
||||
// NEW => SUBMITTED event RMAppEventType.RECOVER
|
||||
RMState state = new RMState();
|
||||
ApplicationState appState = new ApplicationState(123, 123, null, "user");
|
||||
state.getApplicationState().put(application.getApplicationId(), appState);
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER);
|
||||
new RMAppRecoverEvent(application.getApplicationId(), state);
|
||||
|
||||
application.handle(event);
|
||||
assertStartTimeSet(application);
|
||||
assertAppState(RMAppState.SUBMITTED, application);
|
||||
@ -514,7 +525,46 @@ public void testAppSuccessPath() throws IOException {
|
||||
@Test (timeout = 30000)
|
||||
public void testAppRecoverPath() throws IOException {
|
||||
LOG.info("--- START: testAppRecoverPath ---");
|
||||
testCreateAppSubmittedRecovery(null);
|
||||
ApplicationSubmissionContext sub =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
ContainerLaunchContext clc =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
Credentials credentials = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
credentials.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
clc.setTokens(securityTokens);
|
||||
sub.setAMContainerSpec(clc);
|
||||
testCreateAppSubmittedRecovery(sub);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAppRecoverToFailed() throws IOException {
|
||||
LOG.info("--- START: testAppRecoverToFailed ---");
|
||||
ApplicationSubmissionContext sub =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
ContainerLaunchContext clc =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
Credentials credentials = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
credentials.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
clc.setTokens(securityTokens);
|
||||
sub.setAMContainerSpec(clc);
|
||||
|
||||
RMApp application = createNewTestApp(sub);
|
||||
// NEW => FINAL_SAVING, event RMAppEventType.RECOVER
|
||||
RMState state = new RMState();
|
||||
RMAppEvent event =
|
||||
new RMAppRecoverEvent(application.getApplicationId(), state);
|
||||
// NPE will throw on recovery.
|
||||
application.handle(event);
|
||||
assertAppState(RMAppState.FINAL_SAVING, application);
|
||||
sendAppUpdateSavedEvent(application);
|
||||
rmDispatcher.await();
|
||||
assertAppState(RMAppState.FAILED, application);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
@ -917,7 +967,6 @@ public void testAppsRecoveringStates() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testRecoverApplication(ApplicationState appState, RMState rmState)
|
||||
throws Exception {
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
@ -932,15 +981,15 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState)
|
||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||
submissionContext.getResource(), 1));
|
||||
Assert.assertEquals(RMAppState.NEW, application.getState());
|
||||
application.recover(rmState);
|
||||
|
||||
RMAppEvent recoverEvent =
|
||||
new RMAppRecoverEvent(application.getApplicationId(), rmState);
|
||||
// Trigger RECOVER event.
|
||||
application.handle(recoverEvent);
|
||||
// Application final status looked from recoveredFinalStatus
|
||||
Assert.assertTrue("Application is not in recoveredFinalStatus.",
|
||||
RMAppImpl.isAppInFinalState(application));
|
||||
|
||||
// Trigger RECOVER event.
|
||||
application.handle(new RMAppEvent(appState.getAppId(),
|
||||
RMAppEventType.RECOVER));
|
||||
rmDispatcher.await();
|
||||
RMAppState finalState = appState.getState();
|
||||
Assert.assertEquals("Application is not in finalState.", finalState,
|
||||
|
Loading…
Reference in New Issue
Block a user