diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
index e305dd1995..0a62182139 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
@@ -8,3 +8,5 @@ HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
 HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
 
 HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
+
+HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
index 60e41a615e..0b3a0318f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
@@ -40,8 +40,8 @@ public RemoteEditLogManifest(List<RemoteEditLog> logs) {
 
 
   /**
-   * Check that the logs are contiguous and non-overlapping
-   * sequences of transactions, in sorted order
+   * Check that the logs are non-overlapping sequences of transactions,
+   * in sorted order. They do not need to be contiguous.
    * @throws IllegalStateException if incorrect
    */
   private void checkState() {
@@ -50,8 +50,10 @@ private void checkState() {
 
     RemoteEditLog prev = null;
     for (RemoteEditLog log : logs) {
       if (prev != null) {
-        if (log.getStartTxId() != prev.getEndTxId() + 1) {
-          throw new IllegalStateException("Invalid log manifest:" + this);
+        if (log.getStartTxId() <= prev.getEndTxId()) {
+          throw new IllegalStateException(
+              "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+              + this);
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 7accd73f73..f285e35cc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -23,12 +23,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
 import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
@@ -43,7 +46,9 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,10 +72,17 @@ public class TestQuorumJournalManager {
   private Configuration conf;
   private QuorumJournalManager qjm;
   private List<AsyncLogger> spies;
+
+  static {
+    ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+  }
 
   @Before
   public void setup() throws Exception {
     conf = new Configuration();
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
     cluster = new MiniJournalCluster.Builder(conf)
       .build();
 
@@ -117,11 +129,7 @@ public void testReaderWhileAnotherWrites() throws Exception {
       assertEquals(1, stream.getFirstTxId());
       assertEquals(3, stream.getLastTxId());
 
-      for (int i = 1; i <= 3; i++) {
-        FSEditLogOp op = stream.readOp();
-        assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
-        assertEquals(i, op.getTransactionId());
-      }
+      verifyEdits(streams, 1, 3);
       assertNull(stream.readOp());
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
@@ -137,6 +145,7 @@ public void testReaderWhileAnotherWrites() throws Exception {
       EditLogInputStream stream = streams.get(0);
       assertEquals(1, stream.getFirstTxId());
       assertEquals(3, stream.getLastTxId());
+      verifyEdits(streams, 1, 3);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       streams.clear();
@@ -153,12 +162,43 @@ public void testReaderWhileAnotherWrites() throws Exception {
       assertEquals(2, streams.size());
       assertEquals(4, streams.get(1).getFirstTxId());
       assertEquals(6, streams.get(1).getLastTxId());
+
+      verifyEdits(streams, 1, 6);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       streams.clear();
     }
   }
 
+  /**
+   * Regression test for HDFS-3725. One of the journal nodes is down
+   * during the writing of one segment, then comes back up later to
+   * take part in a later segment. Thus, its local edits are
+   * not a contiguous sequence. This should be handled correctly.
+   */
+  @Test
+  public void testOneJNMissingSegments() throws Exception {
+    writeSegment(qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(0).stopAndJoin(0);
+    writeSegment(qjm, 4, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.restartJournalNode(0);
+    writeSegment(qjm, 7, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(1).stopAndJoin(0);
+
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      readerQjm.selectInputStreams(streams, 1, false);
+      verifyEdits(streams, 1, 9);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      readerQjm.close();
+    }
+  }
+
   /**
    * TODO: this test needs to be fleshed out to be an exhaustive failure test
    * @throws Exception
@@ -425,6 +465,41 @@ private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
     stm.flush();
   }
 
+  /**
+   * Verify that the given list of streams contains exactly the range of
+   * transactions specified, inclusive.
+   */
+  private void verifyEdits(List<EditLogInputStream> streams,
+      int firstTxnId, int lastTxnId) throws IOException {
+
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    assertTrue(iter.hasNext());
+    EditLogInputStream stream = iter.next();
+
+    for (int expected = firstTxnId;
+        expected <= lastTxnId;
+        expected++) {
+
+      FSEditLogOp op = stream.readOp();
+      while (op == null) {
+        assertTrue("Expected to find txid " + expected + ", " +
+            "but no more streams available to read from",
+            iter.hasNext());
+        stream = iter.next();
+        op = stream.readOp();
+      }
+
+      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+      assertEquals(expected, op.getTransactionId());
+    }
+
+    assertNull(stream.readOp());
+    assertFalse("Expected no more txns after " + lastTxnId +
+        " but more streams are available", iter.hasNext());
+  }
+
+
+
   private static void waitForAllPendingCalls(AsyncLoggerSet als)
       throws InterruptedException {
     for (AsyncLogger l : als.getLoggersForTests()) {
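
Note for reviewers: the behavioral core of the patch is the relaxed checkState() invariant in RemoteEditLogManifest. Segments must still be sorted and non-overlapping, but they no longer need to be contiguous, since a JN that was down for a segment rejoins with a hole in its local logs. The following is a minimal standalone sketch of that invariant, not part of the patch itself; Segment is a hypothetical stand-in for RemoteEditLog.

import java.util.Arrays;
import java.util.List;

public class ManifestCheckSketch {

  /** Hypothetical stand-in for RemoteEditLog: a [startTxId, endTxId] range. */
  static class Segment {
    final long startTxId, endTxId;
    Segment(long startTxId, long endTxId) {
      this.startTxId = startTxId;
      this.endTxId = endTxId;
    }
    @Override
    public String toString() {
      return "[" + startTxId + "," + endTxId + "]";
    }
  }

  static void checkState(List<Segment> logs) {
    Segment prev = null;
    for (Segment log : logs) {
      // Overlapping (or out-of-order) segments are still rejected ...
      if (prev != null && log.startTxId <= prev.endTxId) {
        throw new IllegalStateException(
            "Invalid log manifest (log " + log + " overlaps " + prev + ")");
      }
      // ... but a gap (log.startTxId > prev.endTxId + 1) now passes.
      prev = log;
    }
  }

  public static void main(String[] args) {
    // A JN that missed txids 4-6 while down, as in testOneJNMissingSegments:
    // its manifest [1,3], [7,9] validates after this patch, but was
    // rejected before it (segments had to be contiguous).
    checkState(Arrays.asList(new Segment(1, 3), new Segment(7, 9)));

    // Overlap is still an error.
    try {
      checkState(Arrays.asList(new Segment(1, 5), new Segment(4, 9)));
    } catch (IllegalStateException expected) {
      System.out.println("caught: " + expected.getMessage());
    }
  }
}

The reader still sees the full sequence 1-9 despite the gap on one JN, because selectInputStreams() pulls manifests from a quorum and the other JNs cover the missing segment; verifyEdits() in the test asserts exactly that.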