MAPREDUCE-3291. App fail to launch due to delegation token not found in cache (Robert Evans via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1198583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
072bdd85d1
commit
6733a1ca5e
@ -78,6 +78,9 @@ Release 0.23.1 - Unreleased
|
||||
MAPREDUCE-3217. Reenabled and fixed bugs in the failing ant test
|
||||
TestAuditLogger. (Devaraj K via vinodkv)
|
||||
|
||||
MAPREDUCE-3291. App fail to launch due to delegation token not
|
||||
found in cache (Robert Evans via mahadev)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -389,6 +389,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
appContext.setApplicationName( // Job name
|
||||
jobConf.get(JobContext.JOB_NAME,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_NAME));
|
||||
appContext.setCancelTokensWhenComplete(
|
||||
conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
|
||||
appContext.setAMContainerSpec(amContainer); // AM Container
|
||||
|
||||
return appContext;
|
||||
|
@ -18,8 +18,11 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
|
||||
/**
|
||||
@ -148,4 +151,21 @@ public interface ApplicationSubmissionContext {
|
||||
@Public
|
||||
@Stable
|
||||
public void setAMContainerSpec(ContainerLaunchContext amContainer);
|
||||
|
||||
/**
|
||||
* @return true if tokens should be canceled when the app completes.
|
||||
*/
|
||||
@LimitedPrivate("mapreduce")
|
||||
@Unstable
|
||||
public boolean getCancelTokensWhenComplete();
|
||||
|
||||
/**
|
||||
* Set to false if tokens should not be canceled when the app finished else
|
||||
* false. WARNING: this is not recommended unless you want your single job
|
||||
* tokens to be reused by others jobs.
|
||||
* @param cancel true if tokens should be canceled when the app finishes.
|
||||
*/
|
||||
@LimitedPrivate("mapreduce")
|
||||
@Unstable
|
||||
public void setCancelTokensWhenComplete(boolean cancel);
|
||||
}
|
@ -206,6 +206,19 @@ public void setAMContainerSpec(ContainerLaunchContext amContainer) {
|
||||
}
|
||||
this.amContainer = amContainer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getCancelTokensWhenComplete() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
//There is a default so cancelTokens should never be null
|
||||
return p.getCancelTokensWhenComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCancelTokensWhenComplete(boolean cancel) {
|
||||
maybeInitBuilder();
|
||||
builder.setCancelTokensWhenComplete(cancel);
|
||||
}
|
||||
|
||||
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||
return new PriorityPBImpl(p);
|
||||
|
@ -216,6 +216,7 @@ message ApplicationSubmissionContextProto {
|
||||
optional string queue = 4 [default = "default"];
|
||||
optional PriorityProto priority = 5;
|
||||
optional ContainerLaunchContextProto am_container_spec = 6;
|
||||
optional bool cancel_tokens_when_complete = 7 [default = true];
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
|
@ -278,7 +278,8 @@ protected synchronized void submitApplication(
|
||||
// Setup tokens for renewal
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
this.rmContext.getDelegationTokenRenewer().addApplication(
|
||||
applicationId,parseCredentials(submissionContext)
|
||||
applicationId,parseCredentials(submissionContext),
|
||||
submissionContext.getCancelTokensWhenComplete()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -108,15 +108,17 @@ private static class DelegationTokenToRenew {
|
||||
public final Configuration conf;
|
||||
public long expirationDate;
|
||||
public TimerTask timerTask;
|
||||
public final boolean shouldCancelAtEnd;
|
||||
|
||||
public DelegationTokenToRenew(
|
||||
ApplicationId jId, Token<?> token,
|
||||
Configuration conf, long expirationDate) {
|
||||
Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
|
||||
this.token = token;
|
||||
this.applicationId = jId;
|
||||
this.conf = conf;
|
||||
this.expirationDate = expirationDate;
|
||||
this.timerTask = null;
|
||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
||||
if (this.token==null || this.applicationId==null || this.conf==null) {
|
||||
throw new IllegalArgumentException("Invalid params to renew token" +
|
||||
";token=" + this.token +
|
||||
@ -218,10 +220,12 @@ private void addTokenToList(DelegationTokenToRenew t) {
|
||||
* Add application tokens for renewal.
|
||||
* @param applicationId added application
|
||||
* @param ts tokens
|
||||
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
|
||||
* done else false.
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void addApplication(
|
||||
ApplicationId applicationId, Credentials ts)
|
||||
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
||||
throws IOException {
|
||||
if (ts == null) {
|
||||
return; //nothing to add
|
||||
@ -239,7 +243,8 @@ public synchronized void addApplication(
|
||||
// first renew happens immediately
|
||||
if (token.isManaged()) {
|
||||
DelegationTokenToRenew dtr =
|
||||
new DelegationTokenToRenew(applicationId, token, getConfig(), now);
|
||||
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
|
||||
shouldCancelAtEnd);
|
||||
|
||||
addTokenToList(dtr);
|
||||
|
||||
@ -317,7 +322,11 @@ void setTimerForTokenRenewal(DelegationTokenToRenew token,
|
||||
|
||||
// cancel a token
|
||||
private void cancelToken(DelegationTokenToRenew t) {
|
||||
dtCancelThread.cancelToken(t.token, t.conf);
|
||||
if(t.shouldCancelAtEnd) {
|
||||
dtCancelThread.cancelToken(t.token, t.conf);
|
||||
} else {
|
||||
LOG.info("Did not cancel "+t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -243,16 +244,16 @@ static MyToken createTokens(Text renewer)
|
||||
/**
|
||||
* Basic idea of the test:
|
||||
* 1. create tokens.
|
||||
* 2. Mark one of them to be renewed in 2 seconds (istead of
|
||||
* 24 hourse)
|
||||
* 2. Mark one of them to be renewed in 2 seconds (instead of
|
||||
* 24 hours)
|
||||
* 3. register them for renewal
|
||||
* 4. sleep for 3 seconds
|
||||
* 5. count number of renewals (should 3 initial ones + one extra)
|
||||
* 6. register another token for 2 seconds
|
||||
* 7. cancel it immediately
|
||||
* 8. Sleep and check that the 2 seconds renew didn't happen
|
||||
* (totally 5 reneals)
|
||||
* 9. check cancelation
|
||||
* (totally 5 renewals)
|
||||
* 9. check cancellation
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@ -287,7 +288,7 @@ public void testDTRenewal () throws Exception {
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 =
|
||||
BuilderUtils.newApplicationId(0, 0);
|
||||
delegationTokenRenewer.addApplication(applicationId_0, ts);
|
||||
delegationTokenRenewer.addApplication(applicationId_0, ts, true);
|
||||
|
||||
// first 3 initial renewals + 1 real
|
||||
int numberOfExpectedRenewals = 3+1;
|
||||
@ -326,7 +327,7 @@ public void testDTRenewal () throws Exception {
|
||||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
|
||||
delegationTokenRenewer.removeApplication(applicationId_1);
|
||||
|
||||
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
||||
@ -347,4 +348,49 @@ public void testDTRenewal () throws Exception {
|
||||
//expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic idea of the test:
|
||||
* 1. register a token for 2 seconds with no cancel at the end
|
||||
* 2. cancel it immediately
|
||||
* 3. Sleep and check that the 2 seconds renew didn't happen
|
||||
* (totally 5 renewals)
|
||||
* 4. check cancellation
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@Test
|
||||
public void testDTRenewalWithNoCancel () throws Exception {
|
||||
MyFS dfs = (MyFS)FileSystem.get(conf);
|
||||
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
||||
|
||||
Credentials ts = new Credentials();
|
||||
MyToken token1 = dfs.getDelegationToken(new Text("user1"));
|
||||
|
||||
//to cause this one to be set for renew in 2 secs
|
||||
Renewer.tokenToRenewIn2Sec = token1;
|
||||
LOG.info("token="+token1+" should be renewed for 2 secs");
|
||||
|
||||
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
|
||||
ts.addToken(new Text(nn1), token1);
|
||||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
|
||||
delegationTokenRenewer.removeApplication(applicationId_1);
|
||||
|
||||
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
||||
try {
|
||||
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
|
||||
} catch (InterruptedException e) {}
|
||||
LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
|
||||
|
||||
// counter and the token should still be the old ones
|
||||
assertEquals("renew wasn't called as many times as expected",
|
||||
numberOfExpectedRenewals, Renewer.counter);
|
||||
|
||||
// also renewing of the canceled token should not fail, because it has not
|
||||
// been canceled
|
||||
token1.renew(conf);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user