HDFS-4477. Secondary namenode may retain old tokens. Contributed by Daryn Sharp.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1467307 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c2592021f3
commit
f863543206
@ -27,8 +27,10 @@ import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
@ -144,6 +146,10 @@ extends AbstractDelegationTokenIdentifier>
|
||||
return;
|
||||
}
|
||||
|
||||
protected void logExpireToken(TokenIdent ident) throws IOException {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current master key
|
||||
* This is called once by startThreads before tokenRemoverThread is created,
|
||||
@ -363,15 +369,25 @@ extends AbstractDelegationTokenIdentifier>
|
||||
}
|
||||
|
||||
/** Remove expired delegation tokens from cache */
|
||||
private synchronized void removeExpiredToken() {
|
||||
private void removeExpiredToken() throws IOException {
|
||||
long now = Time.now();
|
||||
Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
|
||||
while (i.hasNext()) {
|
||||
long renewDate = i.next().getRenewDate();
|
||||
if (now > renewDate) {
|
||||
i.remove();
|
||||
Set<TokenIdent> expiredTokens = new HashSet<TokenIdent>();
|
||||
synchronized (this) {
|
||||
Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i =
|
||||
currentTokens.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next();
|
||||
long renewDate = entry.getValue().getRenewDate();
|
||||
if (renewDate < now) {
|
||||
expiredTokens.add(entry.getKey());
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
// don't hold lock on 'this' to avoid edit log updates blocking token ops
|
||||
for (TokenIdent ident : expiredTokens) {
|
||||
logExpireToken(ident);
|
||||
}
|
||||
}
|
||||
|
||||
public void stopThreads() {
|
||||
|
@ -2488,6 +2488,8 @@ Release 0.23.8 - UNRELEASED
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-4477. Secondary namenode may retain old tokens (daryn via kihwal)
|
||||
|
||||
Release 0.23.7 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -310,6 +310,23 @@ public class DelegationTokenSecretManager
|
||||
namesystem.logUpdateMasterKey(key);
|
||||
}
|
||||
}
|
||||
|
||||
@Override //AbstractDelegationTokenManager
|
||||
protected void logExpireToken(final DelegationTokenIdentifier dtId)
|
||||
throws IOException {
|
||||
synchronized (noInterruptsLock) {
|
||||
// The edit logging code will fail catastrophically if it
|
||||
// is interrupted during a logSync, since the interrupt
|
||||
// closes the edit log files. Doing this inside the
|
||||
// above lock and then checking interruption status
|
||||
// prevents this bug.
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted before expiring delegation token");
|
||||
}
|
||||
namesystem.logExpireDelegationToken(dtId);
|
||||
}
|
||||
}
|
||||
|
||||
/** A utility method for creating credentials. */
|
||||
public static Credentials createCredentials(final NameNode namenode,
|
||||
|
@ -5358,6 +5358,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||
getEditLog().logSync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the cancellation of expired tokens to edit logs
|
||||
*
|
||||
* @param id token identifier to cancel
|
||||
*/
|
||||
public void logExpireDelegationToken(DelegationTokenIdentifier id) {
|
||||
assert !isInSafeMode() :
|
||||
"this should never be called while in safemode, since we stop " +
|
||||
"the DT manager before entering safemode!";
|
||||
// No need to hold FSN lock since we don't access any internal
|
||||
// structures, and this is stopped before the FSN shuts itself
|
||||
// down, etc.
|
||||
getEditLog().logCancelDelegationToken(id);
|
||||
}
|
||||
|
||||
private void logReassignLease(String leaseHolder, String src,
|
||||
String newHolder) {
|
||||
assert hasWriteLock();
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
@ -30,12 +31,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* This class tests the creation and validation of a checkpoint.
|
||||
@ -163,4 +166,70 @@ public class TestSecurityTokenEditLog {
|
||||
if(cluster != null) cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testEditsForCancelOnTokenExpire() throws IOException,
|
||||
InterruptedException {
|
||||
long renewInterval = 2000;
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, renewInterval);
|
||||
conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, renewInterval*2);
|
||||
|
||||
Text renewer = new Text(UserGroupInformation.getCurrentUser().getUserName());
|
||||
FSImage fsImage = mock(FSImage.class);
|
||||
FSEditLog log = mock(FSEditLog.class);
|
||||
doReturn(log).when(fsImage).getEditLog();
|
||||
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||
|
||||
DelegationTokenSecretManager dtsm = fsn.getDelegationTokenSecretManager();
|
||||
try {
|
||||
dtsm.startThreads();
|
||||
|
||||
// get two tokens
|
||||
Token<DelegationTokenIdentifier> token1 = fsn.getDelegationToken(renewer);
|
||||
Token<DelegationTokenIdentifier> token2 = fsn.getDelegationToken(renewer);
|
||||
DelegationTokenIdentifier ident1 =
|
||||
(DelegationTokenIdentifier)token1.decodeIdentifier();
|
||||
DelegationTokenIdentifier ident2 =
|
||||
(DelegationTokenIdentifier)token2.decodeIdentifier();
|
||||
|
||||
// verify we got the tokens
|
||||
verify(log, times(1)).logGetDelegationToken(eq(ident1), anyLong());
|
||||
verify(log, times(1)).logGetDelegationToken(eq(ident2), anyLong());
|
||||
|
||||
// this is a little tricky because DTSM doesn't let us set scan interval
|
||||
// so need to periodically sleep, then stop/start threads to force scan
|
||||
|
||||
// renew first token 1/2 to expire
|
||||
Thread.sleep(renewInterval/2);
|
||||
fsn.renewDelegationToken(token2);
|
||||
verify(log, times(1)).logRenewDelegationToken(eq(ident2), anyLong());
|
||||
// force scan and give it a little time to complete
|
||||
dtsm.stopThreads(); dtsm.startThreads();
|
||||
Thread.sleep(250);
|
||||
// no token has expired yet
|
||||
verify(log, times(0)).logCancelDelegationToken(eq(ident1));
|
||||
verify(log, times(0)).logCancelDelegationToken(eq(ident2));
|
||||
|
||||
// sleep past expiration of 1st non-renewed token
|
||||
Thread.sleep(renewInterval/2);
|
||||
dtsm.stopThreads(); dtsm.startThreads();
|
||||
Thread.sleep(250);
|
||||
// non-renewed token should have implicitly been cancelled
|
||||
verify(log, times(1)).logCancelDelegationToken(eq(ident1));
|
||||
verify(log, times(0)).logCancelDelegationToken(eq(ident2));
|
||||
|
||||
// sleep past expiration of 2nd renewed token
|
||||
Thread.sleep(renewInterval/2);
|
||||
dtsm.stopThreads(); dtsm.startThreads();
|
||||
Thread.sleep(250);
|
||||
// both tokens should have been implicitly cancelled by now
|
||||
verify(log, times(1)).logCancelDelegationToken(eq(ident1));
|
||||
verify(log, times(1)).logCancelDelegationToken(eq(ident2));
|
||||
} finally {
|
||||
dtsm.stopThreads();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user