diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3453dfff94..05e71f5b6a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -78,6 +78,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3217. Reenabled and fixed bugs in the failing ant test TestAuditLogger. (Devaraj K via vinodkv) + MAPREDUCE-3291. App fail to launch due to delegation token not + found in cache (Robert Evans via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 03fc8836d8..f77b6e8674 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -389,6 +389,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( appContext.setApplicationName( // Job name jobConf.get(JobContext.JOB_NAME, YarnConfiguration.DEFAULT_APPLICATION_NAME)); + appContext.setCancelTokensWhenComplete( + conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)); appContext.setAMContainerSpec(amContainer); // AM Container return appContext; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 2b454796c7..5ef71fea94 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.api.records; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ClientRMProtocol; /** @@ -148,4 +151,21 @@ public interface ApplicationSubmissionContext { @Public @Stable public void setAMContainerSpec(ContainerLaunchContext amContainer); + + /** + * @return true if tokens should be canceled when the app completes. + */ + @LimitedPrivate("mapreduce") + @Unstable + public boolean getCancelTokensWhenComplete(); + + /** + * Set to false if tokens should not be canceled when the app finished else + * false. WARNING: this is not recommended unless you want your single job + * tokens to be reused by others jobs. + * @param cancel true if tokens should be canceled when the app finishes. + */ + @LimitedPrivate("mapreduce") + @Unstable + public void setCancelTokensWhenComplete(boolean cancel); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 1f8b5c24b1..9d66a167b4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -206,6 +206,19 @@ public void setAMContainerSpec(ContainerLaunchContext amContainer) { } this.amContainer = amContainer; } + + @Override + public boolean getCancelTokensWhenComplete() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + //There is a default so cancelTokens should never be null + return p.getCancelTokensWhenComplete(); + } + + @Override + public void setCancelTokensWhenComplete(boolean cancel) { + maybeInitBuilder(); + builder.setCancelTokensWhenComplete(cancel); + } private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a6dfecd71c..ae6fd1b046 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -216,6 +216,7 @@ message ApplicationSubmissionContextProto { optional string queue = 4 [default = "default"]; optional PriorityProto priority = 5; optional ContainerLaunchContextProto am_container_spec = 6; + optional bool cancel_tokens_when_complete = 7 [default = true]; } enum ApplicationAccessTypeProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e5a90cab55..4c4334c4dc 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -278,7 +278,8 @@ protected synchronized void submitApplication( // Setup tokens for renewal if (UserGroupInformation.isSecurityEnabled()) { this.rmContext.getDelegationTokenRenewer().addApplication( - applicationId,parseCredentials(submissionContext) + applicationId,parseCredentials(submissionContext), + submissionContext.getCancelTokensWhenComplete() ); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index b3ab9a1c4e..a7d2e4582d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -108,15 +108,17 @@ private static class DelegationTokenToRenew { public final Configuration conf; public long expirationDate; public TimerTask timerTask; + public final boolean shouldCancelAtEnd; public DelegationTokenToRenew( ApplicationId jId, Token token, - Configuration conf, long expirationDate) { + Configuration conf, long expirationDate, boolean shouldCancelAtEnd) { this.token = token; this.applicationId = jId; this.conf = conf; this.expirationDate = expirationDate; this.timerTask = null; + this.shouldCancelAtEnd = shouldCancelAtEnd; if (this.token==null || this.applicationId==null || this.conf==null) { throw new IllegalArgumentException("Invalid params to renew token" + ";token=" + this.token + @@ -218,10 +220,12 @@ private void addTokenToList(DelegationTokenToRenew t) { * Add application tokens for renewal. * @param applicationId added application * @param ts tokens + * @param shouldCancelAtEnd true if tokens should be canceled when the app is + * done else false. * @throws IOException */ public synchronized void addApplication( - ApplicationId applicationId, Credentials ts) + ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) throws IOException { if (ts == null) { return; //nothing to add @@ -239,7 +243,8 @@ public synchronized void addApplication( // first renew happens immediately if (token.isManaged()) { DelegationTokenToRenew dtr = - new DelegationTokenToRenew(applicationId, token, getConfig(), now); + new DelegationTokenToRenew(applicationId, token, getConfig(), now, + shouldCancelAtEnd); addTokenToList(dtr); @@ -317,7 +322,11 @@ void setTimerForTokenRenewal(DelegationTokenToRenew token, // cancel a token private void cancelToken(DelegationTokenToRenew t) { - dtCancelThread.cancelToken(t.token, t.conf); + if(t.shouldCancelAtEnd) { + dtCancelThread.cancelToken(t.token, t.conf); + } else { + LOG.info("Did not cancel "+t); + } } /** diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 61870a817d..9448fe0ed4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.net.URI; @@ -243,16 +244,16 @@ static MyToken createTokens(Text renewer) /** * Basic idea of the test: * 1. create tokens. - * 2. Mark one of them to be renewed in 2 seconds (istead of - * 24 hourse) + * 2. Mark one of them to be renewed in 2 seconds (instead of + * 24 hours) * 3. register them for renewal * 4. sleep for 3 seconds * 5. count number of renewals (should 3 initial ones + one extra) * 6. register another token for 2 seconds * 7. cancel it immediately * 8. Sleep and check that the 2 seconds renew didn't happen - * (totally 5 reneals) - * 9. check cancelation + * (totally 5 renewals) + * 9. check cancellation * @throws IOException * @throws URISyntaxException */ @@ -287,7 +288,7 @@ public void testDTRenewal () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(applicationId_0, ts); + delegationTokenRenewer.addApplication(applicationId_0, ts, true); // first 3 initial renewals + 1 real int numberOfExpectedRenewals = 3+1; @@ -326,7 +327,7 @@ public void testDTRenewal () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts); + delegationTokenRenewer.addApplication(applicationId_1, ts, true); delegationTokenRenewer.removeApplication(applicationId_1); numberOfExpectedRenewals = Renewer.counter; // number of renewals so far @@ -347,4 +348,49 @@ public void testDTRenewal () throws Exception { //expected } } + + /** + * Basic idea of the test: + * 1. register a token for 2 seconds with no cancel at the end + * 2. cancel it immediately + * 3. Sleep and check that the 2 seconds renew didn't happen + * (totally 5 renewals) + * 4. check cancellation + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testDTRenewalWithNoCancel () throws Exception { + MyFS dfs = (MyFS)FileSystem.get(conf); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); + + Credentials ts = new Credentials(); + MyToken token1 = dfs.getDelegationToken(new Text("user1")); + + //to cause this one to be set for renew in 2 secs + Renewer.tokenToRenewIn2Sec = token1; + LOG.info("token="+token1+" should be renewed for 2 secs"); + + String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0"; + ts.addToken(new Text(nn1), token1); + + + ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); + delegationTokenRenewer.addApplication(applicationId_1, ts, false); + delegationTokenRenewer.removeApplication(applicationId_1); + + int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far + try { + Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew + } catch (InterruptedException e) {} + LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed); + + // counter and the token should still be the old ones + assertEquals("renew wasn't called as many times as expected", + numberOfExpectedRenewals, Renewer.counter); + + // also renewing of the canceled token should not fail, because it has not + // been canceled + token1.renew(conf); + } }