HDFS-4643. Fix flakiness in TestQuorumJournalManager. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466253 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-04-09 21:34:12 +00:00
parent 7d00d3d20f
commit f0351527d5
4 changed files with 51 additions and 24 deletions

View File

@ -20,6 +20,9 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Random;
@ -340,4 +343,23 @@ public static void assertValueWithinRange(long expectedMin, long expectedMax,
Assert.assertTrue("Expected " + actual + " to be in range (" + expectedMin + ","
+ expectedMax + ")", expectedMin <= actual && actual <= expectedMax);
}
/**
* Assert that there are no threads running whose name matches the
* given regular expression.
* @param regex the regex to match against
*/
public static void assertNoThreadsMatching(String regex) {
Pattern pattern = Pattern.compile(regex);
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
for (ThreadInfo info : infos) {
if (info == null) continue;
if (pattern.matcher(info.getThreadName()).matches()) {
Assert.fail("Leaked thread: " + info + "\n" +
Joiner.on("\n").join(info.getStackTrace()));
}
}
}
}

View File

@ -498,6 +498,8 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4669. TestBlockPoolManager fails using IBM java. (Tian Hong Wang via
suresh)
HDFS-4643. Fix flakiness in TestQuorumJournalManager. (todd)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -75,6 +75,8 @@ public class TestQuorumJournalManager {
private QuorumJournalManager qjm;
private List<AsyncLogger> spies;
private List<QuorumJournalManager> toClose = Lists.newLinkedList();
static {
((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
}
@ -98,11 +100,26 @@ public void setup() throws Exception {
@After
public void shutdown() throws IOException {
IOUtils.cleanup(LOG, toClose.toArray(new Closeable[0]));
// Should not leak clients between tests -- this can cause flaky tests.
// (See HDFS-4643)
GenericTestUtils.assertNoThreadsMatching(".*IPC Client.*");
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Enqueue a QJM for closing during shutdown. This makes the code a little
* easier to follow, with fewer try..finally clauses necessary.
*/
private QuorumJournalManager closeLater(QuorumJournalManager qjm) {
toClose.add(qjm);
return qjm;
}
@Test
public void testSingleWriter() throws Exception {
writeSegment(cluster, qjm, 1, 3, true);
@ -119,8 +136,8 @@ public void testSingleWriter() throws Exception {
@Test
public void testFormat() throws Exception {
QuorumJournalManager qjm = new QuorumJournalManager(
conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
QuorumJournalManager qjm = closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO));
assertFalse(qjm.hasSomeData());
qjm.format(FAKE_NSINFO);
assertTrue(qjm.hasSomeData());
@ -128,8 +145,7 @@ public void testFormat() throws Exception {
@Test
public void testReaderWhileAnotherWrites() throws Exception {
QuorumJournalManager readerQjm = createSpyingQJM();
QuorumJournalManager readerQjm = closeLater(createSpyingQJM());
List<EditLogInputStream> streams = Lists.newArrayList();
readerQjm.selectInputStreams(streams, 0, false);
assertEquals(0, streams.size());
@ -251,8 +267,8 @@ public void testCrashAtBeginningOfSegment() throws Exception {
// Make a new QJM
qjm = new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
qjm = closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO));
qjm.recoverUnfinalizedSegments();
checkRecovery(cluster, 1, 3);
@ -364,8 +380,8 @@ public void testChangeWritersLogsInSync() throws Exception {
NNStorage.getInProgressEditsFileName(1));
// Make a new QJM
qjm = new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
qjm = closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO));
qjm.recoverUnfinalizedSegments();
checkRecovery(cluster, 1, 3);
}
@ -902,8 +918,8 @@ protected ExecutorService createExecutor() {
return Mockito.spy(logger);
}
};
return new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
return closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));
}
private static void waitForAllPendingCalls(AsyncLoggerSet als)

View File

@ -30,8 +30,6 @@
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
@ -122,18 +120,7 @@ public void setUp() throws IOException {
@After
public void checkForSNNThreads() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
for (ThreadInfo info : infos) {
if (info == null) continue;
LOG.info("Check thread: " + info.getThreadName());
if (info.getThreadName().contains("SecondaryNameNode")) {
fail("Leaked thread: " + info + "\n" +
Joiner.on("\n").join(info.getStackTrace()));
}
}
LOG.info("--------");
GenericTestUtils.assertNoThreadsMatching(".*SecondaryNameNode.*");
}
static void checkFile(FileSystem fileSys, Path name, int repl)