YARN-5098. Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He.
This commit is contained in:
parent
99cc439e29
commit
f10ebc67f5
@ -53,6 +53,7 @@
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
@ -459,6 +460,18 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
|
||||
try {
|
||||
renewToken(dttr);
|
||||
} catch (IOException ioe) {
|
||||
if (ioe instanceof SecretManager.InvalidToken
|
||||
&& dttr.maxDate < Time.now()
|
||||
&& evt instanceof DelegationTokenRenewerAppRecoverEvent
|
||||
&& token.getKind().equals(HDFS_DELEGATION_KIND)) {
|
||||
LOG.info("Failed to renew hdfs token " + dttr
|
||||
+ " on recovery as it expired, requesting new hdfs token for "
|
||||
+ applicationId + ", user=" + evt.getUser(), ioe);
|
||||
requestNewHdfsDelegationTokenAsProxyUser(
|
||||
Arrays.asList(applicationId), evt.getUser(),
|
||||
evt.shouldCancelAtEnd());
|
||||
continue;
|
||||
}
|
||||
throw new IOException("Failed to renew token: " + dttr.token, ioe);
|
||||
}
|
||||
}
|
||||
@ -485,7 +498,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
|
||||
}
|
||||
|
||||
if (!hasHdfsToken) {
|
||||
requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
|
||||
requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
|
||||
evt.getUser(),
|
||||
shouldCancelAtEnd);
|
||||
}
|
||||
}
|
||||
@ -586,8 +600,7 @@ public Long run() throws Exception {
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
LOG.info("Renewed delegation-token= [" + dttr + "], for "
|
||||
+ dttr.referringAppIds);
|
||||
LOG.info("Renewed delegation-token= [" + dttr + "]");
|
||||
}
|
||||
|
||||
// Request new hdfs token if the token is about to expire, and remove the old
|
||||
@ -625,12 +638,12 @@ private void requestNewHdfsDelegationTokenIfNeeded(
|
||||
}
|
||||
}
|
||||
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
|
||||
requestNewHdfsDelegationToken(applicationIds, dttr.user,
|
||||
requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user,
|
||||
dttr.shouldCancelAtEnd);
|
||||
}
|
||||
}
|
||||
|
||||
private void requestNewHdfsDelegationToken(
|
||||
private void requestNewHdfsDelegationTokenAsProxyUser(
|
||||
Collection<ApplicationId> referringAppIds,
|
||||
String user, boolean shouldCancelAtEnd) throws IOException,
|
||||
InterruptedException {
|
||||
@ -912,8 +925,8 @@ private void handleDTRenewerAppRecoverEvent(
|
||||
// Setup tokens for renewal during recovery
|
||||
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn(
|
||||
"Unable to add the application to the delegation token renewer.", t);
|
||||
LOG.warn("Unable to add the application to the delegation token"
|
||||
+ " renewer on recovery.", t);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,6 +43,7 @@
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -84,6 +85,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
@ -968,6 +970,101 @@ public Boolean get() {
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
|
||||
}
|
||||
|
||||
|
||||
// 1. token is expired before app completes.
|
||||
// 2. RM shutdown.
|
||||
// 3. When RM recovers the app, token renewal will fail as token expired.
|
||||
// RM should request a new token and sent it to NM for log-aggregation.
|
||||
@Test
|
||||
public void testRMRestartWithExpiredToken() throws Exception {
|
||||
Configuration yarnConf = new YarnConfiguration();
|
||||
yarnConf
|
||||
.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
||||
yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
yarnConf
|
||||
.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
UserGroupInformation.setConfiguration(yarnConf);
|
||||
|
||||
// create Token1:
|
||||
Text userText1 = new Text("user1");
|
||||
DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
|
||||
new Text("renewer1"), userText1);
|
||||
final Token<DelegationTokenIdentifier> originalToken =
|
||||
new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
|
||||
new Text("service1"));
|
||||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, originalToken);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(yarnConf);
|
||||
MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore);
|
||||
rm1.start();
|
||||
RMApp app = rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
||||
credentials);
|
||||
|
||||
// create token2
|
||||
Text userText2 = new Text("user1");
|
||||
DelegationTokenIdentifier dtId2 =
|
||||
new DelegationTokenIdentifier(userText1, new Text("renewer2"),
|
||||
userText2);
|
||||
final Token<DelegationTokenIdentifier> updatedToken =
|
||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
||||
AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
||||
MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
|
||||
@Override
|
||||
protected void renewToken(final DelegationTokenToRenew dttr)
|
||||
throws IOException {
|
||||
|
||||
if (dttr.token.equals(updatedToken)) {
|
||||
secondRenewInvoked.set(true);
|
||||
super.renewToken(dttr);
|
||||
} else if (dttr.token.equals(originalToken)){
|
||||
firstRenewInvoked.set(true);
|
||||
throw new InvalidToken("Failed to renew");
|
||||
} else {
|
||||
throw new IOException("Unexpected");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||
final Credentials credentials) throws IOException {
|
||||
credentials.addToken(updatedToken.getService(), updatedToken);
|
||||
return new Token<?>[] { updatedToken };
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// simulating restart the rm
|
||||
rm2.start();
|
||||
|
||||
// check nm can retrieve the token
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
ByteBuffer tokenBuffer =
|
||||
response.getSystemCredentialsForApps().get(app.getApplicationId());
|
||||
Assert.assertNotNull(tokenBuffer);
|
||||
Credentials appCredentials = new Credentials();
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
tokenBuffer.rewind();
|
||||
buf.reset(tokenBuffer);
|
||||
appCredentials.readTokenStorageStream(buf);
|
||||
Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get());
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken));
|
||||
}
|
||||
|
||||
// YARN will get the token for the app submitted without the delegation token.
|
||||
@Test
|
||||
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
||||
@ -1158,4 +1255,5 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{
|
||||
Assert.assertTrue(dttr.isTimerCancelled());
|
||||
Assert.assertTrue(Renewer.cancelled);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user