HDFS-13051. Fix dead lock during async editlog rolling if edit queue is full. Contributed by Daryn Sharp.

This commit is contained in:
Xiao Chen 2018-09-10 22:14:02 -07:00
parent 96892c469b
commit 8e54da1511
2 changed files with 215 additions and 4 deletions

View File

@ -24,7 +24,9 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -145,15 +147,68 @@ public boolean logEdit() {
edit.logSyncWait();
}
// draining permits is intended to provide a high priority reservation.
// however, release of outstanding permits must be postponed until
// drained permits are restored to avoid starvation. logic has some races
// but is good enough to serve its purpose.
private Semaphore overflowMutex = new Semaphore(8){
private AtomicBoolean draining = new AtomicBoolean();
private AtomicInteger pendingReleases = new AtomicInteger();
@Override
public int drainPermits() {
draining.set(true);
return super.drainPermits();
}
// while draining, count the releases until release(int)
private void tryRelease(int permits) {
pendingReleases.getAndAdd(permits);
if (!draining.get()) {
super.release(pendingReleases.getAndSet(0));
}
}
@Override
public void release() {
tryRelease(1);
}
@Override
public void release(int permits) {
draining.set(false);
tryRelease(permits);
}
};
private void enqueueEdit(Edit edit) {
if (LOG.isDebugEnabled()) {
LOG.debug("logEdit " + edit);
}
try {
if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
// not checking for overflow yet to avoid penalizing performance of
// the common case. if there is persistent overflow, a mutex will be
// use to throttle contention on the queue.
if (!editPendingQ.offer(edit)) {
Preconditions.checkState(
isSyncThreadAlive(), "sync thread is not alive");
editPendingQ.put(edit);
if (Thread.holdsLock(this)) {
// if queue is full, synchronized caller must immediately relinquish
// the monitor before re-offering to avoid deadlock with sync thread
// which needs the monitor to write transactions.
int permits = overflowMutex.drainPermits();
try {
do {
this.wait(1000); // will be notified by next logSync.
} while (!editPendingQ.offer(edit));
} finally {
overflowMutex.release(permits);
}
} else {
// mutex will throttle contention during persistent overflow.
overflowMutex.acquire();
try {
editPendingQ.put(edit);
} finally {
overflowMutex.release();
}
}
}
} catch (Throwable t) {
// should never happen! failure to enqueue an edit is fatal

View File

@ -28,10 +28,17 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -52,6 +59,7 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.mockito.Mockito;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -566,5 +574,153 @@ public void run() {
LOG.info("Closing nn");
if(namesystem != null) namesystem.close();
}
}
}
@Test(timeout=180000)
public void testDeadlock() throws Throwable {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
final AtomicBoolean done = new AtomicBoolean(false);
final Semaphore blockerSemaphore = new Semaphore(0);
final CountDownLatch startSpamLatch = new CountDownLatch(1);
ExecutorService executor = Executors.newCachedThreadPool();
try {
final FSEditLog editLog = namesystem.getEditLog();
FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
.setSource("/").setUser("u").setGroup("g");
// don't reset fields so instance can be reused.
final FSEditLogOp reuseOp = Mockito.spy(op);
Mockito.doNothing().when(reuseOp).reset();
// only job is spam edits. it will fill the queue when the test
// loop injects the blockingOp.
Future[] logSpammers = new Future[16];
for (int i=0; i < logSpammers.length; i++) {
final int ii = i;
logSpammers[i] = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log spammer " + ii);
// wait until a blocking edit op notifies us to go.
startSpamLatch.await();
for (int i = 0; !done.get() && i < 1000000; i++) {
// do not logSync here because we need to congest the queue.
editLog.logEdit(reuseOp);
if (i % 2048 == 0) {
LOG.info("thread[" + ii +"] edits=" + i);
}
}
assertTrue("too many edits", done.get());
return null;
}
});
}
// the tx id is set while the edit log monitor is held, so this will
// effectively stall the async processing thread which will cause the
// edit queue to fill up.
final FSEditLogOp blockingOp = Mockito.spy(op);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// flip the latch to unleash the spamming threads to congest
// the queue.
startSpamLatch.countDown();
// wait until unblocked after a synchronized thread is started.
blockerSemaphore.acquire();
invocation.callRealMethod();
return null;
}
}
).when(blockingOp).setTransactionId(Mockito.anyLong());
// don't reset fields so instance can be reused.
Mockito.doNothing().when(blockingOp).reset();
// repeatedly overflow the queue and verify it doesn't deadlock.
for (int i = 0; i < 8; i++) {
// when the blockingOp is logged, it triggers the latch to unleash the
// spammers to overflow the edit queue, then waits for a permit
// from blockerSemaphore that will be released at the bottom of
// this loop.
Future blockingEdit = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log blocker");
editLog.logEdit(blockingOp);
editLog.logSync();
return null;
}
});
// wait for spammers to seize up the edit log.
long startTxId = editLog.getLastWrittenTxIdWithoutLock();
final long[] txIds = { startTxId, startTxId, startTxId };
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
txIds[0] = txIds[1];
txIds[1] = txIds[2];
txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
return (txIds[0] == txIds[1] &&
txIds[1] == txIds[2] &&
txIds[2] > startTxId);
}
}, 100, 10000);
// callers that synchronize on the edit log while the queue is full
// are prone to deadlock if the locking is incorrect. at this point:
// 1. the blocking edit is holding the log's monitor.
// 2. the spammers have filled the queue.
// 3. the spammers are blocked waiting to queue another edit.
// Now we'll start another thread to synchronize on the log (simulates
// what log rolling does), unblock the op currently holding the
// monitor, and ensure deadlock does not occur.
CountDownLatch readyLatch = new CountDownLatch(1);
Future synchedEdits = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log synchronizer");
// the sync is CRUCIAL for this test. it's what causes edit
// log rolling to deadlock when queue is full.
readyLatch.countDown();
synchronized (editLog) {
editLog.logEdit(reuseOp);
editLog.logSync();
}
return null;
}
});
// unblock the edit jammed in setting its txid. queued edits should
// start flowing and the synced edits should complete.
readyLatch.await();
blockerSemaphore.release();
blockingEdit.get();
synchedEdits.get();
}
// tell spammers to stop.
done.set(true);
for (int i=0; i < logSpammers.length; i++) {
logSpammers[i].get();
}
// just make sure everything can be synced.
editLog.logSyncAll();
} finally {
LOG.info("Closing nn");
executor.shutdownNow();
if (namesystem != null) {
namesystem.getFSImage().getStorage().close();
namesystem.close();
}
}
}
}