diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 33d3f2ccea..16ccf997fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -24,3 +24,5 @@ HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segmen HDFS-3799. QJM: handle empty log segments during recovery (todd) HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd) + +HDFS-3800. improvements to QJM fault testing (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java index 6fb1d897b5..8542dd373a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java @@ -61,7 +61,7 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception { } public static void writeSegment(MiniJournalCluster cluster, - QuorumJournalManager qjm, int startTxId, int numTxns, + QuorumJournalManager qjm, long startTxId, int numTxns, boolean finalize) throws IOException { EditLogOutputStream stm = qjm.startLogSegment(startTxId); // Should create in-progress @@ -81,7 +81,7 @@ public static void writeOp(EditLogOutputStream stm, long txid) throws IOExceptio stm.write(op); } - public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns) + public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns) throws IOException { for (long txid = startTxId; txid < startTxId + numTxns; txid++) { writeOp(stm, txid); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java index 3bbea00776..e66f96643c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java @@ -17,26 +17,39 @@ */ package org.apache.hadoop.hdfs.qjournal.client; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.SortedSet; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; -import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; -import org.apache.hadoop.hdfs.qjournal.client.QuorumException; +import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.junit.BeforeClass; +import org.apache.hadoop.hdfs.util.Holder; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -46,23 +59,27 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - -import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID; -import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO; -import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment; +import com.google.common.util.concurrent.MoreExecutors; public class TestQJMWithFaults { private static final Log LOG = LogFactory.getLog( TestQJMWithFaults.class); + private static final String RAND_SEED_PROPERTY = + "TestQJMWithFaults.random-seed"; + + private static final int NUM_WRITER_ITERS = 500; + private static final int SEGMENTS_PER_WRITER = 2; + private static Configuration conf = new Configuration(); static { // Don't retry connections - it just slows down the tests. - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + + // Make tests run faster by avoiding fsync() + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); } - private static long MAX_IPC_NUMBER; - /** * Run through the creation of a log without any faults injected, @@ -70,10 +87,10 @@ public class TestQJMWithFaults { * bounds for the other test cases, so they can exhaustively explore * the space of potential failures. */ - @BeforeClass - public static void determineMaxIpcNumber() throws Exception { + private static long determineMaxIpcNumber() throws Exception { Configuration conf = new Configuration(); MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build(); + long ret; try { QuorumJournalManager qjm = createInjectableQJM(cluster); qjm.format(FAKE_NSINFO); @@ -90,11 +107,12 @@ public static void determineMaxIpcNumber() throws Exception { // were no failures. assertEquals(1, ipcCounts.size()); - MAX_IPC_NUMBER = ipcCounts.first(); - LOG.info("Max IPC count = " + MAX_IPC_NUMBER); + ret = ipcCounts.first(); + LOG.info("Max IPC count = " + ret); } finally { cluster.shutdown(); } + return ret; } /** @@ -106,6 +124,8 @@ public static void determineMaxIpcNumber() throws Exception { */ @Test public void testRecoverAfterDoubleFailures() throws Exception { + final long MAX_IPC_NUMBER = determineMaxIpcNumber(); + for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) { for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) { String injectionStr = "(" + failA + ", " + failB + ")"; @@ -132,17 +152,16 @@ public void testRecoverAfterDoubleFailures() throws Exception { } // Now should be able to recover - try { - qjm = createInjectableQJM(cluster); - qjm.recoverUnfinalizedSegments(); - writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true); - // TODO: verify log segments - } catch (Throwable t) { - // Test failure! Rethrow with the test setup info so it can be - // easily triaged. - throw new RuntimeException("Test failed with injection: " + injectionStr, - t); - } + qjm = createInjectableQJM(cluster); + long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm); + assertTrue(lastRecoveredTxn >= lastAckedTxn); + + writeSegment(cluster, qjm, lastRecoveredTxn + 1, 3, true); + } catch (Throwable t) { + // Test failure! Rethrow with the test setup info so it can be + // easily triaged. + throw new RuntimeException("Test failed with injection: " + injectionStr, + t); } finally { cluster.shutdown(); cluster = null; @@ -150,6 +169,115 @@ public void testRecoverAfterDoubleFailures() throws Exception { } } } + + /** + * Test case in which three JournalNodes randomly flip flop between + * up and down states every time they get an RPC. + * + * The writer keeps track of the latest ACKed edit, and on every + * recovery operation, ensures that it recovers at least to that + * point or higher. Since at any given point, a majority of JNs + * may be injecting faults, any writer operation is allowed to fail, + * so long as the exception message indicates it failed due to injected + * faults. + * + * Given a random seed, the test should be entirely deterministic. + */ + @Test + public void testRandomized() throws Exception { + long seed; + Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY); + if (userSpecifiedSeed != null) { + LOG.info("Using seed specified in system property"); + seed = userSpecifiedSeed; + + // If the user specifies a seed, then we should gather all the + // IPC trace information so that debugging is easier. This makes + // the test run about 25% slower otherwise. + ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL); + } else { + seed = new Random().nextLong(); + } + LOG.info("Random seed: " + seed); + + Random r = new Random(seed); + + MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf) + .build(); + + // Format the cluster using a non-faulty QJM. + QuorumJournalManager qjmForInitialFormat = + createInjectableQJM(cluster); + qjmForInitialFormat.format(FAKE_NSINFO); + qjmForInitialFormat.close(); + + try { + long txid = 0; + long lastAcked = 0; + + for (int i = 0; i < NUM_WRITER_ITERS; i++) { + LOG.info("Starting writer " + i + "\n-------------------"); + + QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r); + try { + if (txid > 100) { + qjm.purgeLogsOlderThan(txid - 100); + } + + long recovered; + try { + recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm); + } catch (Throwable t) { + LOG.info("Failed recovery", t); + GenericTestUtils.assertExceptionContains("faking being down", t); + continue; + } + assertTrue("Recovered only up to txnid " + recovered + + " but had gotten an ack for " + lastAcked, + recovered >= lastAcked); + + txid = recovered + 1; + + Holder thrown = new Holder(null); + for (int j = 0; j < SEGMENTS_PER_WRITER; j++) { + lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown); + if (thrown.held != null) { + LOG.info("Failed write", thrown.held); + GenericTestUtils.assertExceptionContains("faking being down", + thrown.held); + break; + } + txid += 4; + } + } finally { + qjm.close(); + } + } + } finally { + cluster.shutdown(); + } + } + + private long writeSegmentUntilCrash(MiniJournalCluster cluster, + QuorumJournalManager qjm, long txid, int numTxns, Holder thrown) { + + long firstTxId = txid; + long lastAcked = txid - 1; + try { + EditLogOutputStream stm = qjm.startLogSegment(txid); + + for (int i = 0; i < numTxns; i++) { + QJMTestUtil.writeTxns(stm, txid++, 1); + lastAcked++; + } + + stm.close(); + qjm.finalizeLogSegment(firstTxId, lastAcked); + } catch (Throwable t) { + thrown.held = t; + } + return lastAcked; + } /** * Run a simple workload of becoming the active writer and writing @@ -179,6 +307,43 @@ private static int doWorkload(MiniJournalCluster cluster, private void failIpcNumber(AsyncLogger logger, int idx) { ((InvocationCountingChannel)logger).failIpcNumber(idx); } + + private static class RandomFaultyChannel extends IPCLoggerChannel { + private final Random random; + private float injectionProbability = 0.1f; + private boolean isUp = true; + + public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo, + String journalId, InetSocketAddress addr, long seed) { + super(conf, nsInfo, journalId, addr); + this.random = new Random(seed); + } + + @Override + protected QJournalProtocol createProxy() throws IOException { + QJournalProtocol realProxy = super.createProxy(); + return mockProxy( + new WrapEveryCall(realProxy) { + @Override + void beforeCall(InvocationOnMock invocation) throws Exception { + if (random.nextFloat() < injectionProbability) { + isUp = !isUp; + LOG.info("transitioned " + addr + " to " + + (isUp ? "up" : "down")); + } + + if (!isUp) { + throw new IOException("Injected - faking being down"); + } + } + }); + } + + @Override + protected ExecutorService createExecutor() { + return MoreExecutors.sameThreadExecutor(); + } + } private static class InvocationCountingChannel extends IPCLoggerChannel { private int rpcCount = 0; @@ -211,10 +376,9 @@ private void inject(int beforeRpcNumber, Callable injectedCode) { @Override protected QJournalProtocol createProxy() throws IOException { final QJournalProtocol realProxy = super.createProxy(); - QJournalProtocol mock = Mockito.mock(QJournalProtocol.class, - new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + QJournalProtocol mock = mockProxy( + new WrapEveryCall(realProxy) { + void beforeCall(InvocationOnMock invocation) throws Exception { rpcCount++; String callStr = "[" + addr + "] " + invocation.getMethod().getName() + "(" + @@ -228,14 +392,44 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } else { LOG.info("IPC call #" + rpcCount + ": " + callStr); } - - return invocation.getMethod().invoke(realProxy, - invocation.getArguments()); } }); return mock; } } + + + private static QJournalProtocol mockProxy(WrapEveryCall wrapper) + throws IOException { + QJournalProtocol mock = Mockito.mock(QJournalProtocol.class, + Mockito.withSettings() + .defaultAnswer(wrapper) + .extraInterfaces(Closeable.class)); + Mockito.doNothing().when((Closeable)mock).close(); + return mock; + } + + private static abstract class WrapEveryCall implements Answer { + private final Object realObj; + WrapEveryCall(Object realObj) { + this.realObj = realObj; + } + + @SuppressWarnings("unchecked") + @Override + public T answer(InvocationOnMock invocation) throws Throwable { + beforeCall(invocation); + + try { + return (T) invocation.getMethod().invoke(realObj, + invocation.getArguments()); + } catch (InvocationTargetException ite) { + throw ite.getCause(); + } + } + + abstract void beforeCall(InvocationOnMock invocation) throws Exception; + } private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster) throws IOException, URISyntaxException { @@ -249,4 +443,21 @@ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory); } + + private static QuorumJournalManager createRandomFaultyQJM( + MiniJournalCluster cluster, final Random seedGenerator) + throws IOException, URISyntaxException { + + AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { + @Override + public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, + String journalId, InetSocketAddress addr) { + return new RandomFaultyChannel(conf, nsInfo, journalId, addr, + seedGenerator.nextLong()); + } + }; + return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID), + FAKE_NSINFO, spyFactory); + } + }