diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index cea18b7f00..b6ddd22d76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -893,7 +893,8 @@ public long loadEdits(Iterable editStreams, StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.LOADING_EDITS); - long prevLastAppliedTxId = lastAppliedTxId; + long prevLastAppliedTxId = lastAppliedTxId; + long remainingReadTxns = maxTxnsToRead; try { FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId); @@ -910,8 +911,8 @@ public long loadEdits(Iterable editStreams, (lastAppliedTxId + 1) + logSuppressed); } try { - loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead, - startOpt, recovery); + remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1, + remainingReadTxns, startOpt, recovery); } finally { // Update lastAppliedTxId even in case of error, since some ops may // have been successfully applied before the error. @@ -922,6 +923,9 @@ public long loadEdits(Iterable editStreams, && recovery != null) { lastAppliedTxId = editIn.getLastTxId(); } + if (remainingReadTxns <= 0) { + break; + } } } finally { FSEditLog.closeAllStreams(editStreams); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index bf67ddd6bd..64cb16e458 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -307,6 +307,87 @@ private long testLoad(byte[] data, FSNamesystem namesys) throws IOException { return loader.loadFSEdits(new EditLogByteInputStream(data), 1); } + @Test + public void testMultiStreamsLoadEditWithConfMaxTxns() + throws IOException { + Configuration conf = getConf(); + MiniDFSCluster cluster = null; + FileSystem fileSystem = null; + FSImage writeFsImage = null; + try { + cluster = new MiniDFSCluster + .Builder(conf) + .numDataNodes(NUM_DATA_NODES) + .build(); + cluster.waitActive(); + fileSystem = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + writeFsImage = namesystem.getFSImage(); + for (Iterator it = cluster.getNameDirs(0) + .iterator(); it.hasNext();) { + File dir = new File(it.next().getPath()); + System.out.println(dir); + } + // Roll log so new output buffer size takes effect + // we should now be writing to edits_inprogress_3 + long originalLastInodeId = namesystem.dir.getLastInodeId(); + // Reopen some files as for append + Transactions trans = new Transactions( + namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2); + trans.run(); + // Roll another time to finalize edits_inprogress_3 + writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + Transactions trans1 = new Transactions( + namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2); + trans1.run(); + writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId); + for(Iterator it = writeFsImage.getStorage(). + dirIterator(NameNodeDirType.EDITS); it.hasNext();){ + long expectedTxns = (2 * NUM_TRANSACTIONS) + 2; + File editFile = NNStorage.getFinalizedEditsFile(it.next(), + 1, expectedTxns); + File editFile1 = NNStorage.getFinalizedEditsFile(it.next(), + 203, 404); + assertTrue("Expect " + editFile + " exists", editFile.exists()); + assertTrue("Expect " + editFile1 + " exists", editFile1.exists()); + EditLogFileInputStream editLogFileInputStream1 = + new EditLogFileInputStream(editFile, 1, 202, false); + EditLogFileInputStream editLogFileInputStream2 = + new EditLogFileInputStream(editFile1, 203, 404, false); + List editStreams = Lists.newArrayList(); + editStreams.add(editLogFileInputStream1); + editStreams.add(editLogFileInputStream2); + FSImage readFsImage = new FSImage(conf); + try { + readFsImage.loadEdits(editStreams, namesystem, 100, null, null); + } catch (Exception e){ + LOG.error("There appears to be an out-of-order edit in the edit log", + e.getMessage()); + fail("no exception should be thrown"); + } finally { + if (readFsImage != null) { + readFsImage.close(); + } + } + } + } finally { + try { + if(fileSystem != null) { + fileSystem.close(); + } + if(cluster != null) { + cluster.shutdown(); + } + if(writeFsImage != null){ + writeFsImage.close(); + } + } catch (Throwable t) { + LOG.error("Couldn't shut down cleanly", t); + } + } + } + /** * Simple test for writing to and rolling the edit log. */