diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 240345c654..7e4e8eb3b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1268,7 +1268,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, } @Override // FsDatasetSpi - public synchronized ReplicaHandler recoverAppend( + public ReplicaHandler recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); @@ -1301,7 +1301,7 @@ public synchronized ReplicaHandler recoverAppend( } @Override // FsDatasetSpi - public synchronized Replica recoverClose(ExtendedBlock b, long newGS, + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed close " + b); while (true) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 751089f282..42e80fa8f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -44,8 +44,10 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Iterators; import org.apache.commons.logging.Log; @@ -90,6 +92,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; @@ -161,7 +164,7 @@ public class TestBlockRecovery { } private final long - TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L; + TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L; /** * Starts an instance of DataNode @@ -175,11 +178,10 @@ public void startUp() throws IOException, URISyntaxException { conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); - if (currentTestName.getMethodName().equals( - "testInitReplicaRecoveryDoesNotHogLock")) { + if (currentTestName.getMethodName().contains("DoesNotHoldLock")) { // This test requires a very long value for the xceiver stop timeout. conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, - TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS); + TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS); } conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); FileSystem.setDefaultUri(conf, @@ -816,96 +818,216 @@ public void testSafeLength() throws Exception { } } - /** - * Test that initReplicaRecovery does not hold the lock for an unreasonable - * amount of time if a writer is taking a long time to stop. - */ - @Test(timeout=60000) - public void testInitReplicaRecoveryDoesNotHogLock() throws Exception { - if(LOG.isDebugEnabled()) { - LOG.debug("Running " + GenericTestUtils.getMethodName()); + private static class TestStopWorkerSemaphore { + final Semaphore sem; + + final AtomicBoolean gotInterruption = new AtomicBoolean(false); + + TestStopWorkerSemaphore() { + this.sem = new Semaphore(0); } + + /** + * Attempt to acquire a sempahore within a given timeout. + * + * This is useful for unit tests where we need to ignore InterruptedException + * when attempting to take a semaphore, but still want to honor the overall + * test timeout. + * + * @param timeoutMs The timeout in miliseconds. + */ + private void uninterruptiblyAcquire(long timeoutMs) throws Exception { + long startTimeMs = Time.monotonicNow(); + while (true) { + long remTime = startTimeMs + timeoutMs - Time.monotonicNow(); + if (remTime < 0) { + throw new RuntimeException("Failed to acquire the semaphore within " + + timeoutMs + " milliseconds."); + } + try { + if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + gotInterruption.set(true); + } + } + } + } + + private interface TestStopWorkerRunnable { + /** + * Return the name of the operation that this runnable performs. + */ + String opName(); + + /** + * Perform the operation. + */ + void run(RecoveringBlock recoveringBlock) throws Exception; + } + + @Test(timeout=90000) + public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception { + testStopWorker(new TestStopWorkerRunnable() { + @Override + public String opName() { + return "initReplicaRecovery"; + } + + @Override + public void run(RecoveringBlock recoveringBlock) throws Exception { + try { + spyDN.initReplicaRecovery(recoveringBlock); + } catch (Exception e) { + if (!e.getMessage().contains("meta does not exist")) { + throw e; + } + } + } + }); + } + + @Test(timeout=90000) + public void testRecoverAppendDoesNotHoldLock() throws Exception { + testStopWorker(new TestStopWorkerRunnable() { + @Override + public String opName() { + return "recoverAppend"; + } + + @Override + public void run(RecoveringBlock recoveringBlock) throws Exception { + try { + ExtendedBlock extBlock = recoveringBlock.getBlock(); + spyDN.getFSDataset().recoverAppend(extBlock, + extBlock.getGenerationStamp() + 1, extBlock.getNumBytes()); + } catch (Exception e) { + if (!e.getMessage().contains( + "Corrupted replica ReplicaBeingWritten")) { + throw e; + } + } + } + }); + } + + @Test(timeout=90000) + public void testRecoverCloseDoesNotHoldLock() throws Exception { + testStopWorker(new TestStopWorkerRunnable() { + @Override + public String opName() { + return "recoverClose"; + } + + @Override + public void run(RecoveringBlock recoveringBlock) throws Exception { + try { + ExtendedBlock extBlock = recoveringBlock.getBlock(); + spyDN.getFSDataset().recoverClose(extBlock, + extBlock.getGenerationStamp() + 1, extBlock.getNumBytes()); + } catch (Exception e) { + if (!e.getMessage().contains( + "Corrupted replica ReplicaBeingWritten")) { + throw e; + } + } + } + }); + } + + /** + * Test that an FsDatasetImpl operation does not hold the lock for an + * unreasonable amount of time if a writer is taking a long time to stop. + */ + private void testStopWorker(final TestStopWorkerRunnable tswr) + throws Exception { + LOG.debug("Running " + currentTestName.getMethodName()); // We need a long value for the data xceiver stop timeout. // Otherwise the timeout will trigger, and we will not have tested that // thread join was done locklessly. Assert.assertEquals( - TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS, + TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS, dn.getDnConf().getXceiverStopTimeout()); - final Semaphore progressParent = new Semaphore(0); - final Semaphore terminateSlowWorker = new Semaphore(0); - final AtomicBoolean failure = new AtomicBoolean(false); + final TestStopWorkerSemaphore progressParent = + new TestStopWorkerSemaphore(); + final TestStopWorkerSemaphore terminateSlowWriter = + new TestStopWorkerSemaphore(); + final AtomicReference failure = + new AtomicReference(null); Collection recoveringBlocks = initRecoveringBlocks(); final RecoveringBlock recoveringBlock = Iterators.get(recoveringBlocks.iterator(), 0); final ExtendedBlock block = recoveringBlock.getBlock(); - Thread slowWorker = new Thread(new Runnable() { + Thread slowWriterThread = new Thread(new Runnable() { @Override public void run() { try { // Register this thread as the writer for the recoveringBlock. - LOG.debug("slowWorker creating rbw"); + LOG.debug("slowWriter creating rbw"); ReplicaHandler replicaHandler = spyDN.data.createRbw(StorageType.DISK, block, false); replicaHandler.close(); - LOG.debug("slowWorker created rbw"); + LOG.debug("slowWriter created rbw"); // Tell the parent thread to start progressing. - progressParent.release(); - while (true) { - try { - terminateSlowWorker.acquire(); - break; - } catch (InterruptedException e) { - // Ignore interrupted exceptions so that the waitingWorker thread - // will have to wait for us. - } - } - LOG.debug("slowWorker exiting"); + progressParent.sem.release(); + terminateSlowWriter.uninterruptiblyAcquire(60000); + LOG.debug("slowWriter exiting"); } catch (Throwable t) { - LOG.error("slowWorker got exception", t); - failure.set(true); + LOG.error("slowWriter got exception", t); + failure.compareAndSet(null, "slowWriter got exception " + + t.getMessage()); } } }); // Start the slow worker thread and wait for it to take ownership of the // ReplicaInPipeline - slowWorker.start(); - while (true) { - try { - progressParent.acquire(); - break; - } catch (InterruptedException e) { - // Ignore interrupted exceptions - } - } + slowWriterThread.start(); + progressParent.uninterruptiblyAcquire(60000); - // Start a worker thread which will wait for the slow worker thread. - Thread waitingWorker = new Thread(new Runnable() { + // Start a worker thread which will attempt to stop the writer. + Thread stopWriterThread = new Thread(new Runnable() { @Override public void run() { try { - // Attempt to terminate the other worker thread and take ownership - // of the ReplicaInPipeline. - LOG.debug("waitingWorker initiating recovery"); - spyDN.initReplicaRecovery(recoveringBlock); - LOG.debug("waitingWorker initiated recovery"); + LOG.debug("initiating " + tswr.opName()); + tswr.run(recoveringBlock); + LOG.debug("finished " + tswr.opName()); } catch (Throwable t) { - GenericTestUtils.assertExceptionContains("meta does not exist", t); + LOG.error("stopWriterThread got unexpected exception for " + + tswr.opName(), t); + failure.compareAndSet(null, "stopWriterThread got unexpected " + + "exception for " + tswr.opName() + ": " + t.getMessage()); } } }); - waitingWorker.start(); + stopWriterThread.start(); - // Do an operation that requires the lock. This should not be blocked - // by the replica recovery in progress. + while (!terminateSlowWriter.gotInterruption.get()) { + // Wait until stopWriterThread attempts to stop our slow writer by sending + // it an InterruptedException. + Thread.sleep(1); + } + + // We know that stopWriterThread is in the process of joining our slow + // writer. It must not hold the lock during this operation. + // In order to test that it does not, we attempt to do an operation that + // requires the lock-- getReplicaString. spyDN.getFSDataset().getReplicaString( recoveringBlock.getBlock().getBlockPoolId(), recoveringBlock.getBlock().getBlockId()); - // Wait for the two worker threads to exit normally. - terminateSlowWorker.release(); - slowWorker.join(); - waitingWorker.join(); - Assert.assertFalse("The slowWriter thread failed.", failure.get()); + // Tell the slow writer to exit, and then wait for all threads to join. + terminateSlowWriter.sem.release(); + slowWriterThread.join(); + stopWriterThread.join(); + + // Check that our worker threads exited cleanly. This is not checked by the + // unit test framework, so we have to do it manually here. + String failureReason = failure.get(); + if (failureReason != null) { + Assert.fail("Thread failure: " + failureReason); + } } }