YARN-3336. FileSystem memory leak in DelegationTokenRenewer.

This commit is contained in:
cnauroth 2015-03-23 10:45:50 -07:00
parent 7e6f384dd7
commit 6ca1f12024
3 changed files with 41 additions and 5 deletions

View File

@ -819,6 +819,9 @@ Release 2.7.0 - UNRELEASED
YARN-3384. TestLogAggregationService.verifyContainerLogs fails after YARN-3384. TestLogAggregationService.verifyContainerLogs fails after
YARN-2777. (Naganarasimha G R via ozawa) YARN-2777. (Naganarasimha G R via ozawa)
YARN-3336. FileSystem memory leak in DelegationTokenRenewer.
(Zhihai Xu via cnauroth)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -605,6 +605,7 @@ private void requestNewHdfsDelegationToken(ApplicationId applicationId,
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
} }
@VisibleForTesting
protected Token<?>[] obtainSystemTokensForUser(String user, protected Token<?>[] obtainSystemTokensForUser(String user,
final Credentials credentials) throws IOException, InterruptedException { final Credentials credentials) throws IOException, InterruptedException {
// Get new hdfs tokens on behalf of this user // Get new hdfs tokens on behalf of this user
@ -615,8 +616,16 @@ protected Token<?>[] obtainSystemTokensForUser(String user,
proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() { proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
@Override @Override
public Token<?>[] run() throws Exception { public Token<?>[] run() throws Exception {
return FileSystem.get(getConfig()).addDelegationTokens( FileSystem fs = FileSystem.get(getConfig());
UserGroupInformation.getLoginUser().getUserName(), credentials); try {
return fs.addDelegationTokens(
UserGroupInformation.getLoginUser().getUserName(),
credentials);
} finally {
// Close the FileSystem created by the new proxy user,
// So that we don't leave an entry in the FileSystem cache
fs.close();
}
} }
}); });
return newTokens; return newTokens;

View File

@ -287,9 +287,16 @@ public String toString() {
* exception * exception
*/ */
static class MyFS extends DistributedFileSystem { static class MyFS extends DistributedFileSystem {
private static AtomicInteger instanceCounter = new AtomicInteger();
public MyFS() {} public MyFS() {
public void close() {} instanceCounter.incrementAndGet();
}
public void close() {
instanceCounter.decrementAndGet();
}
public static int getInstanceCounter() {
return instanceCounter.get();
}
@Override @Override
public void initialize(URI uri, Configuration conf) throws IOException {} public void initialize(URI uri, Configuration conf) throws IOException {}
@ -299,6 +306,11 @@ public MyToken getDelegationToken(String renewer) throws IOException {
LOG.info("Called MYDFS.getdelegationtoken " + result); LOG.info("Called MYDFS.getdelegationtoken " + result);
return result; return result;
} }
public Token<?>[] addDelegationTokens(
final String renewer, Credentials credentials) throws IOException {
return new Token<?>[0];
}
} }
/** /**
@ -1022,4 +1034,16 @@ public void testAppSubmissionWithPreviousToken() throws Exception{
// app2 completes, app1 is still running, check the token is not cancelled // app2 completes, app1 is still running, check the token is not cancelled
Assert.assertFalse(Renewer.cancelled); Assert.assertFalse(Renewer.cancelled);
} }
// Test FileSystem memory leak in obtainSystemTokensForUser.
@Test
public void testFSLeakInObtainSystemTokensForUser() throws Exception{
Credentials credentials = new Credentials();
String user = "test";
int oldCounter = MyFS.getInstanceCounter();
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
Assert.assertEquals(oldCounter, MyFS.getInstanceCounter());
}
} }