diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
index 0a62182139..6ec8a4c7de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
@@ -10,3 +10,5 @@ HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
 HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
 
 HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
+
+HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index fc08c0fdda..0544f30246 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import java.net.InetSocketAddress;
 import java.net.URL;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +46,11 @@
  */
 interface AsyncLogger {
 
+  interface Factory {
+    AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr);
+  }
+
   /**
    * Send a batch of edits to the logger.
    * @param firstTxnId the first txid of the edits.
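[Editorial note, not part of the patch] The Factory interface added above is the seam the rest of this patch builds on: production code supplies IPCLoggerChannel.FACTORY, while tests can hand QuorumJournalManager a factory that wraps or instruments each logger. A minimal usage sketch, mirroring the createInjectableQJM helper added later in this patch and assuming the package-private four-argument QuorumJournalManager constructor introduced below:

    // Build a QJM whose loggers are instrumented subclasses rather than
    // plain IPCLoggerChannels. InvocationCountingChannel is defined in
    // TestQJMWithFaults later in this patch.
    AsyncLogger.Factory countingFactory = new AsyncLogger.Factory() {
      @Override
      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
          String journalId, InetSocketAddress addr) {
        return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
      }
    };
    QuorumJournalManager qjm = new QuorumJournalManager(
        conf, uri, nsInfo, countingFactory);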
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index 2e0d4ab0be..a89200700c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -25,7 +25,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 4fad042004..1ff945c38e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -65,7 +65,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
   private final Configuration conf;
-  private final InetSocketAddress addr;
+  protected final InetSocketAddress addr;
   private QJournalProtocol proxy;
 
   private final ListeningExecutorService executor;
@@ -87,7 +87,16 @@ public class IPCLoggerChannel implements AsyncLogger {
    * overflows and it starts to treat the logger as having errored.
    */
   private final int queueSizeLimitBytes;
+
+  static final Factory FACTORY = new AsyncLogger.Factory() {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
+    }
+  };
+
   public IPCLoggerChannel(Configuration conf,
       NamespaceInfo nsInfo,
       String journalId,
@@ -131,15 +140,18 @@ public void close() {
   protected QJournalProtocol getProxy() throws IOException {
     if (proxy != null) return proxy;
-
+    proxy = createProxy();
+    return proxy;
+  }
+
+  protected QJournalProtocol createProxy() throws IOException {
     RPC.setProtocolEngine(conf,
         QJournalProtocolPB.class, ProtobufRpcEngine.class);
     QJournalProtocolPB pbproxy = RPC.getProxy(
         QJournalProtocolPB.class,
         RPC.getProtocolVersion(QJournalProtocolPB.class),
         addr, conf);
-    proxy = new QJournalProtocolTranslatorPB(pbproxy);
-    return proxy;
+    return new QJournalProtocolTranslatorPB(pbproxy);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 3c7f7ea19d..786e9f571f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -34,8 +34,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -78,14 +76,20 @@ public class QuorumJournalManager implements JournalManager {
 
   private final AsyncLoggerSet loggers;
 
-  public QuorumJournalManager(Configuration conf,
+  QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
+    this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+  }
+
+  QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo,
+      AsyncLogger.Factory loggerFactory) throws IOException {
     Preconditions.checkArgument(conf != null, "must be configured");
 
     this.conf = conf;
     this.uri = uri;
     this.nsInfo = nsInfo;
-    this.loggers = new AsyncLoggerSet(createLoggers());
+    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
@@ -106,6 +110,11 @@ public QuorumJournalManager(Configuration conf,
   }
 
+  protected List<AsyncLogger> createLoggers(
+      AsyncLogger.Factory factory) throws IOException {
+    return createLoggers(conf, uri, nsInfo, factory);
+  }
+
   static String parseJournalId(URI uri) {
     String path = uri.getPath();
     Preconditions.checkArgument(path != null && !path.isEmpty(),
@@ -234,17 +243,14 @@ public int compare(
     }
   };
 
-  protected List<AsyncLogger> createLoggers() throws IOException {
-    return createLoggers(conf, uri, nsInfo);
-  }
-
   static List<AsyncLogger> createLoggers(Configuration conf,
-      URI uri, NamespaceInfo nsInfo) throws IOException {
+      URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+      throws IOException {
     List<AsyncLogger> ret = Lists.newArrayList();
     List<InetSocketAddress> addrs = getLoggerAddresses(uri);
     String jid = parseJournalId(uri);
     for (InetSocketAddress addr : addrs) {
-      ret.add(new IPCLoggerChannel(conf, nsInfo, jid, addr));
+      ret.add(factory.createLogger(conf, nsInfo, jid, addr));
     }
     return ret;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
index 761bdab0a9..54e081e931 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
@@ -17,13 +17,31 @@
  */
 package org.apache.hadoop.hdfs.qjournal;
 
-import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.DataOutputBuffer;
 
 public abstract class QJMTestUtil {
+  public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  public static final String JID = "test-journal";
 
   public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     DataOutputBuffer buf = new DataOutputBuffer();
@@ -38,4 +56,82 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     return Arrays.copyOf(buf.getData(), buf.getLength());
   }
 
+  public static void writeSegment(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, int startTxId, int numTxns,
+      boolean finalize) throws IOException {
+    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+    // Should create in-progress
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(startTxId));
+
+    writeTxns(stm, startTxId, numTxns);
+    if (finalize) {
+      stm.close();
+      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+    }
+  }
+
+  public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+    op.setTransactionId(txid);
+    stm.write(op);
+  }
+
+  public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+      throws IOException {
+    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+      writeOp(stm, txid);
+    }
+    stm.setReadyToFlush();
+    stm.flush();
+  }
+
+  /**
+   * Verify that the given list of streams contains exactly the range of
+   * transactions specified, inclusive.
+   */
+  public static void verifyEdits(List<EditLogInputStream> streams,
+      int firstTxnId, int lastTxnId) throws IOException {
+
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    assertTrue(iter.hasNext());
+    EditLogInputStream stream = iter.next();
+
+    for (int expected = firstTxnId;
+        expected <= lastTxnId;
+        expected++) {
+
+      FSEditLogOp op = stream.readOp();
+      while (op == null) {
+        assertTrue("Expected to find txid " + expected + ", " +
+            "but no more streams available to read from",
+            iter.hasNext());
+        stream = iter.next();
+        op = stream.readOp();
+      }
+
+      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+      assertEquals(expected, op.getTransactionId());
+    }
+
+    assertNull(stream.readOp());
+    assertFalse("Expected no more txns after " + lastTxnId +
+        " but more streams are available", iter.hasNext());
+  }
+
+
+  public static void assertExistsInQuorum(MiniJournalCluster cluster,
+      String fname) {
+    int count = 0;
+    for (int i = 0; i < 3; i++) {
+      File dir = cluster.getCurrentDir(i, JID);
+      if (new File(dir, fname).exists()) {
+        count++;
+      }
+    }
+    assertTrue("File " + fname + " should exist in a quorum of dirs",
+        count >= cluster.getQuorumSize());
+  }
+
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
index 265b7603b2..4cad9bef14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
@@ -28,7 +28,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
@@ -59,7 +58,8 @@ public void testSingleThreaded() throws IOException {
     // With no failures or contention, epochs should increase one-by-one
     for (int i = 0; i < 5; i++) {
       AsyncLoggerSet als = new AsyncLoggerSet(
-          QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO));
+          QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+              IPCLoggerChannel.FACTORY));
       als.createNewUniqueEpoch(FAKE_NSINFO);
       assertEquals(i + 1, als.getEpoch());
     }
@@ -69,7 +69,8 @@ public void testSingleThreaded() throws IOException {
     // skipping some
     for (int i = 0; i < 20; i++) {
       AsyncLoggerSet als = new AsyncLoggerSet(
-          makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)));
+          makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+              IPCLoggerChannel.FACTORY)));
       long newEpoch = -1;
       while (true) {
         try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
new file mode 100644
index 0000000000..08020c6025
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+
+
+public class TestQJMWithFaults {
+  private static final Log LOG = LogFactory.getLog(
+      TestQJMWithFaults.class);
+
+  private static Configuration conf = new Configuration();
+  static {
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+  }
+  private static long MAX_IPC_NUMBER;
+
+
+  /**
+   * Run through the creation of a log without any faults injected,
+   * and count how many RPCs are made to each node. This sets the
+   * bounds for the other test cases, so they can exhaustively explore
+   * the space of potential failures.
+   */
+  @BeforeClass
+  public static void determineMaxIpcNumber() throws Exception {
+    Configuration conf = new Configuration();
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    try {
+      QuorumJournalManager qjm = createInjectableQJM(cluster);
+      doWorkload(cluster, qjm);
+
+      SortedSet<Integer> ipcCounts = Sets.newTreeSet();
+      for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+        InvocationCountingChannel ch = (InvocationCountingChannel)l;
+        ch.waitForAllPendingCalls();
+        ipcCounts.add(ch.getRpcCount());
+      }
+
+      // All of the loggers should have sent the same number of RPCs, since there
+      // were no failures.
+      assertEquals(1, ipcCounts.size());
+
+      MAX_IPC_NUMBER = ipcCounts.first();
+      LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sets up two of the nodes to each drop a single RPC, at all
+   * possible combinations of RPCs. This may result in the
+   * active writer failing to write. After this point, a new writer
+   * should be able to recover and continue writing without
+   * data loss.
+   */
+  @Test
+  public void testRecoverAfterDoubleFailures() throws Exception {
+    for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
+      for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
+        String injectionStr = "(" + failA + ", " + failB + ")";
+
+        LOG.info("\n\n-------------------------------------------\n" +
+            "Beginning test, failing at " + injectionStr + "\n" +
+            "-------------------------------------------\n\n");
+
+        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+          .build();
+        try {
+          QuorumJournalManager qjm;
+          qjm = createInjectableQJM(cluster);
+          List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
+          failIpcNumber(loggers.get(0), failA);
+          failIpcNumber(loggers.get(1), failB);
+          int lastAckedTxn = doWorkload(cluster, qjm);
+
+          if (lastAckedTxn < 6) {
+            LOG.info("Failed after injecting failures at " + injectionStr +
+                ". This is expected since we injected a failure in the " +
+                "majority.");
+          }
+
+          // Now should be able to recover
+          try {
+            qjm = createInjectableQJM(cluster);
+            qjm.recoverUnfinalizedSegments();
+            writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
+            // TODO: verify log segments
+          } catch (Throwable t) {
+            // Test failure! Rethrow with the test setup info so it can be
+            // easily triaged.
+            throw new RuntimeException("Test failed with injection: " + injectionStr,
+                t);
+          }
+        } finally {
+          cluster.shutdown();
+          cluster = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Run a simple workload of becoming the active writer and writing
+   * two log segments: 1-3 and 4-6.
+   */
+  private static int doWorkload(MiniJournalCluster cluster,
+      QuorumJournalManager qjm) throws IOException {
+    int lastAcked = 0;
+    try {
+      qjm.recoverUnfinalizedSegments();
+      writeSegment(cluster, qjm, 1, 3, true);
+      lastAcked = 3;
+      writeSegment(cluster, qjm, 4, 3, true);
+      lastAcked = 6;
+    } catch (QuorumException qe) {
+      LOG.info("Failed to write at txid " + lastAcked,
+          qe);
+    }
+    return lastAcked;
+  }
+
+  /**
+   * Inject a failure at the given IPC number, such that the JN never
+   * receives the RPC. The client side sees an IOException. Future
+   * IPCs after this number will be received as usual.
+   */
+  private void failIpcNumber(AsyncLogger logger, int idx) {
+    ((InvocationCountingChannel)logger).failIpcNumber(idx);
+  }
+
+  private static class InvocationCountingChannel extends IPCLoggerChannel {
+    private int rpcCount = 0;
+    private Map<Integer, Callable<Void>> injections = Maps.newHashMap();
+
+    public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      super(conf, nsInfo, journalId, addr);
+    }
+
+    int getRpcCount() {
+      return rpcCount;
+    }
+
+    void failIpcNumber(final int idx) {
+      Preconditions.checkArgument(idx > 0,
+          "id must be positive");
+      inject(idx, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          throw new IOException("injected failed IPC at " + idx);
+        }
+      });
+    }
+
+    private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
+      injections.put(beforeRpcNumber, injectedCode);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      final QJournalProtocol realProxy = super.createProxy();
+      QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+              rpcCount++;
+              String callStr = "[" + addr + "] " +
+                  invocation.getMethod().getName() + "(" +
+                  Joiner.on(", ").join(invocation.getArguments()) + ")";
+
+              Callable<Void> inject = injections.get(rpcCount);
+              if (inject != null) {
+                LOG.info("Injecting code before IPC #" + rpcCount + ": " +
+                    callStr);
+                inject.call();
+              } else {
+                LOG.info("IPC call #" + rpcCount + ": " + callStr);
+              }
+
+              return invocation.getMethod().invoke(realProxy,
+                  invocation.getArguments());
+            }
+          });
+      return mock;
+    }
+  }
+
+  private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index f285e35cc3..96b04198d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -18,14 +18,18 @@
 package org.apache.hadoop.hdfs.qjournal.client;
 
 import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,14 +37,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
@@ -55,7 +54,6 @@
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * Functional tests for QuorumJournalManager.
@@ -65,9 +63,6 @@ public class TestQuorumJournalManager {
   private static final Log LOG = LogFactory.getLog(
       TestQuorumJournalManager.class);
 
-  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
-      12345, "mycluster", "my-bp", 0L, 0);
-  private static final String JID = "testQuorumJournalManager";
   private MiniJournalCluster cluster;
   private Configuration conf;
   private QuorumJournalManager qjm;
@@ -95,18 +90,20 @@ public void setup() throws Exception {
 
   @After
   public void shutdown() throws IOException {
-    cluster.shutdown();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
   }
 
   @Test
   public void testSingleWriter() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
 
     // Should be finalized
     checkRecovery(cluster, 1, 3);
 
     // Start a new segment
-    writeSegment(qjm, 4, 1, true);
+    writeSegment(cluster, qjm, 4, 1, true);
 
     // Should be finalized
     checkRecovery(cluster, 4, 4);
@@ -119,7 +116,7 @@ public void testReaderWhileAnotherWrites() throws Exception {
     List<EditLogInputStream> streams = Lists.newArrayList();
     readerQjm.selectInputStreams(streams, 0, false);
     assertEquals(0, streams.size());
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
 
     readerQjm.selectInputStreams(streams, 0, false);
     try {
@@ -138,7 +135,7 @@ public void testReaderWhileAnotherWrites() throws Exception {
 
     // Ensure correct results when there is a stream in-progress, but we don't
     // ask for in-progress.
-    writeSegment(qjm, 4, 3, false);
+    writeSegment(cluster, qjm, 4, 3, false);
     readerQjm.selectInputStreams(streams, 0, false);
     try {
       assertEquals(1, streams.size());
@@ -178,13 +175,13 @@ public void testReaderWhileAnotherWrites() throws Exception {
    */
   @Test
   public void testOneJNMissingSegments() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(0).stopAndJoin(0);
-    writeSegment(qjm, 4, 3, true);
+    writeSegment(cluster, qjm, 4, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.restartJournalNode(0);
-    writeSegment(qjm, 7, 3, true);
+    writeSegment(cluster, qjm, 7, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(1).stopAndJoin(0);
 
@@ -199,37 +196,6 @@ public void testOneJNMissingSegments() throws Exception {
     }
   }
 
-  /**
-   * TODO: this test needs to be fleshed out to be an exhaustive failure test
-   * @throws Exception
-   */
-  @Test
-  public void testOrchestratedFailures() throws Exception {
-    writeSegment(qjm, 1, 3, true);
-    writeSegment(qjm, 4, 3, true);
-
-    SortedSet<Long> serials = Sets.newTreeSet();
-    for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
-      IPCLoggerChannel ch = (IPCLoggerChannel)l;
-      ch.waitForAllPendingCalls();
-      serials.add(ch.getNextIpcSerial());
-    }
-
-    // All of the loggers should have sent the same number of RPCs, since there
-    // were no failures.
-    assertEquals(1, serials.size());
-
-    long maxSerial = serials.first();
-    LOG.info("Max IPC serial = " + maxSerial);
-
-    cluster.shutdown();
-
-    cluster = new MiniJournalCluster.Builder(conf)
-      .build();
-    qjm = createSpyingQJM();
-    spies = qjm.getLoggerSetForTests().getLoggersForTests();
-
-  }
 
   /**
    * Test case where a new writer picks up from an old one with no failures
@@ -238,8 +204,8 @@ public void testOrchestratedFailures() throws Exception {
    */
   @Test
   public void testChangeWritersLogsInSync() throws Exception {
-    writeSegment(qjm, 1, 3, false);
-    assertExistsInQuorum(cluster,
+    writeSegment(cluster, qjm, 1, 3, false);
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));
 
     // Make a new QJM
@@ -301,7 +267,7 @@ private void doOutOfSyncTest(int missingOnRecoveryIdx,
           qe);
     }
 
-    assertExistsInQuorum(cluster,
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));
 
     // Shut down the specified JN, so it's not present during recovery.
@@ -320,7 +286,7 @@ private void failLoggerAtTxn(AsyncLogger spy, long txid) {
         .when(spy).sendEdits(
             Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
   }
-  
+
   /**
    * edit lengths [3,4,5]
    * first recovery:
@@ -389,7 +355,7 @@ public void testRecoverAfterIncompleteRecovery() throws Exception {
   @Test
   public void testPurgeLogs() throws Exception {
     for (int txid = 1; txid <= 5; txid++) {
-      writeSegment(qjm, txid, 1, true);
+      writeSegment(cluster, qjm, txid, 1, true);
     }
     File curDir = cluster.getCurrentDir(0, JID);
     GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
@@ -428,78 +394,18 @@ public void testPurgeLogs() throws Exception {
 
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
-    return new QuorumJournalManager(
-        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
-      @Override
-      protected List<AsyncLogger> createLoggers() throws IOException {
-        List<AsyncLogger> realLoggers = super.createLoggers();
-        List<AsyncLogger> spies = Lists.newArrayList();
-        for (AsyncLogger logger : realLoggers) {
-          spies.add(Mockito.spy(logger));
-        }
-        return spies;
-      }
-    };
-  }
-
-  private void writeSegment(QuorumJournalManager qjm,
-      int startTxId, int numTxns, boolean finalize) throws IOException {
-    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
-    // Should create in-progress
-    assertExistsInQuorum(cluster,
-        NNStorage.getInProgressEditsFileName(startTxId));
-
-    writeTxns(stm, startTxId, numTxns);
-    if (finalize) {
-      stm.close();
-      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
-    }
-  }
-
-  private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
-      throws IOException {
-    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
-      TestQuorumJournalManagerUnit.writeOp(stm, txid);
-    }
-    stm.setReadyToFlush();
-    stm.flush();
-  }
-
-  /**
-   * Verify that the given list of streams contains exactly the range of
-   * transactions specified, inclusive.
-   */
-  private void verifyEdits(List<EditLogInputStream> streams,
-      int firstTxnId, int lastTxnId) throws IOException {
-
-    Iterator<EditLogInputStream> iter = streams.iterator();
-    assertTrue(iter.hasNext());
-    EditLogInputStream stream = iter.next();
-
-    for (int expected = firstTxnId;
-        expected <= lastTxnId;
-        expected++) {
-
-      FSEditLogOp op = stream.readOp();
-      while (op == null) {
-        assertTrue("Expected to find txid " + expected + ", " +
-            "but no more streams available to read from",
-            iter.hasNext());
-        stream = iter.next();
-        op = stream.readOp();
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
+            conf, nsInfo, journalId, addr));
       }
-
-      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
-      assertEquals(expected, op.getTransactionId());
-    }
-
-    assertNull(stream.readOp());
-    assertFalse("Expected no more txns after " + lastTxnId +
-        " but more streams are available", iter.hasNext());
+    };
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
   }
-
-
   private static void waitForAllPendingCalls(AsyncLoggerSet als)
       throws InterruptedException {
     for (AsyncLogger l : als.getLoggersForTests()) {
@@ -508,19 +414,6 @@ private static void waitForAllPendingCalls(AsyncLoggerSet als)
     }
   }
 
-  private void assertExistsInQuorum(MiniJournalCluster cluster,
-      String fname) {
-    int count = 0;
-    for (int i = 0; i < 3; i++) {
-      File dir = cluster.getCurrentDir(i, JID);
-      if (new File(dir, fname).exists()) {
-        count++;
-      }
-    }
-    assertTrue("File " + fname + " should exist in a quorum of dirs",
-        count >= cluster.getQuorumSize());
-  }
-
   private void checkRecovery(MiniJournalCluster cluster,
       long segmentTxId, long expectedEndTxId)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index e879a72921..b25097b631 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -32,8 +32,6 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -47,6 +45,8 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+
 /**
  * True unit tests for QuorumJournalManager
  */
@@ -70,7 +70,7 @@ public void setup() throws Exception {
     qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
       @Override
-      protected List<AsyncLogger> createLoggers() {
+      protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
        return spyLoggers;
      }
    };
@@ -192,10 +192,4 @@ private EditLogOutputStream createLogSegment() throws IOException {
     EditLogOutputStream stm = qjm.startLogSegment(1);
     return stm;
   }
-
-  static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
-    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
-    op.setTransactionId(txid);
-    stm.write(op);
-  }
 }
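[Editorial note, not part of the patch] The heart of the fault injector above is InvocationCountingChannel.createProxy(): rather than hand-writing a wrapper for every QJournalProtocol method, it builds a Mockito mock whose default Answer counts each call, runs any injected fault, and then reflectively forwards the call to the real proxy. The same delegate-and-intercept pattern works for any interface. A self-contained sketch under that assumption (illustration only; it wraps a plain List instead of an RPC proxy, and fails the second call made through the wrapper):

    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;

    import org.mockito.Mockito;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class DelegatingMockSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        final List<String> real = new ArrayList<String>();
        final int[] calls = { 0 };

        // Every call on the wrapper runs this Answer first: count it,
        // possibly throw an injected fault, then delegate via reflection.
        List<String> wrapper = Mockito.mock(List.class, new Answer<Object>() {
          @Override
          public Object answer(InvocationOnMock invocation) throws Throwable {
            calls[0]++;
            if (calls[0] == 2) {
              throw new RuntimeException("injected failure at call #2");
            }
            Method m = invocation.getMethod();
            return m.invoke(real, invocation.getArguments());
          }
        });

        wrapper.add("a");                   // call #1 reaches the real list
        try {
          wrapper.add("b");                 // call #2 is dropped by the injection
        } catch (RuntimeException expected) {
          System.out.println(expected.getMessage());
        }
        System.out.println(wrapper.size()); // call #3 delegates again; prints 1
      }
    }

Because the Answer is the mock's default behavior, no per-method stubbing is needed, which is what lets TestQJMWithFaults enumerate every (failA, failB) pair up to MAX_IPC_NUMBER without knowing which protocol method each IPC number corresponds to.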