YARN-280. RM does not reject app submission with invalid tokens (Daryn Sharp via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1425079 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
92774331cc
commit
a806a2aac6
@ -241,6 +241,9 @@ Release 0.23.6 - UNRELEASED
|
|||||||
YARN-266. RM and JHS Web UIs are blank because AppsBlock is not escaping
|
YARN-266. RM and JHS Web UIs are blank because AppsBlock is not escaping
|
||||||
string properly (Ravi Prakash via jlowe)
|
string properly (Ravi Prakash via jlowe)
|
||||||
|
|
||||||
|
YARN-280. RM does not reject app submission with invalid tokens
|
||||||
|
(Daryn Sharp via tgraves)
|
||||||
|
|
||||||
Release 0.23.5 - UNRELEASED
|
Release 0.23.5 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -276,21 +276,26 @@ public synchronized void addApplication(
|
|||||||
Collection <Token<?>> tokens = ts.getAllTokens();
|
Collection <Token<?>> tokens = ts.getAllTokens();
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// find tokens for renewal, but don't add timers until we know
|
||||||
|
// all renewable tokens are valid
|
||||||
|
Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
|
||||||
for(Token<?> token : tokens) {
|
for(Token<?> token : tokens) {
|
||||||
// first renew happens immediately
|
// first renew happens immediately
|
||||||
if (token.isManaged()) {
|
if (token.isManaged()) {
|
||||||
DelegationTokenToRenew dtr =
|
DelegationTokenToRenew dtr =
|
||||||
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
|
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
|
||||||
shouldCancelAtEnd);
|
shouldCancelAtEnd);
|
||||||
|
renewToken(dtr);
|
||||||
addTokenToList(dtr);
|
dtrs.add(dtr);
|
||||||
|
}
|
||||||
setTimerForTokenRenewal(dtr, true);
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
for (DelegationTokenToRenew dtr : dtrs) {
|
||||||
LOG.debug("Registering token for renewal for:" +
|
addTokenToList(dtr);
|
||||||
" service = " + token.getService() +
|
setTimerForTokenRenewal(dtr);
|
||||||
" for appId = " + applicationId);
|
if (LOG.isDebugEnabled()) {
|
||||||
}
|
LOG.debug("Registering token for renewal for:" +
|
||||||
|
" service = " + dtr.token.getService() +
|
||||||
|
" for appId = " + applicationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -315,22 +320,13 @@ public synchronized void run() {
|
|||||||
|
|
||||||
Token<?> token = dttr.token;
|
Token<?> token = dttr.token;
|
||||||
try {
|
try {
|
||||||
// need to use doAs so that http can find the kerberos tgt
|
renewToken(dttr);
|
||||||
dttr.expirationDate = UserGroupInformation.getLoginUser()
|
|
||||||
.doAs(new PrivilegedExceptionAction<Long>(){
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long run() throws Exception {
|
|
||||||
return dttr.token.renew(dttr.conf);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Renewing delegation-token for:" + token.getService() +
|
LOG.debug("Renewing delegation-token for:" + token.getService() +
|
||||||
"; new expiration;" + dttr.expirationDate);
|
"; new expiration;" + dttr.expirationDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
setTimerForTokenRenewal(dttr, false);// set the next one
|
setTimerForTokenRenewal(dttr);// set the next one
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
|
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
|
||||||
removeFailedDelegationToken(dttr);
|
removeFailedDelegationToken(dttr);
|
||||||
@ -347,19 +343,12 @@ public synchronized boolean cancel() {
|
|||||||
/**
|
/**
|
||||||
* set task to renew the token
|
* set task to renew the token
|
||||||
*/
|
*/
|
||||||
private
|
private void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||||
void setTimerForTokenRenewal(DelegationTokenToRenew token,
|
throws IOException {
|
||||||
boolean firstTime) throws IOException {
|
|
||||||
|
|
||||||
// calculate timer time
|
// calculate timer time
|
||||||
long now = System.currentTimeMillis();
|
long expiresIn = token.expirationDate - System.currentTimeMillis();
|
||||||
long renewIn;
|
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
|
||||||
if(firstTime) {
|
|
||||||
renewIn = now;
|
|
||||||
} else {
|
|
||||||
long expiresIn = (token.expirationDate - now);
|
|
||||||
renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration
|
|
||||||
}
|
|
||||||
|
|
||||||
// need to create new task every time
|
// need to create new task every time
|
||||||
TimerTask tTask = new RenewalTimerTask(token);
|
TimerTask tTask = new RenewalTimerTask(token);
|
||||||
@ -368,6 +357,24 @@ void setTimerForTokenRenewal(DelegationTokenToRenew token,
|
|||||||
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// renew a token
|
||||||
|
private void renewToken(final DelegationTokenToRenew dttr)
|
||||||
|
throws IOException {
|
||||||
|
// need to use doAs so that http can find the kerberos tgt
|
||||||
|
// NOTE: token renewers should be responsible for the correct UGI!
|
||||||
|
try {
|
||||||
|
dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
|
||||||
|
new PrivilegedExceptionAction<Long>(){
|
||||||
|
@Override
|
||||||
|
public Long run() throws Exception {
|
||||||
|
return dttr.token.renew(dttr.conf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// cancel a token
|
// cancel a token
|
||||||
private void cancelToken(DelegationTokenToRenew t) {
|
private void cancelToken(DelegationTokenToRenew t) {
|
||||||
if(t.shouldCancelAtEnd) {
|
if(t.shouldCancelAtEnd) {
|
||||||
|
@ -357,6 +357,27 @@ public void testDTRenewal () throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidDTWithAddApplication() throws Exception {
|
||||||
|
MyFS dfs = (MyFS)FileSystem.get(conf);
|
||||||
|
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
||||||
|
|
||||||
|
MyToken token = dfs.getDelegationToken(new Text("user1"));
|
||||||
|
token.cancelToken();
|
||||||
|
|
||||||
|
Credentials ts = new Credentials();
|
||||||
|
ts.addToken(token.getKind(), token);
|
||||||
|
|
||||||
|
// register the tokens for renewal
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
|
||||||
|
try {
|
||||||
|
delegationTokenRenewer.addApplication(appId, ts, true);
|
||||||
|
fail("App submission with a cancelled token should have failed");
|
||||||
|
} catch (InvalidToken e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic idea of the test:
|
* Basic idea of the test:
|
||||||
* 1. register a token for 2 seconds with no cancel at the end
|
* 1. register a token for 2 seconds with no cancel at the end
|
||||||
|
Loading…
Reference in New Issue
Block a user