YARN-10348. Allow RM to always cancel tokens after app completes. Contributed by

Jim Brennan

(cherry picked from commit 48f90115b5)
This commit is contained in:
Eric Badger 2020-07-13 23:12:18 +00:00
parent 7044a007b3
commit 41bcef9486
4 changed files with 93 additions and 3 deletions

View File

@ -748,6 +748,9 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "delegation-token.max-conf-size-bytes";
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
12800;
public static final String RM_DELEGATION_TOKEN_ALWAYS_CANCEL =
RM_PREFIX + "delegation-token.always-cancel";
public static final boolean DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL = false;
public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
RM_PREFIX + "delegation-token-renewer.thread-timeout";

View File

@ -804,6 +804,16 @@
<value>12800</value>
</property>
<property>
<description>If true, ResourceManager will always try to cancel delegation
tokens after the application completes, even if the client sets
shouldCancelAtEnd false. References to delegation tokens are tracked,
so they will not be canceled until all sub-tasks are done using them.
</description>
<name>yarn.resourcemanager.delegation-token.always-cancel</name>
<value>false</value>
</property>
<property>
<description>If true, ResourceManager will have proxy-user privileges.
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to

View File

@ -115,6 +115,7 @@ public class DelegationTokenRenewer extends AbstractService {
private volatile boolean isServiceStarted;
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean alwaysCancelDelegationTokens;
private boolean tokenKeepAliveEnabled;
private boolean hasProxyUserPrivileges;
private long credentialsValidTimeRemaining;
@ -137,6 +138,9 @@ public DelegationTokenRenewer() {
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.alwaysCancelDelegationTokens =
conf.getBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL);
this.hasProxyUserPrivileges =
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
@ -268,7 +272,7 @@ protected void serviceStop() {
*
*/
@VisibleForTesting
protected static class DelegationTokenToRenew {
protected class DelegationTokenToRenew {
public final Token<?> token;
public final Collection<ApplicationId> referringAppIds;
public final Configuration conf;
@ -298,7 +302,7 @@ public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
this.conf = conf;
this.expirationDate = expirationDate;
this.timerTask = null;
this.shouldCancelAtEnd = shouldCancelAtEnd;
this.shouldCancelAtEnd = shouldCancelAtEnd | alwaysCancelDelegationTokens;
}
public void setTimerTask(RenewalTimerTask tTask) {

View File

@ -217,6 +217,8 @@ public void setUp() throws Exception {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
conf.set("override_token_expire_time", "3000");
conf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
false);
UserGroupInformation.setConfiguration(conf);
eventQueue = new LinkedBlockingQueue<Event>();
dispatcher = new AsyncDispatcher(eventQueue);
@ -608,6 +610,77 @@ public void testDTRenewalWithNoCancel () throws Exception {
token1.renew(conf);
}
/**
* Basic idea of the test:
* 1. Verify that YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL = true
* overrides shouldCancelAtEnd
* 2. register a token for 2 seconds with shouldCancelAtEnd = false
* 3. cancel it immediately
* 4. check that token was canceled
* @throws IOException
* @throws URISyntaxException
*/
@Test(timeout=60000)
public void testDTRenewalWithNoCancelAlwaysCancel() throws Exception {
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
true);
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(lconf, counter);
RMContext mockContext = mock(RMContext.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
localDtr);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
localDtr.setDelegationTokenRenewerPoolTracker(false);
localDtr.setRMContext(mockContext);
localDtr.init(lconf);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
Credentials ts = new Credentials();
MyToken token1 = dfs.getDelegationToken("user1");
//to cause this one to be set for renew in 2 secs
Renewer.tokenToRenewIn2Sec = token1;
LOG.info("token="+token1+" should be renewed for 2 secs");
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
ts.addToken(new Text(nn1), token1);
ApplicationId applicationId = BuilderUtils.newApplicationId(0, 1);
localDtr.addApplicationAsync(applicationId, ts, false, "user",
new Configuration());
waitForEventsToGetProcessed(localDtr);
localDtr.applicationFinished(applicationId);
waitForEventsToGetProcessed(localDtr);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
} catch (InterruptedException e) {}
LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
// counter and the token should still be the old ones
assertEquals("renew wasn't called as many times as expected",
numberOfExpectedRenewals, Renewer.counter);
// The token should have been cancelled at this point. Renewal will fail.
try {
token1.renew(lconf);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
/**
* Basic idea of the test:
* 0. Setup token KEEP_ALIVE
@ -1616,7 +1689,7 @@ protected Token<?>[] obtainSystemTokensForUser(String user,
// Ensure incrTokenSequenceNo has been called for new token request
Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
DelegationTokenToRenew dttr = new DelegationTokenToRenew(appIds,
DelegationTokenToRenew dttr = dtr.new DelegationTokenToRenew(appIds,
expectedToken, conf, 1000, false, "user1");
dtr.requestNewHdfsDelegationTokenIfNeeded(dttr);