diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 275dce248b..afc6cf49d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1289,6 +1289,9 @@ Release 2.8.0 - UNRELEASED HDFS-9009. Send metrics logs to NullAppender by default. (Arpit Agarwal) + HDFS-8964. When validating the edit log, do not read at or beyond the file + offset that is being written (Zhe Zhang via Colin P. McCabe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java index a46f9cf0ed..e5b9d01bd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -33,7 +33,8 @@ public class FSEditLogTestUtil { public static long countTransactionsInStream(EditLogInputStream in) throws IOException { - FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); + FSEditLogLoader.EditLogValidation validation = + FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE); return (validation.getEndTxId() - in.getFirstTxId()) + 1; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 29530554ab..813f267031 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -151,7 +151,7 
@@ public class Journal implements Closeable { EditLogFile latest = scanStorageForLatestEdits(); if (latest != null) { - highestWrittenTxId = latest.getLastTxId(); + updateHighestWrittenTxId(latest.getLastTxId()); } } @@ -266,7 +266,17 @@ public class Journal implements Closeable { synchronized long getHighestWrittenTxId() { return highestWrittenTxId; } - + + /** + * Update the highest Tx ID that has been written to the journal. Also update + * the {@link FileJournalManager#lastReadableTxId} of the underlying fjm. + * @param val The new value + */ + private void updateHighestWrittenTxId(long val) { + highestWrittenTxId = val; + fjm.setLastReadableTxId(val); + } + @VisibleForTesting JournalMetrics getMetricsForTests() { return metrics; @@ -399,7 +409,7 @@ public class Journal implements Closeable { metrics.bytesWritten.incr(records.length); metrics.txnsWritten.incr(numTxns); - highestWrittenTxId = lastTxnId; + updateHighestWrittenTxId(lastTxnId); nextTxId = lastTxnId + 1; } @@ -782,8 +792,8 @@ public class Journal implements Closeable { ": no current segment in place"); // Update the highest txid for lag metrics - highestWrittenTxId = Math.max(segment.getEndTxId(), - highestWrittenTxId); + updateHighestWrittenTxId(Math.max(segment.getEndTxId(), + highestWrittenTxId)); } else { LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + ": old segment " + TextFormat.shortDebugString(currentSegment) + @@ -812,7 +822,7 @@ public class Journal implements Closeable { // If we're shortening the log, update our highest txid // used for lag metrics. 
if (txnRange(currentSegment).containsLong(highestWrittenTxId)) { - highestWrittenTxId = segment.getEndTxId(); + updateHighestWrittenTxId(segment.getEndTxId()); } } syncedFile = syncLog(reqInfo, segment, fromUrl); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 73a162ebb1..3bf0ab4ad6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -300,8 +300,17 @@ public class EditLogFileInputStream extends EditLogInputStream { return getName(); } - static FSEditLogLoader.EditLogValidation validateEditLog(File file) - throws IOException { + /** + * @param file File being validated. + * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation + * returns after reading this or a higher ID. + * The file portion beyond this ID is potentially + * being updated. 
+ * @return Result of the validation + * @throws IOException + */ + static FSEditLogLoader.EditLogValidation validateEditLog(File file, + long maxTxIdToValidate) throws IOException { EditLogFileInputStream in; try { in = new EditLogFileInputStream(file); @@ -314,7 +323,7 @@ public class EditLogFileInputStream extends EditLogInputStream { } try { - return FSEditLogLoader.validateEditLog(in); + return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate); } finally { IOUtils.closeStream(in); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index edf88c9b93..e255cff75c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -674,6 +674,16 @@ public class FSEditLog implements LogsPurgeable { synchronized (this) { if (sync) { synctxid = syncStart; + for (JournalManager jm : journalSet.getJournalManagers()) { + /** + * {@link FileJournalManager#lastReadableTxId} is only meaningful + * for file-based journals. Therefore the interface is not added to + * other types of {@link JournalManager}. 
+ */ + if (jm instanceof FileJournalManager) { + ((FileJournalManager)jm).setLastReadableTxId(syncStart); + } + } isSyncRunning = false; } this.notifyAll(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index fc0bb78e0c..bb36ca22cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -1112,8 +1112,14 @@ public class FSEditLogLoader { * If there are invalid or corrupt transactions in the middle of the stream, * validateEditLog will skip over them. * This reads through the stream but does not close it. + * + * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation + * returns after reading this or a higher ID. + * The file portion beyond this ID is potentially + * being updated. 
*/ - static EditLogValidation validateEditLog(EditLogInputStream in) { + static EditLogValidation validateEditLog(EditLogInputStream in, + long maxTxIdToValidate) { long lastPos = 0; long lastTxId = HdfsServerConstants.INVALID_TXID; long numValid = 0; @@ -1136,6 +1142,10 @@ public class FSEditLogLoader { || op.getTransactionId() > lastTxId) { lastTxId = op.getTransactionId(); } + if (lastTxId >= maxTxIdToValidate) { + break; + } + numValid++; } return new EditLogValidation(lastPos, lastTxId, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index ebd747549e..a1488eb09b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -76,6 +76,15 @@ public class FileJournalManager implements JournalManager { private File currentInProgress = null; + /** + * A FileJournalManager should maintain the largest Tx ID that has been + * safely written to its edit log files. + * It should prevent readers from reading beyond this ID to avoid a + * potential race with ongoing writers. + * Initial value indicates that all transactions can be read. 
+ */ + private long lastReadableTxId = Long.MAX_VALUE; + @VisibleForTesting StoragePurger purger = new NNStorageRetentionManager.DeletionStoragePurger(); @@ -159,6 +168,15 @@ public class FileJournalManager implements JournalManager { this.outputBufferCapacity = size; } + + public long getLastReadableTxId() { + return lastReadableTxId; + } + + public void setLastReadableTxId(long id) { + this.lastReadableTxId = id; + } + @Override public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { @@ -193,7 +211,7 @@ public class FileJournalManager implements JournalManager { } if (elf.isInProgress()) { try { - elf.validateLog(); + elf.validateLog(getLastReadableTxId()); } catch (IOException e) { LOG.error("got IOException while trying to validate header of " + elf + ". Skipping.", e); @@ -325,11 +343,13 @@ public class FileJournalManager implements JournalManager { (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + "from among " + elfs.size() + " candidate file(s)"); } - addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk); + addStreamsToCollectionFromFiles(elfs, streams, fromTxId, + getLastReadableTxId(), inProgressOk); } static void addStreamsToCollectionFromFiles(Collection elfs, - Collection streams, long fromTxId, boolean inProgressOk) { + Collection streams, long fromTxId, long maxTxIdToValidate, + boolean inProgressOk) { for (EditLogFile elf : elfs) { if (elf.isInProgress()) { if (!inProgressOk) { @@ -340,7 +360,7 @@ public class FileJournalManager implements JournalManager { continue; } try { - elf.validateLog(); + elf.validateLog(maxTxIdToValidate); } catch (IOException e) { LOG.error("got IOException while trying to validate header of " + elf + ". 
Skipping.", e); @@ -384,7 +404,7 @@ public class FileJournalManager implements JournalManager { continue; } - elf.validateLog(); + elf.validateLog(getLastReadableTxId()); if (elf.hasCorruptHeader()) { elf.moveAsideCorruptFile(); @@ -516,9 +536,14 @@ public class FileJournalManager implements JournalManager { * Find out where the edit log ends. * This will update the lastTxId of the EditLogFile or * mark it as corrupt if it is. + * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation + * returns after reading this or a higher ID. + * The file portion beyond this ID is potentially + * being updated. */ - public void validateLog() throws IOException { - EditLogValidation val = EditLogFileInputStream.validateEditLog(file); + public void validateLog(long maxTxIdToValidate) throws IOException { + EditLogValidation val = EditLogFileInputStream.validateEditLog(file, + maxTxIdToValidate); this.lastTxId = val.getEndTxId(); this.hasCorruptHeader = val.hasCorruptHeader(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index 2267853780..e3e0a7d859 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -907,7 +907,7 @@ public class SecondaryNameNode implements Runnable, throw new RuntimeException(ioe); } FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams, - fromTxId, inProgressOk); + fromTxId, Long.MAX_VALUE, inProgressOk); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java 
index 9401d076d5..d5e64ae5e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java @@ -88,7 +88,7 @@ public class TestCheckPointForSecurityTokens { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); - log.validateLog(); + log.validateLog(Long.MAX_VALUE); long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should have 5 transactions", 5, numTransactions);; @@ -105,7 +105,7 @@ public class TestCheckPointForSecurityTokens { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); - log.validateLog(); + log.validateLog(Long.MAX_VALUE); long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should only have START txn", 1, numTransactions); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index e59dec400e..0495860cba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -66,6 +66,8 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSInotifyEventInputStream; +import org.apache.hadoop.hdfs.DistributedFileSystem; 
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -83,6 +85,9 @@ import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.LogManager; +import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; import org.mockito.Mockito; import org.xml.sax.ContentHandler; @@ -1223,7 +1228,8 @@ public class TestEditLog { TXNS_PER_ROLL*11); for (EditLogInputStream edits : editStreams) { - FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits); + FSEditLogLoader.EditLogValidation val = + FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE); long read = (val.getEndTxId() - edits.getFirstTxId()) + 1; LOG.info("Loading edits " + edits + " read " + read); assertEquals(startTxId, edits.getFirstTxId()); @@ -1573,4 +1579,99 @@ public class TestEditLog { } } } + + class TestAppender extends AppenderSkeleton { + private final List<LoggingEvent> log = new ArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void append(final LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + } + + public List<LoggingEvent> getLog() { + return new ArrayList<>(log); + } + } + + /** + * Test that reading an in-progress edit log does not go past the last synced transaction. + * @throws Exception + */ + @Test + public void testReadActivelyUpdatedLog() throws Exception { + final TestAppender appender = new TestAppender(); + LogManager.getRootLogger().addAppender(appender); + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // Set single handler thread, so all transactions hit same thread-local ops. 
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + FSImage fsimage = cluster.getNamesystem().getFSImage(); + StorageDirectory sd = fsimage.getStorage().getStorageDir(0); + + final DistributedFileSystem fileSys = cluster.getFileSystem(); + DFSInotifyEventInputStream events = fileSys.getInotifyEventStream(); + fileSys.mkdirs(new Path("/test")); + fileSys.mkdirs(new Path("/test/dir1")); + fileSys.delete(new Path("/test/dir1"), true); + fsimage.getEditLog().logSync(); + fileSys.mkdirs(new Path("/test/dir2")); + + + final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1); + assertTrue(inProgressEdit.exists()); + EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit); + FSEditLogOp op; + long pos = 0; + + while (true) { + op = elis.readOp(); + if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) { + pos = elis.getPosition(); + } else { + break; + } + } + elis.close(); + assertTrue(pos > 0); + + RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw"); + rwf.seek(pos); + assertEquals(rwf.readByte(), (byte) -1); + + rwf.seek(pos + 1); + rwf.writeByte(2); + + rwf.close(); + + events.poll(); + String pattern = "Caught exception after reading (.*) ops"; + Pattern r = Pattern.compile(pattern); + final List<LoggingEvent> log = appender.getLog(); + for (LoggingEvent event : log) { + Matcher m = r.matcher(event.getRenderedMessage()); + if (m.find()) { + fail("Should not try to read past latest synced edit log op"); + } + } + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + LogManager.getRootLogger().removeAppender(appender); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 
55ba379c81..3c3423a591 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -318,7 +318,8 @@ public class TestFSEditLogLoader { } finally { rwf.close(); } - EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); + EditLogValidation validation = + EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); assertTrue(validation.hasCorruptHeader()); } @@ -333,7 +334,7 @@ public class TestFSEditLogLoader { File logFileBak = new File(testDir, logFile.getName() + ".bak"); Files.copy(logFile, logFileBak); EditLogValidation validation = - EditLogFileInputStream.validateEditLog(logFile); + EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); assertTrue(!validation.hasCorruptHeader()); // We expect that there will be an OP_START_LOG_SEGMENT, followed by // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT. @@ -346,7 +347,8 @@ public class TestFSEditLogLoader { // Restore backup, corrupt the txn opcode Files.copy(logFileBak, logFile); corruptByteInFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile); + validation = EditLogFileInputStream.validateEditLog(logFile, + Long.MAX_VALUE); long expectedEndTxId = (txId == (NUM_TXNS + 1)) ? NUM_TXNS : (NUM_TXNS + 1); assertEquals("Failed when corrupting txn opcode at " + txOffset, @@ -363,7 +365,8 @@ public class TestFSEditLogLoader { // Restore backup, corrupt the txn opcode Files.copy(logFileBak, logFile); truncateFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile); + validation = EditLogFileInputStream.validateEditLog(logFile, + Long.MAX_VALUE); long expectedEndTxId = (txId == 0) ? 
HdfsServerConstants.INVALID_TXID : (txId - 1); assertEquals("Failed when corrupting txid " + txId + " txn opcode " + @@ -381,7 +384,7 @@ public class TestFSEditLogLoader { // layout flags section. truncateFile(logFile, 8); EditLogValidation validation = - EditLogFileInputStream.validateEditLog(logFile); + EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); assertTrue(!validation.hasCorruptHeader()); assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId()); }