HDFS-5064. Standby checkpoints should not block concurrent readers. Contributed by Aaron Twining Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575448 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c4047b0e4
commit
90b399c4bd
@ -717,6 +717,9 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR
|
||||
(cmccabe)
|
||||
|
||||
HDFS-5064. Standby checkpoints should not block concurrent readers.
|
||||
(atm via wang)
|
||||
|
||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||
|
@ -1355,7 +1355,11 @@ public void updateLastAppliedTxIdFromWritten() {
|
||||
this.lastAppliedTxId = editLog.getLastWrittenTxId();
|
||||
}
|
||||
|
||||
public synchronized long getMostRecentCheckpointTxId() {
|
||||
// Should be OK for this to not be synchronized since all of the places which
|
||||
// mutate this value are themselves synchronized so it shouldn't be possible
|
||||
// to see this flop back and forth. In the worst case this will just return an
|
||||
// old value.
|
||||
public long getMostRecentCheckpointTxId() {
|
||||
return storage.getMostRecentCheckpointTxId();
|
||||
}
|
||||
}
|
||||
|
@ -116,6 +116,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
@ -1355,20 +1356,47 @@ public void readLock() {
|
||||
this.fsLock.readLock().lock();
|
||||
}
|
||||
@Override
|
||||
public void longReadLockInterruptibly() throws InterruptedException {
|
||||
this.fsLock.longReadLock().lockInterruptibly();
|
||||
try {
|
||||
this.fsLock.readLock().lockInterruptibly();
|
||||
} catch (InterruptedException ie) {
|
||||
// In the event we're interrupted while getting the normal FSNS read lock,
|
||||
// release the long read lock.
|
||||
this.fsLock.longReadLock().unlock();
|
||||
throw ie;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void longReadUnlock() {
|
||||
this.fsLock.readLock().unlock();
|
||||
this.fsLock.longReadLock().unlock();
|
||||
}
|
||||
@Override
|
||||
public void readUnlock() {
|
||||
this.fsLock.readLock().unlock();
|
||||
}
|
||||
@Override
|
||||
public void writeLock() {
|
||||
this.fsLock.longReadLock().lock();
|
||||
this.fsLock.writeLock().lock();
|
||||
}
|
||||
@Override
|
||||
public void writeLockInterruptibly() throws InterruptedException {
|
||||
this.fsLock.longReadLock().lockInterruptibly();
|
||||
try {
|
||||
this.fsLock.writeLock().lockInterruptibly();
|
||||
} catch (InterruptedException ie) {
|
||||
// In the event we're interrupted while getting the normal FSNS write
|
||||
// lock, release the long read lock.
|
||||
this.fsLock.longReadLock().unlock();
|
||||
throw ie;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void writeUnlock() {
|
||||
this.fsLock.writeLock().unlock();
|
||||
this.fsLock.longReadLock().unlock();
|
||||
}
|
||||
@Override
|
||||
public boolean hasWriteLock() {
|
||||
@ -6838,10 +6866,15 @@ void setFsLockForTests(ReentrantReadWriteLock lock) {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ReentrantReadWriteLock getFsLockForTests() {
|
||||
public ReentrantReadWriteLock getFsLockForTests() {
|
||||
return fsLock.coarseLock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ReentrantLock getLongReadLockForTests() {
|
||||
return fsLock.longReadLock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SafeModeInfo getSafeModeInfoForTests() {
|
||||
return safeMode;
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -33,6 +34,24 @@ class FSNamesystemLock implements ReadWriteLock {
|
||||
@VisibleForTesting
|
||||
protected ReentrantReadWriteLock coarseLock;
|
||||
|
||||
/**
|
||||
* When locking the FSNS for a read that may take a long time, we take this
|
||||
* lock before taking the regular FSNS read lock. All writers also take this
|
||||
* lock before taking the FSNS write lock. Regular (short) readers do not
|
||||
* take this lock at all, instead relying solely on the synchronization of the
|
||||
* regular FSNS lock.
|
||||
*
|
||||
* This scheme ensures that:
|
||||
* 1) In the case of normal (fast) ops, readers proceed concurrently and
|
||||
* writers are not starved.
|
||||
* 2) In the case of long read ops, short reads are allowed to proceed
|
||||
* concurrently during the duration of the long read.
|
||||
*
|
||||
* See HDFS-5064 for more context.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected ReentrantLock longReadLock = new ReentrantLock(true);
|
||||
|
||||
FSNamesystemLock(boolean fair) {
|
||||
this.coarseLock = new ReentrantReadWriteLock(fair);
|
||||
}
|
||||
@ -47,6 +66,10 @@ public Lock writeLock() {
|
||||
return coarseLock.writeLock();
|
||||
}
|
||||
|
||||
public Lock longReadLock() {
|
||||
return longReadLock;
|
||||
}
|
||||
|
||||
public int getReadHoldCount() {
|
||||
return coarseLock.getReadHoldCount();
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ public boolean isOfType(StorageDirType type) {
|
||||
* recent fsimage file. This does not include any transactions
|
||||
* that have since been written to the edit log.
|
||||
*/
|
||||
protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
|
||||
protected volatile long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
|
||||
|
||||
/**
|
||||
* Time of the last checkpoint, in milliseconds since the epoch.
|
||||
|
@ -151,7 +151,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
|
||||
final long txid;
|
||||
final NameNodeFile imageType;
|
||||
|
||||
namesystem.writeLockInterruptibly();
|
||||
namesystem.longReadLockInterruptibly();
|
||||
try {
|
||||
assert namesystem.getEditLog().isOpenForRead() :
|
||||
"Standby Checkpointer should only attempt a checkpoint when " +
|
||||
@ -182,7 +182,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
|
||||
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
|
||||
thisCheckpointTxId + " but instead saved at txid=" + txid;
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.longReadUnlock();
|
||||
}
|
||||
|
||||
// Upload the saved checkpoint back to the active
|
||||
|
@ -22,6 +22,15 @@ public interface RwLock {
|
||||
/** Acquire read lock. */
|
||||
public void readLock();
|
||||
|
||||
/**
|
||||
* Acquire the long read lock, unless interrupted while waiting. The long
|
||||
* read lock should also serve to block all concurrent writers.
|
||||
**/
|
||||
void longReadLockInterruptibly() throws InterruptedException;
|
||||
|
||||
/** Release the long read lock. */
|
||||
public void longReadUnlock();
|
||||
|
||||
/** Release read lock. */
|
||||
public void readUnlock();
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@ -25,6 +26,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -33,6 +35,7 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
@ -312,6 +315,67 @@ public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
|
||||
answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
|
||||
}
|
||||
|
||||
@Test(timeout=300000)
|
||||
public void testReadsAllowedDuringCheckpoint() throws Exception {
|
||||
|
||||
// Set it up so that we know when the SBN checkpoint starts and ends.
|
||||
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
|
||||
DelayAnswer answerer = new DelayAnswer(LOG);
|
||||
Mockito.doAnswer(answerer).when(spyImage1)
|
||||
.saveNamespace(Mockito.any(FSNamesystem.class),
|
||||
Mockito.any(NameNodeFile.class),
|
||||
Mockito.any(Canceler.class));
|
||||
|
||||
// Perform some edits and wait for a checkpoint to start on the SBN.
|
||||
doEdits(0, 1000);
|
||||
nn0.getRpcServer().rollEditLog();
|
||||
answerer.waitForCall();
|
||||
assertTrue("SBN is not performing checkpoint but it should be.",
|
||||
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
|
||||
|
||||
// Make sure that the lock has actually been taken by the checkpointing
|
||||
// thread.
|
||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
|
||||
|
||||
// Perform an RPC that needs to take the write lock.
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
nn1.getRpcServer().restoreFailedStorage("false");
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
|
||||
// Make sure that our thread is waiting for the lock.
|
||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
|
||||
|
||||
assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
|
||||
assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
|
||||
assertTrue(nn1.getNamesystem().getLongReadLockForTests().hasQueuedThreads());
|
||||
|
||||
// Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
|
||||
// be taken.
|
||||
String pageContents = DFSTestUtil.urlGet(new URL("http://" +
|
||||
nn1.getHttpAddress().getHostName() + ":" +
|
||||
nn1.getHttpAddress().getPort() + "/jmx"));
|
||||
assertTrue(pageContents.contains("NumLiveDataNodes"));
|
||||
|
||||
// Make sure that the checkpoint is still going on, implying that the client
|
||||
// RPC to the SBN happened during the checkpoint.
|
||||
assertTrue("SBN should have still been checkpointing.",
|
||||
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
|
||||
answerer.proceed();
|
||||
answerer.waitForResult();
|
||||
assertTrue("SBN should have finished checkpointing.",
|
||||
answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
|
||||
|
||||
t.join();
|
||||
}
|
||||
|
||||
private void doEdits(int start, int stop) throws IOException {
|
||||
for (int i = start; i < stop; i++) {
|
||||
Path p = new Path("/test" + i);
|
||||
|
Loading…
Reference in New Issue
Block a user