YARN-6134. [ATSv2 Security] Regenerate delegation token for app just before token expires if app collector is active. Contributed by Varun Saxena
This commit is contained in:
parent
e276c75ec1
commit
7fd6ae2479
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -27,6 +28,7 @@
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ -51,6 +53,8 @@
|
||||
import org.apache.hadoop.security.authentication.KerberosTestUtils;
|
||||
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
@ -190,6 +194,10 @@ public void initialize() throws Exception {
|
||||
// renewed automatically if app is still alive.
|
||||
conf.setLong(
|
||||
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100);
|
||||
// Set token max lifetime to 4 seconds to test if timeline delegation
|
||||
// token for the app is regenerated automatically if app is still alive.
|
||||
conf.setLong(
|
||||
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME, 4000);
|
||||
}
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
collectorManager = new DummyNodeTimelineCollectorManager();
|
||||
@ -205,9 +213,8 @@ public void initialize() throws Exception {
|
||||
if (!withKerberosLogin) {
|
||||
AppLevelTimelineCollector collector =
|
||||
(AppLevelTimelineCollector)collectorManager.get(appId);
|
||||
org.apache.hadoop.security.token.Token
|
||||
<TimelineDelegationTokenIdentifier> token =
|
||||
collector.getDelegationTokenForApp();
|
||||
Token<TimelineDelegationTokenIdentifier> token =
|
||||
collector.getDelegationTokenForApp();
|
||||
token.setService(new Text("localhost" + token.getService().toString().
|
||||
substring(token.getService().toString().indexOf(":"))));
|
||||
UserGroupInformation.getCurrentUser().addToken(token);
|
||||
@ -304,6 +311,20 @@ private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
|
||||
}
|
||||
}
|
||||
|
||||
private boolean publishWithRetries(ApplicationId appId, File entityTypeDir,
|
||||
String entityType, int numEntities) throws Exception {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
publishAndVerifyEntity(appId, entityTypeDir, entityType, numEntities);
|
||||
} catch (YarnException e) {
|
||||
Thread.sleep(50);
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutTimelineEntities() throws Exception {
|
||||
final String entityType = "dummy_type";
|
||||
@ -325,17 +346,63 @@ public Void call() throws Exception {
|
||||
}
|
||||
});
|
||||
} else {
|
||||
publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
|
||||
assertTrue("Entities should have been published successfully.",
|
||||
publishWithRetries(appId, entityTypeDir, entityType, 1));
|
||||
|
||||
AppLevelTimelineCollector collector =
|
||||
(AppLevelTimelineCollector) collectorManager.get(appId);
|
||||
Token<TimelineDelegationTokenIdentifier> token =
|
||||
collector.getDelegationTokenForApp();
|
||||
assertNotNull(token);
|
||||
|
||||
// Verify if token is renewed automatically and entities can still be
|
||||
// published.
|
||||
Thread.sleep(1000);
|
||||
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
||||
AppLevelTimelineCollector collector =
|
||||
(AppLevelTimelineCollector) collectorManager.get(appId);
|
||||
// Entities should publish successfully after renewal.
|
||||
assertTrue("Entities should have been published successfully.",
|
||||
publishWithRetries(appId, entityTypeDir, entityType, 2));
|
||||
assertNotNull(collector);
|
||||
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
|
||||
renewToken(eq(collector.getDelegationTokenForApp()),
|
||||
any(String.class));
|
||||
|
||||
// Wait to ensure lifetime of token expires and ensure its regenerated
|
||||
// automatically.
|
||||
Thread.sleep(3000);
|
||||
for (int i = 0; i < 40; i++) {
|
||||
if (!token.equals(collector.getDelegationTokenForApp())) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
assertNotEquals("Token should have been regenerated.", token,
|
||||
collector.getDelegationTokenForApp());
|
||||
Thread.sleep(1000);
|
||||
// Try publishing with the old token in UGI. Publishing should fail due
|
||||
// to invalid token.
|
||||
try {
|
||||
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
||||
fail("Exception should have been thrown due to Invalid Token.");
|
||||
} catch (YarnException e) {
|
||||
assertTrue("Exception thrown should have been due to Invalid Token.",
|
||||
e.getCause().getMessage().contains("InvalidToken"));
|
||||
}
|
||||
|
||||
// Update the regenerated token in UGI and retry publishing entities.
|
||||
Token<TimelineDelegationTokenIdentifier> regeneratedToken =
|
||||
collector.getDelegationTokenForApp();
|
||||
regeneratedToken.setService(new Text("localhost" +
|
||||
regeneratedToken.getService().toString().substring(
|
||||
regeneratedToken.getService().toString().indexOf(":"))));
|
||||
UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
|
||||
assertTrue("Entities should have been published successfully.",
|
||||
publishWithRetries(appId, entityTypeDir, entityType, 2));
|
||||
// Token was generated twice, once when app collector was created and
|
||||
// later after token lifetime expiry.
|
||||
verify(collectorManager.getTokenManagerService(), times(2)).
|
||||
generateToken(any(UserGroupInformation.class), any(String.class));
|
||||
assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
|
||||
getTokenExpiredCnt());
|
||||
}
|
||||
// Wait for async entity to be published.
|
||||
for (int i = 0; i < 50; i++) {
|
||||
@ -359,14 +426,35 @@ public Void call() throws Exception {
|
||||
|
||||
private static class DummyNodeTimelineCollectorManager extends
|
||||
NodeTimelineCollectorManager {
|
||||
private volatile int tokenExpiredCnt = 0;
|
||||
DummyNodeTimelineCollectorManager() {
|
||||
super();
|
||||
}
|
||||
|
||||
private int getTokenExpiredCnt() {
|
||||
return tokenExpiredCnt;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TimelineV2DelegationTokenSecretManagerService
|
||||
createTokenManagerService() {
|
||||
return spy(new TimelineV2DelegationTokenSecretManagerService());
|
||||
return spy(new TimelineV2DelegationTokenSecretManagerService() {
|
||||
@Override
|
||||
protected AbstractDelegationTokenSecretManager
|
||||
<TimelineDelegationTokenIdentifier>
|
||||
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
||||
long tokenMaxLifetime, long tokenRenewInterval,
|
||||
long tokenRemovalScanInterval) {
|
||||
return spy(new TimelineV2DelegationTokenSecretManager(
|
||||
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 2000L) {
|
||||
@Override
|
||||
protected void logExpireToken(
|
||||
TimelineDelegationTokenIdentifier ident) throws IOException {
|
||||
tokenExpiredCnt++;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,7 +51,9 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
||||
private final TimelineCollectorContext context;
|
||||
private UserGroupInformation currentUser;
|
||||
private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
|
||||
private Future<?> renewalFuture;
|
||||
private long tokenMaxDate = 0;
|
||||
private String tokenRenewer;
|
||||
private Future<?> renewalOrRegenerationFuture;
|
||||
|
||||
public AppLevelTimelineCollector(ApplicationId appId) {
|
||||
this(appId, null);
|
||||
@ -75,13 +77,32 @@ public String getAppUser() {
|
||||
|
||||
void setDelegationTokenAndFutureForApp(
|
||||
Token<TimelineDelegationTokenIdentifier> token,
|
||||
Future<?> appRenewalFuture) {
|
||||
Future<?> appRenewalOrRegenerationFuture, long tknMaxDate,
|
||||
String renewer) {
|
||||
this.delegationTokenForApp = token;
|
||||
this.renewalFuture = appRenewalFuture;
|
||||
this.tokenMaxDate = tknMaxDate;
|
||||
this.tokenRenewer = renewer;
|
||||
this.renewalOrRegenerationFuture = appRenewalOrRegenerationFuture;
|
||||
}
|
||||
|
||||
void setRenewalFutureForApp(Future<?> appRenewalFuture) {
|
||||
this.renewalFuture = appRenewalFuture;
|
||||
void setRenewalOrRegenerationFutureForApp(
|
||||
Future<?> appRenewalOrRegenerationFuture) {
|
||||
this.renewalOrRegenerationFuture = appRenewalOrRegenerationFuture;
|
||||
}
|
||||
|
||||
void cancelRenewalOrRegenerationFutureForApp() {
|
||||
if (renewalOrRegenerationFuture != null &&
|
||||
!renewalOrRegenerationFuture.isDone()) {
|
||||
renewalOrRegenerationFuture.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
long getAppDelegationTokenMaxDate() {
|
||||
return tokenMaxDate;
|
||||
}
|
||||
|
||||
String getAppDelegationTokenRenewer() {
|
||||
return tokenRenewer;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -109,9 +130,7 @@ protected void serviceStart() throws Exception {
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (renewalFuture != null && !renewalFuture.isDone()) {
|
||||
renewalFuture.cancel(true);
|
||||
}
|
||||
cancelRenewalOrRegenerationFutureForApp();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -87,6 +87,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||
|
||||
private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds.
|
||||
|
||||
private static final long TIME_BEFORE_EXPIRY = 5 * 60 * 1000; // 5 minutes.
|
||||
|
||||
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
||||
|
||||
@VisibleForTesting
|
||||
@ -176,10 +178,8 @@ public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector(
|
||||
public long renewTokenForAppCollector(
|
||||
AppLevelTimelineCollector appCollector) throws IOException {
|
||||
if (appCollector.getDelegationTokenForApp() != null) {
|
||||
TimelineDelegationTokenIdentifier identifier =
|
||||
appCollector.getDelegationTokenForApp().decodeIdentifier();
|
||||
return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(),
|
||||
identifier.getRenewer().toString());
|
||||
appCollector.getAppDelegationTokenRenewer());
|
||||
} else {
|
||||
LOG.info("Delegation token not available for renewal for app " +
|
||||
appCollector.getTimelineEntityContext().getAppId());
|
||||
@ -196,6 +196,42 @@ public void cancelTokenForAppCollector(
|
||||
}
|
||||
}
|
||||
|
||||
private long getRenewalDelay(long renewInterval) {
|
||||
return ((renewInterval > TIME_BEFORE_RENEW_DATE) ?
|
||||
renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval);
|
||||
}
|
||||
|
||||
private long getRegenerationDelay(long tokenMaxDate) {
|
||||
long regenerateTime = tokenMaxDate - Time.now();
|
||||
return ((regenerateTime > TIME_BEFORE_EXPIRY) ?
|
||||
regenerateTime - TIME_BEFORE_EXPIRY : regenerateTime);
|
||||
}
|
||||
|
||||
private org.apache.hadoop.yarn.api.records.Token generateTokenAndSetTimer(
|
||||
ApplicationId appId, AppLevelTimelineCollector appCollector)
|
||||
throws IOException {
|
||||
Token<TimelineDelegationTokenIdentifier> timelineToken =
|
||||
generateTokenForAppCollector(appCollector.getAppUser());
|
||||
TimelineDelegationTokenIdentifier tokenId =
|
||||
timelineToken.decodeIdentifier();
|
||||
long renewalDelay = getRenewalDelay(tokenRenewInterval);
|
||||
long regenerationDelay = getRegenerationDelay(tokenId.getMaxDate());
|
||||
if (renewalDelay > 0 || regenerationDelay > 0) {
|
||||
boolean isTimerForRenewal = renewalDelay < regenerationDelay;
|
||||
Future<?> renewalOrRegenerationFuture = tokenRenewalExecutor.schedule(
|
||||
new CollectorTokenRenewer(appId, isTimerForRenewal),
|
||||
isTimerForRenewal? renewalDelay : regenerationDelay,
|
||||
TimeUnit.MILLISECONDS);
|
||||
appCollector.setDelegationTokenAndFutureForApp(timelineToken,
|
||||
renewalOrRegenerationFuture, tokenId.getMaxDate(),
|
||||
tokenId.getRenewer().toString());
|
||||
}
|
||||
LOG.info("Generated a new token " + timelineToken + " for app " + appId);
|
||||
return org.apache.hadoop.yarn.api.records.Token.newInstance(
|
||||
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
|
||||
timelineToken.getPassword(), timelineToken.getService().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
||||
try {
|
||||
@ -206,19 +242,8 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
||||
if (UserGroupInformation.isSecurityEnabled() &&
|
||||
collector instanceof AppLevelTimelineCollector) {
|
||||
AppLevelTimelineCollector appCollector =
|
||||
(AppLevelTimelineCollector)collector;
|
||||
Token<TimelineDelegationTokenIdentifier> timelineToken =
|
||||
generateTokenForAppCollector(appCollector.getAppUser());
|
||||
long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ?
|
||||
tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval;
|
||||
Future<?> renewalFuture =
|
||||
tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId),
|
||||
renewalDelay, TimeUnit.MILLISECONDS);
|
||||
appCollector.setDelegationTokenAndFutureForApp(timelineToken,
|
||||
renewalFuture);
|
||||
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
|
||||
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
|
||||
timelineToken.getPassword(), timelineToken.getService().toString());
|
||||
(AppLevelTimelineCollector) collector;
|
||||
token = generateTokenAndSetTimer(appId, appCollector);
|
||||
}
|
||||
// Report to NM if a new collector is added.
|
||||
reportNewCollectorInfoToNM(appId, token);
|
||||
@ -365,16 +390,54 @@ public String getRestServerBindAddress() {
|
||||
|
||||
private final class CollectorTokenRenewer implements Runnable {
|
||||
private ApplicationId appId;
|
||||
private CollectorTokenRenewer(ApplicationId applicationId) {
|
||||
// Indicates whether timer is for renewal or regeneration of token.
|
||||
private boolean timerForRenewal = true;
|
||||
private CollectorTokenRenewer(ApplicationId applicationId,
|
||||
boolean forRenewal) {
|
||||
appId = applicationId;
|
||||
timerForRenewal = forRenewal;
|
||||
}
|
||||
|
||||
private void renewToken(AppLevelTimelineCollector appCollector)
|
||||
throws IOException {
|
||||
long newExpirationTime = renewTokenForAppCollector(appCollector);
|
||||
// Set renewal or regeneration timer based on delay.
|
||||
long renewalDelay = 0;
|
||||
if (newExpirationTime > 0) {
|
||||
LOG.info("Renewed token for " + appId + " with new expiration " +
|
||||
"timestamp = " + newExpirationTime);
|
||||
renewalDelay = getRenewalDelay(newExpirationTime - Time.now());
|
||||
}
|
||||
long regenerationDelay =
|
||||
getRegenerationDelay(appCollector.getAppDelegationTokenMaxDate());
|
||||
if (renewalDelay > 0 || regenerationDelay > 0) {
|
||||
this.timerForRenewal = renewalDelay < regenerationDelay;
|
||||
Future<?> renewalOrRegenerationFuture = tokenRenewalExecutor.schedule(
|
||||
this, timerForRenewal ? renewalDelay : regenerationDelay,
|
||||
TimeUnit.MILLISECONDS);
|
||||
appCollector.setRenewalOrRegenerationFutureForApp(
|
||||
renewalOrRegenerationFuture);
|
||||
}
|
||||
}
|
||||
|
||||
private void regenerateToken(AppLevelTimelineCollector appCollector)
|
||||
throws IOException {
|
||||
org.apache.hadoop.yarn.api.records.Token token =
|
||||
generateTokenAndSetTimer(appId, appCollector);
|
||||
// Report to NM if a new collector is added.
|
||||
try {
|
||||
reportNewCollectorInfoToNM(appId, token);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("Unable to report regenerated token to NM for " + appId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
TimelineCollector collector = get(appId);
|
||||
if (collector == null) {
|
||||
LOG.info("Cannot find active collector while renewing token for " +
|
||||
appId);
|
||||
LOG.info("Cannot find active collector while " + (timerForRenewal ?
|
||||
"renewing" : "regenerating") + " token for " + appId);
|
||||
return;
|
||||
}
|
||||
AppLevelTimelineCollector appCollector =
|
||||
@ -383,19 +446,14 @@ public void run() {
|
||||
synchronized (collector) {
|
||||
if (!collector.isStopped()) {
|
||||
try {
|
||||
long newExpirationTime = renewTokenForAppCollector(appCollector);
|
||||
if (newExpirationTime > 0) {
|
||||
long renewInterval = newExpirationTime - Time.now();
|
||||
long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ?
|
||||
renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval;
|
||||
LOG.info("Renewed token for " + appId + " with new expiration " +
|
||||
"timestamp = " + newExpirationTime);
|
||||
Future<?> renewalFuture = tokenRenewalExecutor.schedule(
|
||||
this, renewalDelay, TimeUnit.MILLISECONDS);
|
||||
appCollector.setRenewalFutureForApp(renewalFuture);
|
||||
if (timerForRenewal) {
|
||||
renewToken(appCollector);
|
||||
} else {
|
||||
regenerateToken(appCollector);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to renew token for " + appId);
|
||||
LOG.warn("Unable to " + (timerForRenewal ? "renew" : "regenerate") +
|
||||
" token for " + appId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -73,6 +75,9 @@ public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
|
||||
public static class TimelineV2DelegationTokenSecretManager extends
|
||||
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TimelineV2DelegationTokenSecretManager.class);
|
||||
|
||||
/**
|
||||
* Create a timeline v2 secret manager.
|
||||
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
|
||||
@ -111,5 +116,11 @@ public Token<TimelineDelegationTokenIdentifier> generateToken(
|
||||
public TimelineDelegationTokenIdentifier createIdentifier() {
|
||||
return new TimelineDelegationTokenIdentifier();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void logExpireToken(TimelineDelegationTokenIdentifier ident)
|
||||
throws IOException {
|
||||
LOG.info("Token " + ident + " expired.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user