diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index 0ddf287f35..f1d5185b0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -183,6 +184,13 @@ public void initialize() throws Exception { conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name()); } + if (!withKerberosLogin) { + // For timeline delegation token based access, set delegation token renew + // interval to 100 ms. to test if timeline delegation token for the app is + // renewed automatically if app is still alive. + conf.setLong( + YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100); + } UserGroupInformation.setConfiguration(conf); collectorManager = new DummyNodeTimelineCollectorManager(); auxService = PerNodeTimelineCollectorsAuxService.launchServer( @@ -282,12 +290,12 @@ private static TimelineEntity readEntityFile(File entityFile) } private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir, - String entityType) throws Exception { + String entityType, int numEntities) throws Exception { TimelineV2Client client = createTimelineClientForUGI(appId); try { // Sync call. Results available immediately. client.putEntities(createEntity("entity1", entityType)); - assertEquals(1, entityTypeDir.listFiles().length); + assertEquals(numEntities, entityTypeDir.listFiles().length); verifyEntity(entityTypeDir, "entity1", entityType); // Async call. client.putEntitiesAsync(createEntity("entity2", entityType)); @@ -312,12 +320,22 @@ public void testPutTimelineEntities() throws Exception { KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { @Override public Void call() throws Exception { - publishAndVerifyEntity(appId, entityTypeDir, entityType); + publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); return null; } }); } else { - publishAndVerifyEntity(appId, entityTypeDir, entityType); + publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); + // Verify if token is renewed automatically and entities can still be + // published. + Thread.sleep(1000); + publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector) collectorManager.get(appId); + assertNotNull(collector); + verify(collectorManager.getTokenManagerService(), atLeastOnce()). + renewToken(eq(collector.getDelegationTokenForApp()), + any(String.class)); } // Wait for async entity to be published. for (int i = 0; i < 50; i++) { @@ -330,6 +348,7 @@ public Void call() throws Exception { verifyEntity(entityTypeDir, "entity2", entityType); AppLevelTimelineCollector collector = (AppLevelTimelineCollector)collectorManager.get(appId); + assertNotNull(collector); auxService.removeApplication(appId); verify(collectorManager.getTokenManagerService()).cancelToken( eq(collector.getDelegationTokenForApp()), any(String.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 08ac89487e..8b8534e750 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import java.util.concurrent.Future; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -49,6 +51,7 @@ public class AppLevelTimelineCollector extends TimelineCollector { private final TimelineCollectorContext context; private UserGroupInformation currentUser; private Token delegationTokenForApp; + private Future renewalFuture; public AppLevelTimelineCollector(ApplicationId appId) { this(appId, null); @@ -70,9 +73,15 @@ public String getAppUser() { return appUser; } - void setDelegationTokenForApp( - Token token) { + void setDelegationTokenAndFutureForApp( + Token token, + Future appRenewalFuture) { this.delegationTokenForApp = token; + this.renewalFuture = appRenewalFuture; + } + + void setRenewalFutureForApp(Future appRenewalFuture) { + this.renewalFuture = appRenewalFuture; } @VisibleForTesting @@ -100,6 +109,9 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + if (renewalFuture != null && !renewalFuture.isDone()) { + renewalFuture.cancel(true); + } super.serviceStop(); } @@ -107,5 +119,4 @@ protected void serviceStop() throws Exception { public TimelineCollectorContext getTimelineEntityContext() { return context; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 02362b2dc6..1b1b373d24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -23,6 +23,9 @@ import java.net.URI; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -32,6 +35,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Time; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -50,6 +54,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +81,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { private UserGroupInformation loginUGI; + private ScheduledThreadPoolExecutor tokenRenewalExecutor; + + private long tokenRenewInterval; + + private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds. + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; @VisibleForTesting @@ -93,6 +104,9 @@ protected void serviceInit(Configuration conf) throws Exception { tokenMgrService = createTokenManagerService(); addService(tokenMgrService); this.loginUGI = UserGroupInformation.getCurrentUser(); + tokenRenewInterval = conf.getLong( + YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, + YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL); super.serviceInit(conf); } @@ -109,6 +123,9 @@ protected void serviceStart() throws Exception { } this.loginUGI = UserGroupInformation.getLoginUser(); } + tokenRenewalExecutor = new ScheduledThreadPoolExecutor( + 1, new ThreadFactoryBuilder().setNameFormat( + "App Collector Token Renewal thread").build()); super.serviceStart(); startWebApp(); } @@ -139,6 +156,9 @@ protected void serviceStop() throws Exception { if (timelineRestServer != null) { timelineRestServer.stop(); } + if (tokenRenewalExecutor != null) { + tokenRenewalExecutor.shutdownNow(); + } super.serviceStop(); } @@ -152,6 +172,21 @@ public Token generateTokenForAppCollector( return token; } + @VisibleForTesting + public long renewTokenForAppCollector( + AppLevelTimelineCollector appCollector) throws IOException { + if (appCollector.getDelegationTokenForApp() != null) { + TimelineDelegationTokenIdentifier identifier = + appCollector.getDelegationTokenForApp().decodeIdentifier(); + return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(), + identifier.getRenewer().toString()); + } else { + LOG.info("Delegation token not available for renewal for app " + + appCollector.getTimelineEntityContext().getAppId()); + return -1; + } + } + @VisibleForTesting public void cancelTokenForAppCollector( AppLevelTimelineCollector appCollector) throws IOException { @@ -174,13 +209,19 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) { (AppLevelTimelineCollector)collector; Token timelineToken = generateTokenForAppCollector(appCollector.getAppUser()); - appCollector.setDelegationTokenForApp(timelineToken); + long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ? + tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval; + Future renewalFuture = + tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId), + renewalDelay, TimeUnit.MILLISECONDS); + appCollector.setDelegationTokenAndFutureForApp(timelineToken, + renewalFuture); token = org.apache.hadoop.yarn.api.records.Token.newInstance( timelineToken.getIdentifier(), timelineToken.getKind().toString(), timelineToken.getPassword(), timelineToken.getService().toString()); } // Report to NM if a new collector is added. - reportNewCollectorToNM(appId, token); + reportNewCollectorInfoToNM(appId, token); } catch (YarnException | IOException e) { // throw exception here as it cannot be used if failed communicate with NM LOG.error("Failed to communicate with NM Collector Service for " + appId); @@ -192,7 +233,7 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) { protected void postRemove(ApplicationId appId, TimelineCollector collector) { if (collector instanceof AppLevelTimelineCollector) { try { - cancelTokenForAppCollector((AppLevelTimelineCollector)collector); + cancelTokenForAppCollector((AppLevelTimelineCollector) collector); } catch (IOException e) { LOG.warn("Failed to cancel token for app collector with appId " + appId, e); @@ -244,7 +285,7 @@ private void startWebApp() { timelineRestServerBindAddress); } - private void reportNewCollectorToNM(ApplicationId appId, + private void reportNewCollectorInfoToNM(ApplicationId appId, org.apache.hadoop.yarn.api.records.Token token) throws YarnException, IOException { ReportNewCollectorInfoRequest request = @@ -321,4 +362,43 @@ protected CollectorNodemanagerProtocol getNMCollectorService() { public String getRestServerBindAddress() { return timelineRestServerBindAddress; } + + private final class CollectorTokenRenewer implements Runnable { + private ApplicationId appId; + private CollectorTokenRenewer(ApplicationId applicationId) { + appId = applicationId; + } + + @Override + public void run() { + TimelineCollector collector = get(appId); + if (collector == null) { + LOG.info("Cannot find active collector while renewing token for " + + appId); + return; + } + AppLevelTimelineCollector appCollector = + (AppLevelTimelineCollector) collector; + + synchronized (collector) { + if (!collector.isStopped()) { + try { + long newExpirationTime = renewTokenForAppCollector(appCollector); + if (newExpirationTime > 0) { + long renewInterval = newExpirationTime - Time.now(); + long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ? + renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval; + LOG.info("Renewed token for " + appId + " with new expiration " + + "timestamp = " + newExpirationTime); + Future renewalFuture = tokenRenewalExecutor.schedule( + this, renewalDelay, TimeUnit.MILLISECONDS); + appCollector.setRenewalFutureForApp(renewalFuture); + } + } catch (Exception e) { + LOG.warn("Unable to renew token for " + appId); + } + } + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 306806fe0a..8202431459 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -63,6 +63,8 @@ public abstract class TimelineCollector extends CompositeService { private volatile boolean readyToAggregate = false; + private volatile boolean isStopped = false; + public TimelineCollector(String name) { super(name); } @@ -79,9 +81,14 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + isStopped = true; super.serviceStop(); } + boolean isStopped() { + return isStopped; + } + protected void setWriter(TimelineWriter w) { this.writer = w; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 972bc012b6..7909a2e82f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -184,9 +184,11 @@ public boolean remove(ApplicationId appId) { if (collector == null) { LOG.error("the collector for " + appId + " does not exist!"); } else { - postRemove(appId, collector); - // stop the service to do clean up - collector.stop(); + synchronized (collector) { + postRemove(appId, collector); + // stop the service to do clean up + collector.stop(); + } LOG.info("The collector service for " + appId + " was removed"); } return collector != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java index de7db58211..39f0fc685f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java @@ -34,6 +34,7 @@ */ public class TimelineV2DelegationTokenSecretManagerService extends TimelineDelgationTokenSecretManagerService { + public TimelineV2DelegationTokenSecretManagerService() { super(TimelineV2DelegationTokenSecretManagerService.class.getName()); } @@ -54,6 +55,11 @@ public Token generateToken( getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer); } + public long renewToken(Token token, + String renewer) throws IOException { + return getTimelineDelegationTokenSecretManager().renewToken(token, renewer); + } + public void cancelToken(Token token, String canceller) throws IOException { getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);