HDFS-10267. Extra "synchronized" on FsDatasetImpl#recoverAppend and FsDatasetImpl#recoverClose

This commit is contained in:
Colin Patrick Mccabe 2016-04-06 12:36:54 -07:00
parent 3be1ab485f
commit 4bd7cbc29d
2 changed files with 181 additions and 59 deletions

View File

@ -1268,7 +1268,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverAppend(
public ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
LOG.info("Recover failed append to " + b);
@ -1301,7 +1301,7 @@ public synchronized ReplicaHandler recoverAppend(
}
@Override // FsDatasetSpi
public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b);
while (true) {

View File

@ -44,8 +44,10 @@
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
@ -90,6 +92,7 @@
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
@ -161,7 +164,7 @@ public class TestBlockRecovery {
}
private final long
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
/**
* Starts an instance of DataNode
@ -175,11 +178,10 @@ public void startUp() throws IOException, URISyntaxException {
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
if (currentTestName.getMethodName().equals(
"testInitReplicaRecoveryDoesNotHogLock")) {
if (currentTestName.getMethodName().contains("DoesNotHoldLock")) {
// This test requires a very long value for the xceiver stop timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS);
}
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
@ -816,96 +818,216 @@ public void testSafeLength() throws Exception {
}
}
/**
* Test that initReplicaRecovery does not hold the lock for an unreasonable
* amount of time if a writer is taking a long time to stop.
*/
@Test(timeout=60000)
public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
private static class TestStopWorkerSemaphore {
final Semaphore sem;
final AtomicBoolean gotInterruption = new AtomicBoolean(false);
TestStopWorkerSemaphore() {
this.sem = new Semaphore(0);
}
/**
* Attempt to acquire a sempahore within a given timeout.
*
* This is useful for unit tests where we need to ignore InterruptedException
* when attempting to take a semaphore, but still want to honor the overall
* test timeout.
*
* @param timeoutMs The timeout in miliseconds.
*/
private void uninterruptiblyAcquire(long timeoutMs) throws Exception {
long startTimeMs = Time.monotonicNow();
while (true) {
long remTime = startTimeMs + timeoutMs - Time.monotonicNow();
if (remTime < 0) {
throw new RuntimeException("Failed to acquire the semaphore within " +
timeoutMs + " milliseconds.");
}
try {
if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
gotInterruption.set(true);
}
}
}
}
private interface TestStopWorkerRunnable {
/**
* Return the name of the operation that this runnable performs.
*/
String opName();
/**
* Perform the operation.
*/
void run(RecoveringBlock recoveringBlock) throws Exception;
}
@Test(timeout=90000)
public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception {
testStopWorker(new TestStopWorkerRunnable() {
@Override
public String opName() {
return "initReplicaRecovery";
}
@Override
public void run(RecoveringBlock recoveringBlock) throws Exception {
try {
spyDN.initReplicaRecovery(recoveringBlock);
} catch (Exception e) {
if (!e.getMessage().contains("meta does not exist")) {
throw e;
}
}
}
});
}
@Test(timeout=90000)
public void testRecoverAppendDoesNotHoldLock() throws Exception {
testStopWorker(new TestStopWorkerRunnable() {
@Override
public String opName() {
return "recoverAppend";
}
@Override
public void run(RecoveringBlock recoveringBlock) throws Exception {
try {
ExtendedBlock extBlock = recoveringBlock.getBlock();
spyDN.getFSDataset().recoverAppend(extBlock,
extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
} catch (Exception e) {
if (!e.getMessage().contains(
"Corrupted replica ReplicaBeingWritten")) {
throw e;
}
}
}
});
}
@Test(timeout=90000)
public void testRecoverCloseDoesNotHoldLock() throws Exception {
testStopWorker(new TestStopWorkerRunnable() {
@Override
public String opName() {
return "recoverClose";
}
@Override
public void run(RecoveringBlock recoveringBlock) throws Exception {
try {
ExtendedBlock extBlock = recoveringBlock.getBlock();
spyDN.getFSDataset().recoverClose(extBlock,
extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
} catch (Exception e) {
if (!e.getMessage().contains(
"Corrupted replica ReplicaBeingWritten")) {
throw e;
}
}
}
});
}
/**
* Test that an FsDatasetImpl operation does not hold the lock for an
* unreasonable amount of time if a writer is taking a long time to stop.
*/
private void testStopWorker(final TestStopWorkerRunnable tswr)
throws Exception {
LOG.debug("Running " + currentTestName.getMethodName());
// We need a long value for the data xceiver stop timeout.
// Otherwise the timeout will trigger, and we will not have tested that
// thread join was done locklessly.
Assert.assertEquals(
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS,
dn.getDnConf().getXceiverStopTimeout());
final Semaphore progressParent = new Semaphore(0);
final Semaphore terminateSlowWorker = new Semaphore(0);
final AtomicBoolean failure = new AtomicBoolean(false);
final TestStopWorkerSemaphore progressParent =
new TestStopWorkerSemaphore();
final TestStopWorkerSemaphore terminateSlowWriter =
new TestStopWorkerSemaphore();
final AtomicReference<String> failure =
new AtomicReference<String>(null);
Collection<RecoveringBlock> recoveringBlocks =
initRecoveringBlocks();
final RecoveringBlock recoveringBlock =
Iterators.get(recoveringBlocks.iterator(), 0);
final ExtendedBlock block = recoveringBlock.getBlock();
Thread slowWorker = new Thread(new Runnable() {
Thread slowWriterThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Register this thread as the writer for the recoveringBlock.
LOG.debug("slowWorker creating rbw");
LOG.debug("slowWriter creating rbw");
ReplicaHandler replicaHandler =
spyDN.data.createRbw(StorageType.DISK, block, false);
replicaHandler.close();
LOG.debug("slowWorker created rbw");
LOG.debug("slowWriter created rbw");
// Tell the parent thread to start progressing.
progressParent.release();
while (true) {
try {
terminateSlowWorker.acquire();
break;
} catch (InterruptedException e) {
// Ignore interrupted exceptions so that the waitingWorker thread
// will have to wait for us.
}
}
LOG.debug("slowWorker exiting");
progressParent.sem.release();
terminateSlowWriter.uninterruptiblyAcquire(60000);
LOG.debug("slowWriter exiting");
} catch (Throwable t) {
LOG.error("slowWorker got exception", t);
failure.set(true);
LOG.error("slowWriter got exception", t);
failure.compareAndSet(null, "slowWriter got exception " +
t.getMessage());
}
}
});
// Start the slow worker thread and wait for it to take ownership of the
// ReplicaInPipeline
slowWorker.start();
while (true) {
try {
progressParent.acquire();
break;
} catch (InterruptedException e) {
// Ignore interrupted exceptions
}
}
slowWriterThread.start();
progressParent.uninterruptiblyAcquire(60000);
// Start a worker thread which will wait for the slow worker thread.
Thread waitingWorker = new Thread(new Runnable() {
// Start a worker thread which will attempt to stop the writer.
Thread stopWriterThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Attempt to terminate the other worker thread and take ownership
// of the ReplicaInPipeline.
LOG.debug("waitingWorker initiating recovery");
spyDN.initReplicaRecovery(recoveringBlock);
LOG.debug("waitingWorker initiated recovery");
LOG.debug("initiating " + tswr.opName());
tswr.run(recoveringBlock);
LOG.debug("finished " + tswr.opName());
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("meta does not exist", t);
LOG.error("stopWriterThread got unexpected exception for " +
tswr.opName(), t);
failure.compareAndSet(null, "stopWriterThread got unexpected " +
"exception for " + tswr.opName() + ": " + t.getMessage());
}
}
});
waitingWorker.start();
stopWriterThread.start();
// Do an operation that requires the lock. This should not be blocked
// by the replica recovery in progress.
while (!terminateSlowWriter.gotInterruption.get()) {
// Wait until stopWriterThread attempts to stop our slow writer by sending
// it an InterruptedException.
Thread.sleep(1);
}
// We know that stopWriterThread is in the process of joining our slow
// writer. It must not hold the lock during this operation.
// In order to test that it does not, we attempt to do an operation that
// requires the lock-- getReplicaString.
spyDN.getFSDataset().getReplicaString(
recoveringBlock.getBlock().getBlockPoolId(),
recoveringBlock.getBlock().getBlockId());
// Wait for the two worker threads to exit normally.
terminateSlowWorker.release();
slowWorker.join();
waitingWorker.join();
Assert.assertFalse("The slowWriter thread failed.", failure.get());
// Tell the slow writer to exit, and then wait for all threads to join.
terminateSlowWriter.sem.release();
slowWriterThread.join();
stopWriterThread.join();
// Check that our worker threads exited cleanly. This is not checked by the
// unit test framework, so we have to do it manually here.
String failureReason = failure.get();
if (failureReason != null) {
Assert.fail("Thread failure: " + failureReason);
}
}
}