YARN-1116. Populate AMRMTokens back to AMRMTokenSecretManager after RM restarts (Jian He via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1523146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a56a4b6ef0
commit
f4951e0708
@ -237,15 +237,6 @@ public void run() {
|
|||||||
} catch (YarnRuntimeException e) {
|
} catch (YarnRuntimeException e) {
|
||||||
LOG.error("Error communicating with RM: " + e.getMessage() , e);
|
LOG.error("Error communicating with RM: " + e.getMessage() , e);
|
||||||
return;
|
return;
|
||||||
} catch (InvalidToken e) {
|
|
||||||
// This can happen if the RM has been restarted, since currently
|
|
||||||
// when RM restarts AMRMToken is not populated back to
|
|
||||||
// AMRMTokenSecretManager yet. Once this is fixed, no need
|
|
||||||
// to send JOB_AM_REBOOT event in this method any more.
|
|
||||||
eventHandler.handle(new JobEvent(job.getID(),
|
|
||||||
JobEventType.JOB_AM_REBOOT));
|
|
||||||
LOG.error("Error in authencating with RM: " ,e);
|
|
||||||
return;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("ERROR IN CONTACTING RM. ", e);
|
LOG.error("ERROR IN CONTACTING RM. ", e);
|
||||||
continue;
|
continue;
|
||||||
|
@ -194,6 +194,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||||||
YARN-1194. TestContainerLogsPage fails with native builds (Roman Shaposhnik
|
YARN-1194. TestContainerLogsPage fails with native builds (Roman Shaposhnik
|
||||||
via jlowe)
|
via jlowe)
|
||||||
|
|
||||||
|
YARN-1116. Populate AMRMTokens back to AMRMTokenSecretManager after RM
|
||||||
|
restarts (Jian He via bikas)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-22
|
Release 2.1.0-beta - 2013-08-22
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -569,7 +569,7 @@ public void handle(RMAppEvent event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recover(RMState state) {
|
public void recover(RMState state) throws Exception{
|
||||||
ApplicationState appState = state.getApplicationState().get(getApplicationId());
|
ApplicationState appState = state.getApplicationState().get(getApplicationId());
|
||||||
LOG.info("Recovering app: " + getApplicationId() + " with " +
|
LOG.info("Recovering app: " + getApplicationId() + " with " +
|
||||||
+ appState.getAttemptCount() + " attempts");
|
+ appState.getAttemptCount() + " attempts");
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
|
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -675,7 +676,7 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recover(RMState state) {
|
public void recover(RMState state) throws Exception{
|
||||||
ApplicationState appState =
|
ApplicationState appState =
|
||||||
state.getApplicationState().get(getAppAttemptId().getApplicationId());
|
state.getApplicationState().get(getAppAttemptId().getApplicationId());
|
||||||
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
|
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
|
||||||
@ -690,7 +691,8 @@ public void recover(RMState state) {
|
|||||||
RMAppAttemptEventType.RECOVER));
|
RMAppAttemptEventType.RECOVER));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
|
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
|
||||||
|
throws IOException {
|
||||||
if (appAttemptTokens == null) {
|
if (appAttemptTokens == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -707,11 +709,7 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
|
|||||||
this.amrmToken =
|
this.amrmToken =
|
||||||
(Token<AMRMTokenIdentifier>) appAttemptTokens
|
(Token<AMRMTokenIdentifier>) appAttemptTokens
|
||||||
.getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
|
.getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
|
||||||
|
rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
|
||||||
// For now, no need to populate tokens back to AMRMTokenSecretManager,
|
|
||||||
// because running attempts are rebooted. Later in work-preserve restart,
|
|
||||||
// we'll create NEW->RUNNING transition in which the restored tokens will be
|
|
||||||
// added to the secret manager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BaseTransition implements
|
private static class BaseTransition implements
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
@ -30,6 +31,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
@ -123,6 +125,19 @@ public synchronized byte[] createPassword(
|
|||||||
return password;
|
return password;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
|
||||||
|
*/
|
||||||
|
public synchronized void
|
||||||
|
addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
|
||||||
|
AMRMTokenIdentifier identifier = token.decodeIdentifier();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
|
||||||
|
}
|
||||||
|
this.passwords.put(identifier.getApplicationAttemptId(),
|
||||||
|
token.getPassword());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the password for the given {@link AMRMTokenIdentifier}.
|
* Retrieve the password for the given {@link AMRMTokenIdentifier}.
|
||||||
* Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
|
* Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
|
||||||
|
@ -52,6 +52,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
@ -577,14 +578,16 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
|
|||||||
attempt1.getClientTokenMasterKey(),
|
attempt1.getClientTokenMasterKey(),
|
||||||
loadedAttempt1.getClientTokenMasterKey());
|
loadedAttempt1.getClientTokenMasterKey());
|
||||||
|
|
||||||
// assert secret manager also knows about the key
|
// assert ClientTokenSecretManager also knows about the key
|
||||||
Assert.assertArrayEquals(clientTokenMasterKey,
|
Assert.assertArrayEquals(clientTokenMasterKey,
|
||||||
rm2.getClientToAMTokenSecretManager().getMasterKey(attemptId1)
|
rm2.getClientToAMTokenSecretManager().getMasterKey(attemptId1)
|
||||||
.getEncoded());
|
.getEncoded());
|
||||||
|
|
||||||
// Not testing ApplicationTokenSecretManager has the password populated back,
|
// assert AMRMTokenSecretManager also knows about the AMRMToken password
|
||||||
// that is needed in work-preserving restart
|
Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
|
||||||
|
Assert.assertArrayEquals(amrmToken.getPassword(),
|
||||||
|
rm2.getAMRMTokenSecretManager().retrievePassword(
|
||||||
|
amrmToken.decodeIdentifier()));
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user