From 7fd6ae24798cd3fdd77dbb00089a922407026e02 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 18 Aug 2017 23:20:44 -0700 Subject: [PATCH] YARN-6134. [ATSv2 Security] Regenerate delegation token for app just before token expires if app collector is active. Contributed by Varun Saxena --- .../security/TestTimelineAuthFilterForV2.java | 104 +++++++++++++-- .../collector/AppLevelTimelineCollector.java | 35 ++++-- .../NodeTimelineCollectorManager.java | 118 +++++++++++++----- ...V2DelegationTokenSecretManagerService.java | 11 ++ 4 files changed, 222 insertions(+), 46 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index f1d5185b0b..bc1594c72e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.security; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,6 +28,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,6 +53,8 @@ import org.apache.hadoop.security.authentication.KerberosTestUtils; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -190,6 +194,10 @@ public void initialize() throws Exception { // renewed automatically if app is still alive. conf.setLong( YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100); + // Set token max lifetime to 4 seconds to test if timeline delegation + // token for the app is regenerated automatically if app is still alive. + conf.setLong( + YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME, 4000); } UserGroupInformation.setConfiguration(conf); collectorManager = new DummyNodeTimelineCollectorManager(); @@ -205,9 +213,8 @@ public void initialize() throws Exception { if (!withKerberosLogin) { AppLevelTimelineCollector collector = (AppLevelTimelineCollector)collectorManager.get(appId); - org.apache.hadoop.security.token.Token - token = - collector.getDelegationTokenForApp(); + Token token = + collector.getDelegationTokenForApp(); token.setService(new Text("localhost" + token.getService().toString(). substring(token.getService().toString().indexOf(":")))); UserGroupInformation.getCurrentUser().addToken(token); @@ -304,6 +311,20 @@ private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir, } } + private boolean publishWithRetries(ApplicationId appId, File entityTypeDir, + String entityType, int numEntities) throws Exception { + for (int i = 0; i < 10; i++) { + try { + publishAndVerifyEntity(appId, entityTypeDir, entityType, numEntities); + } catch (YarnException e) { + Thread.sleep(50); + continue; + } + return true; + } + return false; + } + @Test public void testPutTimelineEntities() throws Exception { final String entityType = "dummy_type"; @@ -325,17 +346,63 @@ public Void call() throws Exception { } }); } else { - publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 1)); + + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector) collectorManager.get(appId); + Token token = + collector.getDelegationTokenForApp(); + assertNotNull(token); + // Verify if token is renewed automatically and entities can still be // published. Thread.sleep(1000); - publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); - AppLevelTimelineCollector collector = - (AppLevelTimelineCollector) collectorManager.get(appId); + // Entities should publish successfully after renewal. + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); assertNotNull(collector); verify(collectorManager.getTokenManagerService(), atLeastOnce()). renewToken(eq(collector.getDelegationTokenForApp()), any(String.class)); + + // Wait to ensure lifetime of token expires and ensure its regenerated + // automatically. + Thread.sleep(3000); + for (int i = 0; i < 40; i++) { + if (!token.equals(collector.getDelegationTokenForApp())) { + break; + } + Thread.sleep(50); + } + assertNotEquals("Token should have been regenerated.", token, + collector.getDelegationTokenForApp()); + Thread.sleep(1000); + // Try publishing with the old token in UGI. Publishing should fail due + // to invalid token. + try { + publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); + fail("Exception should have been thrown due to Invalid Token."); + } catch (YarnException e) { + assertTrue("Exception thrown should have been due to Invalid Token.", + e.getCause().getMessage().contains("InvalidToken")); + } + + // Update the regenerated token in UGI and retry publishing entities. + Token regeneratedToken = + collector.getDelegationTokenForApp(); + regeneratedToken.setService(new Text("localhost" + + regeneratedToken.getService().toString().substring( + regeneratedToken.getService().toString().indexOf(":")))); + UserGroupInformation.getCurrentUser().addToken(regeneratedToken); + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); + // Token was generated twice, once when app collector was created and + // later after token lifetime expiry. + verify(collectorManager.getTokenManagerService(), times(2)). + generateToken(any(UserGroupInformation.class), any(String.class)); + assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager). + getTokenExpiredCnt()); } // Wait for async entity to be published. for (int i = 0; i < 50; i++) { @@ -359,14 +426,35 @@ public Void call() throws Exception { private static class DummyNodeTimelineCollectorManager extends NodeTimelineCollectorManager { + private volatile int tokenExpiredCnt = 0; DummyNodeTimelineCollectorManager() { super(); } + private int getTokenExpiredCnt() { + return tokenExpiredCnt; + } + @Override protected TimelineV2DelegationTokenSecretManagerService createTokenManagerService() { - return spy(new TimelineV2DelegationTokenSecretManagerService()); + return spy(new TimelineV2DelegationTokenSecretManagerService() { + @Override + protected AbstractDelegationTokenSecretManager + + createTimelineDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval, + long tokenRemovalScanInterval) { + return spy(new TimelineV2DelegationTokenSecretManager( + secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 2000L) { + @Override + protected void logExpireToken( + TimelineDelegationTokenIdentifier ident) throws IOException { + tokenExpiredCnt++; + } + }); + } + }); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 8b8534e750..38221fe98a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -51,7 +51,9 @@ public class AppLevelTimelineCollector extends TimelineCollector { private final TimelineCollectorContext context; private UserGroupInformation currentUser; private Token delegationTokenForApp; - private Future renewalFuture; + private long tokenMaxDate = 0; + private String tokenRenewer; + private Future renewalOrRegenerationFuture; public AppLevelTimelineCollector(ApplicationId appId) { this(appId, null); @@ -75,13 +77,32 @@ public String getAppUser() { void setDelegationTokenAndFutureForApp( Token token, - Future appRenewalFuture) { + Future appRenewalOrRegenerationFuture, long tknMaxDate, + String renewer) { this.delegationTokenForApp = token; - this.renewalFuture = appRenewalFuture; + this.tokenMaxDate = tknMaxDate; + this.tokenRenewer = renewer; + this.renewalOrRegenerationFuture = appRenewalOrRegenerationFuture; } - void setRenewalFutureForApp(Future appRenewalFuture) { - this.renewalFuture = appRenewalFuture; + void setRenewalOrRegenerationFutureForApp( + Future appRenewalOrRegenerationFuture) { + this.renewalOrRegenerationFuture = appRenewalOrRegenerationFuture; + } + + void cancelRenewalOrRegenerationFutureForApp() { + if (renewalOrRegenerationFuture != null && + !renewalOrRegenerationFuture.isDone()) { + renewalOrRegenerationFuture.cancel(true); + } + } + + long getAppDelegationTokenMaxDate() { + return tokenMaxDate; + } + + String getAppDelegationTokenRenewer() { + return tokenRenewer; } @VisibleForTesting @@ -109,9 +130,7 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { - if (renewalFuture != null && !renewalFuture.isDone()) { - renewalFuture.cancel(true); - } + cancelRenewalOrRegenerationFutureForApp(); super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 1b1b373d24..68a68f0ded 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -87,6 +87,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds. + private static final long TIME_BEFORE_EXPIRY = 5 * 60 * 1000; // 5 minutes. + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; @VisibleForTesting @@ -176,10 +178,8 @@ public Token generateTokenForAppCollector( public long renewTokenForAppCollector( AppLevelTimelineCollector appCollector) throws IOException { if (appCollector.getDelegationTokenForApp() != null) { - TimelineDelegationTokenIdentifier identifier = - appCollector.getDelegationTokenForApp().decodeIdentifier(); return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(), - identifier.getRenewer().toString()); + appCollector.getAppDelegationTokenRenewer()); } else { LOG.info("Delegation token not available for renewal for app " + appCollector.getTimelineEntityContext().getAppId()); @@ -196,6 +196,42 @@ public void cancelTokenForAppCollector( } } + private long getRenewalDelay(long renewInterval) { + return ((renewInterval > TIME_BEFORE_RENEW_DATE) ? + renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval); + } + + private long getRegenerationDelay(long tokenMaxDate) { + long regenerateTime = tokenMaxDate - Time.now(); + return ((regenerateTime > TIME_BEFORE_EXPIRY) ? + regenerateTime - TIME_BEFORE_EXPIRY : regenerateTime); + } + + private org.apache.hadoop.yarn.api.records.Token generateTokenAndSetTimer( + ApplicationId appId, AppLevelTimelineCollector appCollector) + throws IOException { + Token timelineToken = + generateTokenForAppCollector(appCollector.getAppUser()); + TimelineDelegationTokenIdentifier tokenId = + timelineToken.decodeIdentifier(); + long renewalDelay = getRenewalDelay(tokenRenewInterval); + long regenerationDelay = getRegenerationDelay(tokenId.getMaxDate()); + if (renewalDelay > 0 || regenerationDelay > 0) { + boolean isTimerForRenewal = renewalDelay < regenerationDelay; + Future renewalOrRegenerationFuture = tokenRenewalExecutor.schedule( + new CollectorTokenRenewer(appId, isTimerForRenewal), + isTimerForRenewal? renewalDelay : regenerationDelay, + TimeUnit.MILLISECONDS); + appCollector.setDelegationTokenAndFutureForApp(timelineToken, + renewalOrRegenerationFuture, tokenId.getMaxDate(), + tokenId.getRenewer().toString()); + } + LOG.info("Generated a new token " + timelineToken + " for app " + appId); + return org.apache.hadoop.yarn.api.records.Token.newInstance( + timelineToken.getIdentifier(), timelineToken.getKind().toString(), + timelineToken.getPassword(), timelineToken.getService().toString()); + } + @Override protected void doPostPut(ApplicationId appId, TimelineCollector collector) { try { @@ -206,19 +242,8 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) { if (UserGroupInformation.isSecurityEnabled() && collector instanceof AppLevelTimelineCollector) { AppLevelTimelineCollector appCollector = - (AppLevelTimelineCollector)collector; - Token timelineToken = - generateTokenForAppCollector(appCollector.getAppUser()); - long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ? - tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval; - Future renewalFuture = - tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId), - renewalDelay, TimeUnit.MILLISECONDS); - appCollector.setDelegationTokenAndFutureForApp(timelineToken, - renewalFuture); - token = org.apache.hadoop.yarn.api.records.Token.newInstance( - timelineToken.getIdentifier(), timelineToken.getKind().toString(), - timelineToken.getPassword(), timelineToken.getService().toString()); + (AppLevelTimelineCollector) collector; + token = generateTokenAndSetTimer(appId, appCollector); } // Report to NM if a new collector is added. reportNewCollectorInfoToNM(appId, token); @@ -365,16 +390,54 @@ public String getRestServerBindAddress() { private final class CollectorTokenRenewer implements Runnable { private ApplicationId appId; - private CollectorTokenRenewer(ApplicationId applicationId) { + // Indicates whether timer is for renewal or regeneration of token. + private boolean timerForRenewal = true; + private CollectorTokenRenewer(ApplicationId applicationId, + boolean forRenewal) { appId = applicationId; + timerForRenewal = forRenewal; + } + + private void renewToken(AppLevelTimelineCollector appCollector) + throws IOException { + long newExpirationTime = renewTokenForAppCollector(appCollector); + // Set renewal or regeneration timer based on delay. + long renewalDelay = 0; + if (newExpirationTime > 0) { + LOG.info("Renewed token for " + appId + " with new expiration " + + "timestamp = " + newExpirationTime); + renewalDelay = getRenewalDelay(newExpirationTime - Time.now()); + } + long regenerationDelay = + getRegenerationDelay(appCollector.getAppDelegationTokenMaxDate()); + if (renewalDelay > 0 || regenerationDelay > 0) { + this.timerForRenewal = renewalDelay < regenerationDelay; + Future renewalOrRegenerationFuture = tokenRenewalExecutor.schedule( + this, timerForRenewal ? renewalDelay : regenerationDelay, + TimeUnit.MILLISECONDS); + appCollector.setRenewalOrRegenerationFutureForApp( + renewalOrRegenerationFuture); + } + } + + private void regenerateToken(AppLevelTimelineCollector appCollector) + throws IOException { + org.apache.hadoop.yarn.api.records.Token token = + generateTokenAndSetTimer(appId, appCollector); + // Report to NM if a new collector is added. + try { + reportNewCollectorInfoToNM(appId, token); + } catch (YarnException e) { + LOG.warn("Unable to report regenerated token to NM for " + appId); + } } @Override public void run() { TimelineCollector collector = get(appId); if (collector == null) { - LOG.info("Cannot find active collector while renewing token for " + - appId); + LOG.info("Cannot find active collector while " + (timerForRenewal ? + "renewing" : "regenerating") + " token for " + appId); return; } AppLevelTimelineCollector appCollector = @@ -383,19 +446,14 @@ public void run() { synchronized (collector) { if (!collector.isStopped()) { try { - long newExpirationTime = renewTokenForAppCollector(appCollector); - if (newExpirationTime > 0) { - long renewInterval = newExpirationTime - Time.now(); - long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ? - renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval; - LOG.info("Renewed token for " + appId + " with new expiration " + - "timestamp = " + newExpirationTime); - Future renewalFuture = tokenRenewalExecutor.schedule( - this, renewalDelay, TimeUnit.MILLISECONDS); - appCollector.setRenewalFutureForApp(renewalFuture); + if (timerForRenewal) { + renewToken(appCollector); + } else { + regenerateToken(appCollector); } } catch (Exception e) { - LOG.warn("Unable to renew token for " + appId); + LOG.warn("Unable to " + (timerForRenewal ? "renew" : "regenerate") + + " token for " + appId, e); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java index 39f0fc685f..de5ccdc89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java @@ -20,6 +20,8 @@ import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.io.Text; @@ -73,6 +75,9 @@ public void cancelToken(Token token, public static class TimelineV2DelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = + LogFactory.getLog(TimelineV2DelegationTokenSecretManager.class); + /** * Create a timeline v2 secret manager. * @param delegationKeyUpdateInterval the number of milliseconds for rolling @@ -111,5 +116,11 @@ public Token generateToken( public TimelineDelegationTokenIdentifier createIdentifier() { return new TimelineDelegationTokenIdentifier(); } + + @Override + protected void logExpireToken(TimelineDelegationTokenIdentifier ident) + throws IOException { + LOG.info("Token " + ident + " expired."); + } } }