diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 125ff94d1a..7e30ac97c8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1076,6 +1076,9 @@ Release 2.7.2 - UNRELEASED YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks) + YARN-4041. Slow delegation token renewal can severely prolong RM recovery + (Sunil G via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 43a3a51a46..41254d8124 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -946,14 +946,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } if (UserGroupInformation.isSecurityEnabled()) { - // synchronously renew delegation token on recovery. + // asynchronously renew delegation token on recovery. 
try { - app.rmContext.getDelegationTokenRenewer().addApplicationSync( - app.getApplicationId(), app.parseCredentials(), - app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); + app.rmContext.getDelegationTokenRenewer() + .addApplicationAsyncDuringRecovery(app.getApplicationId(), + app.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), + app.getUser()); } catch (Exception e) { - String msg = "Failed to renew token for " + app.applicationId - + " on recovery : " + e.getMessage(); + String msg = "Failed to fetch user credentials from application:" + + e.getMessage(); app.diagnostics.append(msg); LOG.error(msg, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 426e460ec6..cca14e9472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -387,6 +387,25 @@ public void addApplicationAsync(ApplicationId applicationId, Credentials ts, applicationId, ts, shouldCancelAtEnd, user)); } + /** + * Asynchronously add application tokens for renewal. + * + * @param applicationId + * added application + * @param ts + * tokens + * @param shouldCancelAtEnd + * true if tokens should be canceled when the app is done else false. 
+ * @param user + * user + */ + public void addApplicationAsyncDuringRecovery(ApplicationId applicationId, + Credentials ts, boolean shouldCancelAtEnd, String user) { + processDelegationTokenRenewerEvent( + new DelegationTokenRenewerAppRecoverEvent(applicationId, ts, + shouldCancelAtEnd, user)); + } + /** * Synchronously renew delegation tokens. * @param user user @@ -398,7 +417,7 @@ public void addApplicationSync(ApplicationId applicationId, Credentials ts, applicationId, ts, shouldCancelAtEnd, user)); } - private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) + private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) throws IOException, InterruptedException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); @@ -842,6 +861,10 @@ public void run() { DelegationTokenRenewerAppSubmitEvent appSubmitEvt = (DelegationTokenRenewerAppSubmitEvent) evt; handleDTRenewerAppSubmitEvent(appSubmitEvt); + } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) { + DelegationTokenRenewerAppRecoverEvent appRecoverEvt = + (DelegationTokenRenewerAppRecoverEvent) evt; + handleDTRenewerAppRecoverEvent(appRecoverEvt); } else if (evt.getType().equals( DelegationTokenRenewerEventType.FINISH_APPLICATION)) { DelegationTokenRenewer.this.handleAppFinishEvent(evt); @@ -876,17 +899,50 @@ private void handleDTRenewerAppSubmitEvent( } } } - - static class DelegationTokenRenewerAppSubmitEvent extends + + @SuppressWarnings("unchecked") + private void handleDTRenewerAppRecoverEvent( + DelegationTokenRenewerAppRecoverEvent event) { + try { + // Setup tokens for renewal during recovery + DelegationTokenRenewer.this.handleAppSubmitEvent(event); + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", t); + } + } + + static class DelegationTokenRenewerAppSubmitEvent + extends + AbstractDelegationTokenRenewerAppEvent { + public 
DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + } + } + + static class DelegationTokenRenewerAppRecoverEvent + extends + AbstractDelegationTokenRenewerAppEvent { + public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.RECOVER_APPLICATION); + } + } + + static class AbstractDelegationTokenRenewerAppEvent extends DelegationTokenRenewerEvent { private Credentials credentials; private boolean shouldCancelAtEnd; private String user; - public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user) { - super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user, + DelegationTokenRenewerEventType type) { + super(appId, type); this.credentials = credentails; this.shouldCancelAtEnd = shouldCancelAtEnd; this.user = user; @@ -907,6 +963,7 @@ public String getUser() { enum DelegationTokenRenewerEventType { VERIFY_AND_START_APPLICATION, + RECOVER_APPLICATION, FINISH_APPLICATION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 531a4a90f1..cd84208ee2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1179,24 +1179,24 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() // Need to wait for a while as now token renewal happens on another thread // and is asynchronous in nature. - waitForTokensToBeRenewed(rm2); + waitForTokensToBeRenewed(rm2, tokenSet); // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); } - private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { - int waitCnt = 20; - boolean atleastOneAppInNEWState = true; - while (waitCnt-- > 0 && atleastOneAppInNEWState) { - atleastOneAppInNEWState = false; - for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { - if (rmApp.getState() == RMAppState.NEW) { - Thread.sleep(1000); - atleastOneAppInNEWState = true; - break; - } + private void waitForTokensToBeRenewed(MockRM rm2, + HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception { + // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms) + int waitCnt = 100; + while (waitCnt-- > 0) { + if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens())) { + // Stop waiting as tokens are populated to DelegationTokenRenewer. + break; + } else { + Thread.sleep(10); } } }