YARN-2834. Fixed ResourceManager to ignore token-renewal failures on recovery consistent with the (somewhat incorrect) behaviour in the non-recovery case. Contributed by Jian He.
This commit is contained in:
parent
770cc14442
commit
e76faebc95
@ -925,6 +925,10 @@ Release 2.6.0 - 2014-11-15
|
||||
YARN-2830. Add backwords compatible ContainerId.newInstance constructor.
|
||||
(jeagles via acmurthy)
|
||||
|
||||
YARN-2834. Fixed ResourceManager to ignore token-renewal failures on recovery
|
||||
consistent with the (somewhat incorrect) behaviour in the non-recovery case.
|
||||
(Jian He via vinodkv)
|
||||
|
||||
Release 2.5.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -714,7 +714,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(RMState state) throws Exception{
|
||||
public void recover(RMState state) {
|
||||
ApplicationState appState = state.getApplicationState().get(getApplicationId());
|
||||
this.recoveredFinalState = appState.getState();
|
||||
LOG.info("Recovering app: " + getApplicationId() + " with " +
|
||||
@ -830,14 +830,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
|
||||
RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
|
||||
try {
|
||||
app.recover(recoverEvent.getRMState());
|
||||
} catch (Exception e) {
|
||||
String msg = app.applicationId + " failed to recover. " + e.getMessage();
|
||||
failToRecoverApp(app, event, msg, e);
|
||||
return RMAppState.FINAL_SAVING;
|
||||
}
|
||||
|
||||
app.recover(recoverEvent.getRMState());
|
||||
// The app has completed.
|
||||
if (app.recoveredFinalState != null) {
|
||||
app.recoverAppAttempts();
|
||||
@ -852,10 +845,10 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
app.getApplicationId(), app.parseCredentials(),
|
||||
app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
|
||||
} catch (Exception e) {
|
||||
String msg = "Failed to renew delegation token on recovery for "
|
||||
+ app.applicationId + e.getMessage();
|
||||
failToRecoverApp(app, event, msg, e);
|
||||
return RMAppState.FINAL_SAVING;
|
||||
String msg = "Failed to renew token for " + app.applicationId
|
||||
+ " on recovery : " + e.getMessage();
|
||||
app.diagnostics.append(msg);
|
||||
LOG.error(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -892,14 +885,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
// Thus we return ACCECPTED state on recovery.
|
||||
return RMAppState.ACCEPTED;
|
||||
}
|
||||
|
||||
private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg,
|
||||
Exception e) {
|
||||
app.diagnostics.append(msg);
|
||||
LOG.error(msg, e);
|
||||
app.rememberTargetTransitionsAndStoreState(event, new FinalTransition(
|
||||
RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AddApplicationToSchedulerTransition extends
|
||||
|
@ -789,7 +789,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(RMState state) throws Exception {
|
||||
public void recover(RMState state) {
|
||||
ApplicationState appState =
|
||||
state.getApplicationState().get(getAppAttemptId().getApplicationId());
|
||||
ApplicationAttemptState attemptState =
|
||||
@ -823,7 +823,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
}
|
||||
|
||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
|
||||
RMAppAttemptState state) throws IOException {
|
||||
RMAppAttemptState state) {
|
||||
if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
|
||||
|| state == RMAppAttemptState.FINISHED
|
||||
|| state == RMAppAttemptState.KILLED) {
|
||||
|
@ -18,15 +18,15 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -35,9 +35,10 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
@ -50,9 +51,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
@ -71,9 +70,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
@ -1011,4 +1014,50 @@ public class TestWorkPreservingRMRestart {
|
||||
am0.unregisterAppAttempt(false);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAppFailedToRenewTokenOnRecovery() throws Exception {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, memStore) {
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
public void addApplicationSync(ApplicationId applicationId,
|
||||
Credentials ts, boolean shouldCancelAtEnd, String user)
|
||||
throws IOException {
|
||||
throw new IOException("Token renew failed !!");
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
rm2.start();
|
||||
NMContainerStatus containerStatus =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
|
||||
ContainerState.RUNNING);
|
||||
nm1.registerNode(Arrays.asList(containerStatus), null);
|
||||
|
||||
// am re-register
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
am1.registerAppAttempt(true);
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
// Because the token expired, am could crash.
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
}
|
||||
}
|
||||
|
@ -539,34 +539,6 @@ public class TestRMAppTransitions {
|
||||
testCreateAppSubmittedRecovery(sub);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAppRecoverToFailed() throws IOException {
|
||||
LOG.info("--- START: testAppRecoverToFailed ---");
|
||||
ApplicationSubmissionContext sub =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
ContainerLaunchContext clc =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
Credentials credentials = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
credentials.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
clc.setTokens(securityTokens);
|
||||
sub.setAMContainerSpec(clc);
|
||||
|
||||
RMApp application = createNewTestApp(sub);
|
||||
// NEW => FINAL_SAVING, event RMAppEventType.RECOVER
|
||||
RMState state = new RMState();
|
||||
RMAppEvent event =
|
||||
new RMAppRecoverEvent(application.getApplicationId(), state);
|
||||
// NPE will throw on recovery.
|
||||
application.handle(event);
|
||||
assertAppState(RMAppState.FINAL_SAVING, application);
|
||||
sendAppUpdateSavedEvent(application);
|
||||
rmDispatcher.await();
|
||||
assertAppState(RMAppState.FAILED, application);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAppNewKill() throws IOException {
|
||||
LOG.info("--- START: testAppNewKill ---");
|
||||
|
Loading…
x
Reference in New Issue
Block a user