diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1522f1ec40..f75fb925b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -717,6 +717,9 @@ Release 2.4.0 - UNRELEASED HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR (cmccabe) + HDFS-5064. Standby checkpoints should not block concurrent readers. + (atm via wang) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index a70e303350..d2c9805dd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -1355,7 +1355,11 @@ public void updateLastAppliedTxIdFromWritten() { this.lastAppliedTxId = editLog.getLastWrittenTxId(); } - public synchronized long getMostRecentCheckpointTxId() { + // Should be OK for this to not be synchronized since all of the places which + // mutate this value are themselves synchronized so it shouldn't be possible + // to see this flop back and forth. In the worst case this will just return an + // old value. + public long getMostRecentCheckpointTxId() { return storage.getMostRecentCheckpointTxId(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 56f556627a..98f095aea0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -116,6 +116,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.NotCompliantMBeanException; @@ -1355,20 +1356,47 @@ public void readLock() { this.fsLock.readLock().lock(); } @Override + public void longReadLockInterruptibly() throws InterruptedException { + this.fsLock.longReadLock().lockInterruptibly(); + try { + this.fsLock.readLock().lockInterruptibly(); + } catch (InterruptedException ie) { + // In the event we're interrupted while getting the normal FSNS read lock, + // release the long read lock. + this.fsLock.longReadLock().unlock(); + throw ie; + } + } + @Override + public void longReadUnlock() { + this.fsLock.readLock().unlock(); + this.fsLock.longReadLock().unlock(); + } + @Override public void readUnlock() { this.fsLock.readLock().unlock(); } @Override public void writeLock() { + this.fsLock.longReadLock().lock(); this.fsLock.writeLock().lock(); } @Override public void writeLockInterruptibly() throws InterruptedException { - this.fsLock.writeLock().lockInterruptibly(); + this.fsLock.longReadLock().lockInterruptibly(); + try { + this.fsLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ie) { + // In the event we're interrupted while getting the normal FSNS write + // lock, release the long read lock. + this.fsLock.longReadLock().unlock(); + throw ie; + } } @Override public void writeUnlock() { this.fsLock.writeLock().unlock(); + this.fsLock.longReadLock().unlock(); } @Override public boolean hasWriteLock() { @@ -6838,9 +6866,14 @@ void setFsLockForTests(ReentrantReadWriteLock lock) { } @VisibleForTesting - ReentrantReadWriteLock getFsLockForTests() { + public ReentrantReadWriteLock getFsLockForTests() { return fsLock.coarseLock; } + + @VisibleForTesting + public ReentrantLock getLongReadLockForTests() { + return fsLock.longReadLock; + } @VisibleForTesting public SafeModeInfo getSafeModeInfoForTests() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index a2a1648e45..a1b9477a69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -21,6 +21,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; @@ -33,6 +34,24 @@ class FSNamesystemLock implements ReadWriteLock { @VisibleForTesting protected ReentrantReadWriteLock coarseLock; + /** + * When locking the FSNS for a read that may take a long time, we take this + * lock before taking the regular FSNS read lock. All writers also take this + * lock before taking the FSNS write lock. Regular (short) readers do not + * take this lock at all, instead relying solely on the synchronization of the + * regular FSNS lock. + * + * This scheme ensures that: + * 1) In the case of normal (fast) ops, readers proceed concurrently and + * writers are not starved. + * 2) In the case of long read ops, short reads are allowed to proceed + * concurrently during the duration of the long read. + * + * See HDFS-5064 for more context. + */ + @VisibleForTesting + protected ReentrantLock longReadLock = new ReentrantLock(true); + FSNamesystemLock(boolean fair) { this.coarseLock = new ReentrantReadWriteLock(fair); } @@ -46,6 +65,10 @@ public Lock readLock() { public Lock writeLock() { return coarseLock.writeLock(); } + + public Lock longReadLock() { + return longReadLock; + } public int getReadHoldCount() { return coarseLock.getReadHoldCount(); @@ -58,4 +81,4 @@ public int getWriteHoldCount() { public boolean isWriteLockedByCurrentThread() { return coarseLock.isWriteLockedByCurrentThread(); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 661aa00b27..0a5594b6b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -126,7 +126,7 @@ public boolean isOfType(StorageDirType type) { * recent fsimage file. This does not include any transactions * that have since been written to the edit log. */ - protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID; + protected volatile long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID; /** * Time of the last checkpoint, in milliseconds since the epoch. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index 6ed4d8b7ab..7aa6077a8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -151,7 +151,7 @@ private void doCheckpoint() throws InterruptedException, IOException { final long txid; final NameNodeFile imageType; - namesystem.writeLockInterruptibly(); + namesystem.longReadLockInterruptibly(); try { assert namesystem.getEditLog().isOpenForRead() : "Standby Checkpointer should only attempt a checkpoint when " + @@ -182,7 +182,7 @@ private void doCheckpoint() throws InterruptedException, IOException { assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + thisCheckpointTxId + " but instead saved at txid=" + txid; } finally { - namesystem.writeUnlock(); + namesystem.longReadUnlock(); } // Upload the saved checkpoint back to the active diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java index 36e240108b..2792460d53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java @@ -21,6 +21,15 @@ public interface RwLock { /** Acquire read lock. */ public void readLock(); + + /** + * Acquire the long read lock, unless interrupted while waiting. The long + * read lock should also serve to block all concurrent writers. + **/ + void longReadLockInterruptibly() throws InterruptedException; + + /** Release the long read lock. */ + public void longReadUnlock(); /** Release read lock. */ public void readUnlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index 0041f1427c..f3616bdebf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -25,6 +26,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.net.URL; import java.util.List; import org.apache.commons.logging.Log; @@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.FSImage; @@ -311,6 +314,67 @@ public void testStandbyExceptionThrownDuringCheckpoint() throws Exception { assertTrue("SBN should have finished checkpointing.", answerer.getFireCount() == 1 && answerer.getResultCount() == 1); } + + @Test(timeout=300000) + public void testReadsAllowedDuringCheckpoint() throws Exception { + + // Set it up so that we know when the SBN checkpoint starts and ends. + FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1); + DelayAnswer answerer = new DelayAnswer(LOG); + Mockito.doAnswer(answerer).when(spyImage1) + .saveNamespace(Mockito.any(FSNamesystem.class), + Mockito.any(NameNodeFile.class), + Mockito.any(Canceler.class)); + + // Perform some edits and wait for a checkpoint to start on the SBN. + doEdits(0, 1000); + nn0.getRpcServer().rollEditLog(); + answerer.waitForCall(); + assertTrue("SBN is not performing checkpoint but it should be.", + answerer.getFireCount() == 1 && answerer.getResultCount() == 0); + + // Make sure that the lock has actually been taken by the checkpointing + // thread. + ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); + + // Perform an RPC that needs to take the write lock. + Thread t = new Thread() { + @Override + public void run() { + try { + nn1.getRpcServer().restoreFailedStorage("false"); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + t.start(); + + // Make sure that our thread is waiting for the lock. + ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); + + assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads()); + assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked()); + assertTrue(nn1.getNamesystem().getLongReadLockForTests().hasQueuedThreads()); + + // Get /jmx of the standby NN web UI, which will cause the FSNS read lock to + // be taken. + String pageContents = DFSTestUtil.urlGet(new URL("http://" + + nn1.getHttpAddress().getHostName() + ":" + + nn1.getHttpAddress().getPort() + "/jmx")); + assertTrue(pageContents.contains("NumLiveDataNodes")); + + // Make sure that the checkpoint is still going on, implying that the client + // RPC to the SBN happened during the checkpoint. + assertTrue("SBN should have still been checkpointing.", + answerer.getFireCount() == 1 && answerer.getResultCount() == 0); + answerer.proceed(); + answerer.waitForResult(); + assertTrue("SBN should have finished checkpointing.", + answerer.getFireCount() == 1 && answerer.getResultCount() == 1); + + t.join(); + } private void doEdits(int start, int stop) throws IOException { for (int i = start; i < stop; i++) {