HDFS-13112. Token expiration edits may cause log corruption or deadlock. Contributed by Daryn Sharp.
This commit is contained in:
parent
a53d62ab26
commit
47473952e5
@ -21,7 +21,6 @@
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
@ -366,34 +365,58 @@ public synchronized int getNumberOfKeys() {
|
||||
@Override //AbstractDelegationTokenManager
|
||||
protected void logUpdateMasterKey(DelegationKey key)
|
||||
throws IOException {
|
||||
synchronized (noInterruptsLock) {
|
||||
try {
|
||||
// The edit logging code will fail catastrophically if it
|
||||
// is interrupted during a logSync, since the interrupt
|
||||
// closes the edit log files. Doing this inside the
|
||||
// above lock and then checking interruption status
|
||||
// prevents this bug.
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted before updating master key");
|
||||
// fsn lock will prevent being interrupted when stopping
|
||||
// the secret manager.
|
||||
namesystem.readLockInterruptibly();
|
||||
try {
|
||||
// this monitor isn't necessary if stopped while holding write lock
|
||||
// but for safety, guard against a stop with read lock.
|
||||
synchronized (noInterruptsLock) {
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
return; // leave flag set so secret monitor exits.
|
||||
}
|
||||
namesystem.logUpdateMasterKey(key);
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
namesystem.logUpdateMasterKey(key);
|
||||
} catch (InterruptedException ie) {
|
||||
// AbstractDelegationTokenManager may crash if an exception is thrown.
|
||||
// The interrupt flag will be detected when it attempts to sleep.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override //AbstractDelegationTokenManager
|
||||
protected void logExpireToken(final DelegationTokenIdentifier dtId)
|
||||
throws IOException {
|
||||
synchronized (noInterruptsLock) {
|
||||
try {
|
||||
// The edit logging code will fail catastrophically if it
|
||||
// is interrupted during a logSync, since the interrupt
|
||||
// closes the edit log files. Doing this inside the
|
||||
// above lock and then checking interruption status
|
||||
// prevents this bug.
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted before expiring delegation token");
|
||||
// fsn lock will prevent being interrupted when stopping
|
||||
// the secret manager.
|
||||
namesystem.readLockInterruptibly();
|
||||
try {
|
||||
// this monitor isn't necessary if stopped while holding write lock
|
||||
// but for safety, guard against a stop with read lock.
|
||||
synchronized (noInterruptsLock) {
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
return; // leave flag set so secret monitor exits.
|
||||
}
|
||||
namesystem.logExpireDelegationToken(dtId);
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
namesystem.logExpireDelegationToken(dtId);
|
||||
} catch (InterruptedException ie) {
|
||||
// AbstractDelegationTokenManager may crash if an exception is thrown.
|
||||
// The interrupt flag will be detected when it attempts to sleep.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1580,6 +1580,10 @@ public void readLock() {
|
||||
this.fsLock.readLock();
|
||||
}
|
||||
@Override
|
||||
public void readLockInterruptibly() throws InterruptedException {
|
||||
this.fsLock.readLockInterruptibly();
|
||||
}
|
||||
@Override
|
||||
public void readUnlock() {
|
||||
this.fsLock.readUnlock();
|
||||
}
|
||||
@ -5675,9 +5679,9 @@ public void logUpdateMasterKey(DelegationKey key) {
|
||||
assert !isInSafeMode() :
|
||||
"this should never be called while in safemode, since we stop " +
|
||||
"the DT manager before entering safemode!";
|
||||
// No need to hold FSN lock since we don't access any internal
|
||||
// structures, and this is stopped before the FSN shuts itself
|
||||
// down, etc.
|
||||
// edit log rolling is not thread-safe and must be protected by the
|
||||
// fsn lock. not updating namespace so read lock is sufficient.
|
||||
assert hasReadLock();
|
||||
getEditLog().logUpdateMasterKey(key);
|
||||
getEditLog().logSync();
|
||||
}
|
||||
@ -5691,9 +5695,10 @@ public void logExpireDelegationToken(DelegationTokenIdentifier id) {
|
||||
assert !isInSafeMode() :
|
||||
"this should never be called while in safemode, since we stop " +
|
||||
"the DT manager before entering safemode!";
|
||||
// No need to hold FSN lock since we don't access any internal
|
||||
// structures, and this is stopped before the FSN shuts itself
|
||||
// down, etc.
|
||||
// edit log rolling is not thread-safe and must be protected by the
|
||||
// fsn lock. not updating namespace so read lock is sufficient.
|
||||
assert hasReadLock();
|
||||
// do not logSync so expiration edits are batched
|
||||
getEditLog().logCancelDelegationToken(id);
|
||||
}
|
||||
|
||||
|
@ -145,6 +145,13 @@ public void readLock() {
|
||||
}
|
||||
}
|
||||
|
||||
public void readLockInterruptibly() throws InterruptedException {
|
||||
coarseLock.readLock().lockInterruptibly();
|
||||
if (coarseLock.getReadHoldCount() == 1) {
|
||||
readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
|
||||
}
|
||||
}
|
||||
|
||||
public void readUnlock() {
|
||||
readUnlock(OP_NAME_OTHER);
|
||||
}
|
||||
|
@ -21,7 +21,10 @@
|
||||
public interface RwLock {
|
||||
/** Acquire read lock. */
|
||||
public void readLock();
|
||||
|
||||
|
||||
/** Acquire read lock, unless interrupted while waiting */
|
||||
void readLockInterruptibly() throws InterruptedException;
|
||||
|
||||
/** Release read lock. */
|
||||
public void readUnlock();
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@ -37,7 +38,11 @@
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
@ -180,8 +185,25 @@ public void testEditsForCancelOnTokenExpire() throws IOException,
|
||||
Text renewer = new Text(UserGroupInformation.getCurrentUser().getUserName());
|
||||
FSImage fsImage = mock(FSImage.class);
|
||||
FSEditLog log = mock(FSEditLog.class);
|
||||
doReturn(log).when(fsImage).getEditLog();
|
||||
doReturn(log).when(fsImage).getEditLog();
|
||||
// verify that the namesystem read lock is held while logging token
|
||||
// expirations. the namesystem is not updated, so write lock is not
|
||||
// necessary, but the lock is required because edit log rolling is not
|
||||
// thread-safe.
|
||||
final AtomicReference<FSNamesystem> fsnRef = new AtomicReference<>();
|
||||
doAnswer(
|
||||
new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
// fsn claims read lock if either read or write locked.
|
||||
Assert.assertTrue(fsnRef.get().hasReadLock());
|
||||
Assert.assertFalse(fsnRef.get().hasWriteLock());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
).when(log).logCancelDelegationToken(any(DelegationTokenIdentifier.class));
|
||||
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||
fsnRef.set(fsn);
|
||||
|
||||
DelegationTokenSecretManager dtsm = fsn.getDelegationTokenSecretManager();
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user