diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java index 0ef54a87c5..b7d652395c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java @@ -52,12 +52,7 @@ class JNStorage extends Storage { private final StorageDirectory sd; private StorageState state; - private static final List CURRENT_DIR_PURGE_REGEXES = - ImmutableList.of( - Pattern.compile("edits_\\d+-(\\d+)"), - Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?")); - - private static final List PAXOS_DIR_PURGE_REGEXES = + private static final List PAXOS_DIR_PURGE_REGEXES = ImmutableList.of(Pattern.compile("(\\d+)")); private static final String STORAGE_EDITS_SYNC = "edits.sync"; @@ -181,8 +176,8 @@ File getRoot() { * the given txid. */ void purgeDataOlderThan(long minTxIdToKeep) throws IOException { - purgeMatching(sd.getCurrentDir(), - CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep); + fjm.purgeLogsOlderThan(minTxIdToKeep); + purgeMatching(getOrCreatePaxosDir(), PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index d08a64497f..78394ab152 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -75,7 +75,8 @@ public class FileJournalManager implements JournalManager { private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile( NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)"); - private File currentInProgress = null; + @VisibleForTesting + File currentInProgress = null; /** * A FileJournalManager should maintain the largest Tx ID that has been @@ -178,20 +179,50 @@ public void setLastReadableTxId(long id) { this.lastReadableTxId = id; } + /** + * Purges the unnecessary edits and edits_inprogress files. + * + * Edits files that are ending before the minTxIdToKeep are purged. + * Edits in progress files that are starting before minTxIdToKeep are purged. + * Edits in progress files that are marked as empty, trash, corrupted or + * stale by file extension and starting before minTxIdToKeep are purged. + * Edits in progress files that are after minTxIdToKeep, but before the + * current edits in progress files are marked as stale for clarity. + * + * In case file removal or rename is failing a warning is logged, but that + * does not fail the operation. + * + * @param minTxIdToKeep the lowest transaction ID that should be retained + * @throws IOException if listing the storage directory fails. + */ @Override public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { LOG.info("Purging logs older than " + minTxIdToKeep); File[] files = FileUtil.listFiles(sd.getCurrentDir()); List editLogs = matchEditLogs(files, true); - for (EditLogFile log : editLogs) { - if (log.getFirstTxId() < minTxIdToKeep && - log.getLastTxId() < minTxIdToKeep) { - purger.purgeLog(log); + synchronized (this) { + for (EditLogFile log : editLogs) { + if (log.getFirstTxId() < minTxIdToKeep && + log.getLastTxId() < minTxIdToKeep) { + purger.purgeLog(log); + } else if (isStaleInProgressLog(minTxIdToKeep, log)) { + purger.markStale(log); + } } } } + private boolean isStaleInProgressLog(long minTxIdToKeep, EditLogFile log) { + return log.isInProgress() && + !log.getFile().equals(currentInProgress) && + log.getFirstTxId() >= minTxIdToKeep && + // at last we check if this segment is not already marked as .trash, + // .empty or .corrupted, in which case it does not match the strict + // regex pattern. + EDITS_INPROGRESS_REGEX.matcher(log.getFile().getName()).matches(); + } + /** * Find all editlog segments starting at or above the given txid. * @param firstTxId the txnid which to start looking @@ -596,7 +627,12 @@ public void moveAsideEmptyFile() throws IOException { assert lastTxId == HdfsServerConstants.INVALID_TXID; renameSelf(".empty"); } - + + public void moveAsideStaleInprogressFile() throws IOException { + assert isInProgress; + renameSelf(".stale"); + } + private void renameSelf(String newSuffix) throws IOException { File src = file; File dst = new File(src.getParent(), src.getName() + newSuffix); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java index d0fa00314d..aaaaa7210c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java @@ -207,21 +207,22 @@ private long getImageTxIdToRetain( /** * Interface responsible for disposing of old checkpoints and edit logs. */ - static interface StoragePurger { + interface StoragePurger { void purgeLog(EditLogFile log); void purgeImage(FSImageFile image); + void markStale(EditLogFile log); } static class DeletionStoragePurger implements StoragePurger { @Override public void purgeLog(EditLogFile log) { - LOG.info("Purging old edit log " + log); + LOG.info("Purging old edit log {}", log); deleteOrWarn(log.getFile()); } @Override public void purgeImage(FSImageFile image) { - LOG.info("Purging old image " + image); + LOG.info("Purging old image {}", image); deleteOrWarn(image.getFile()); deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile())); } @@ -230,9 +231,19 @@ private static void deleteOrWarn(File file) { if (!file.delete()) { // It's OK if we fail to delete something -- we'll catch it // next time we swing through this directory. - LOG.warn("Could not delete " + file); + LOG.warn("Could not delete {}", file); } } + + public void markStale(EditLogFile log){ + try { + log.moveAsideStaleInprogressFile(); + } catch (IOException e) { + // It is ok to just log the rename failure and go on, we will try next + // time just as with deletions. + LOG.warn("Could not mark {} as stale", log, e); + } + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java index 546f730a11..7db699ef30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java @@ -25,12 +25,17 @@ import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.ToLongFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; @@ -215,6 +220,8 @@ public void testRetainExtraLogsLimitedSegments() throws IOException { // Segments containing txns upto txId 250 are extra and should be purged. tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true); + tc.addLog("/foo2/current/" + getInProgressEditsFileName(101) + ".trash", + true); tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true); tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty", true); @@ -226,6 +233,8 @@ public void testRetainExtraLogsLimitedSegments() throws IOException { // Only retain 2 extra segments. The 301-350 and 351-400 segments are // considered required, not extra. tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false); + tc.addLog("/foo2/current/" + getInProgressEditsFileName(276) + ".trash", + false); tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false); tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty", false); @@ -236,14 +245,53 @@ public void testRetainExtraLogsLimitedSegments() throws IOException { tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false); runTest(tc); } + + /* We are checking here the JournalNode environment hence added the paxos + * directory, but as the test here is about the FileJournalManager it happens + * via the NNStorageRetentionManager and that needs the fsImage files as well + * to be present in the folder to calculate the minimum transaction id we want + * to keep based on the config. + */ + @Test + public void testExtraInprogressFilesAreRemovedOrMarkedStale() + throws IOException { + conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150); + TestCaseDescription tc = new TestCaseDescription(); + tc.addRoot("/foo", NameNodeDirType.IMAGE_AND_EDITS); + final String PATH = "/foo/current/"; + + tc.addImage(PATH + getImageFileName(200), true); + tc.addImage(PATH + getImageFileName(300), false); + tc.addImage(PATH + getImageFileName(400), false); + + File file = Mockito.spy(new File(PATH + "paxos")); + Mockito.when(file.isDirectory()).thenReturn(true); + tc.addFile(file); + + tc.addLog(PATH + getFinalizedEditsFileName(1,75), true); + tc.addLog(PATH + getInProgressEditsFileName(76), true); + tc.addLog(PATH + getFinalizedEditsFileName(76, 120), true); + tc.addLog(PATH + getInProgressEditsFileName(121) + ".stale", true); + tc.addLog(PATH + getFinalizedEditsFileName(121, 150), true); + // everything down from here should be kept. + tc.addLog(PATH + getInProgressEditsFileName(151), false, true); + tc.addLog(PATH + getFinalizedEditsFileName(151, 320), false); + tc.addLog(PATH + getInProgressEditsFileName(321), false, true); + tc.addLog(PATH + getFinalizedEditsFileName(321, 430), false); + tc.addLog(PATH + getInProgressEditsFileName(431), false); + + runTest(tc); + } private void runTest(TestCaseDescription tc) throws IOException { StoragePurger mockPurger = Mockito.mock(NNStorageRetentionManager.StoragePurger.class); ArgumentCaptor imagesPurgedCaptor = - ArgumentCaptor.forClass(FSImageFile.class); + ArgumentCaptor.forClass(FSImageFile.class); ArgumentCaptor logsPurgedCaptor = - ArgumentCaptor.forClass(EditLogFile.class); + ArgumentCaptor.forClass(EditLogFile.class); + ArgumentCaptor staleLogsCaptor = + ArgumentCaptor.forClass(EditLogFile.class); // Ask the manager to purge files we don't need any more new NNStorageRetentionManager(conf, @@ -255,31 +303,43 @@ private void runTest(TestCaseDescription tc) throws IOException { .purgeImage(imagesPurgedCaptor.capture()); Mockito.verify(mockPurger, Mockito.atLeast(0)) .purgeLog(logsPurgedCaptor.capture()); + Mockito.verify(mockPurger, Mockito.atLeast(0)) + .markStale(staleLogsCaptor.capture()); + Set capturedPaths = Sets.newLinkedHashSet(); // Check images - Set purgedPaths = Sets.newLinkedHashSet(); - for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) { - purgedPaths.add(fileToPath(purged.getFile())); - } - Assert.assertEquals( + for (FSImageFile captured : imagesPurgedCaptor.getAllValues()) { + capturedPaths.add(fileToPath(captured.getFile())); + } + Assert.assertEquals("Image file check.", Joiner.on(",").join(filesToPaths(tc.expectedPurgedImages)), - Joiner.on(",").join(purgedPaths)); + Joiner.on(",").join(capturedPaths)); - // Check images - purgedPaths.clear(); - for (EditLogFile purged : logsPurgedCaptor.getAllValues()) { - purgedPaths.add(fileToPath(purged.getFile())); - } - Assert.assertEquals( + capturedPaths.clear(); + // Check edit logs, and also in progress edits older than minTxIdToKeep + for (EditLogFile captured : logsPurgedCaptor.getAllValues()) { + capturedPaths.add(fileToPath(captured.getFile())); + } + Assert.assertEquals("Check old edits are removed.", Joiner.on(",").join(filesToPaths(tc.expectedPurgedLogs)), - Joiner.on(",").join(purgedPaths)); + Joiner.on(",").join(capturedPaths)); + + capturedPaths.clear(); + // Check in progress edits to keep are marked as stale + for (EditLogFile captured : staleLogsCaptor.getAllValues()) { + capturedPaths.add(fileToPath(captured.getFile())); + } + Assert.assertEquals("Check unnecessary but kept edits are marked stale", + Joiner.on(",").join(filesToPaths(tc.expectedStaleLogs)), + Joiner.on(",").join(capturedPaths)); } - + private class TestCaseDescription { private final Map dirRoots = Maps.newLinkedHashMap(); private final Set expectedPurgedLogs = Sets.newLinkedHashSet(); private final Set expectedPurgedImages = Sets.newLinkedHashSet(); - + private final Set expectedStaleLogs = Sets.newLinkedHashSet(); + private class FakeRoot { final NameNodeDirType type; final List files; @@ -307,13 +367,20 @@ private void addFile(File file) { } } } - + void addLog(String path, boolean expectPurge) { + addLog(path, expectPurge, false); + } + + void addLog(String path, boolean expectPurge, boolean expectStale) { File file = new File(path); addFile(file); if (expectPurge) { expectedPurgedLogs.add(file); } + if (expectStale) { + expectedStaleLogs.add(file); + } } void addImage(String path, boolean expectPurge) { @@ -331,7 +398,22 @@ NNStorage mockStorage() throws IOException { } return mockStorageForDirs(sds.toArray(new StorageDirectory[0])); } - + + private File findLastInProgressEdit(FakeRoot root){ + Pattern p = Pattern.compile( + NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)"); + ToLongFunction fileNameToTxId = + f -> { + Matcher m = p.matcher(f.getName()); + return m.matches() ? + Long.parseLong(m.group(1)): + HdfsServerConstants.INVALID_TXID; + }; + return root.files.stream(). + sorted(Comparator.comparingLong(fileNameToTxId).reversed()). + findFirst().orElse(null); + } + @SuppressWarnings("unchecked") public FSEditLog mockEditLog(StoragePurger purger) throws IOException { final List jms = Lists.newArrayList(); @@ -342,36 +424,28 @@ public FSEditLog mockEditLog(StoragePurger purger) throws IOException { // passing null NNStorage for unit test because it does not use it FileJournalManager fjm = new FileJournalManager(conf, root.mockStorageDir(), null); + fjm.currentInProgress = findLastInProgressEdit(root); fjm.purger = purger; jms.add(fjm); journalSet.add(fjm, false); } FSEditLog mockLog = Mockito.mock(FSEditLog.class); - Mockito.doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - assert args.length == 1; - long txId = (Long) args[0]; - - for (JournalManager jm : jms) { - jm.purgeLogsOlderThan(txId); - } - return null; + Mockito.doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 1; + long txId = (Long) args[0]; + for (JournalManager jm : jms) { + jm.purgeLogsOlderThan(txId); } + return null; }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong()); - Mockito.doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - journalSet.selectInputStreams((Collection)args[0], - (Long)args[1], (Boolean)args[2], (Boolean)args[3]); - return null; - } + Mockito.doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + journalSet.selectInputStreams((Collection)args[0], + (Long)args[1], (Boolean)args[2], (Boolean)args[3]); + return null; }).when(mockLog).selectInputStreams(Mockito.anyCollection(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean()); return mockLog; @@ -402,19 +476,16 @@ private static Collection filesToPaths(Collection files) { return paths; } - private static NNStorage mockStorageForDirs(final StorageDirectory ... mockDirs) + private static NNStorage mockStorageForDirs(final StorageDirectory... mockDirs) throws IOException { NNStorage mockStorage = Mockito.mock(NNStorage.class); - Mockito.doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - FSImageStorageInspector inspector = + Mockito.doAnswer(invocation -> { + FSImageStorageInspector inspector = (FSImageStorageInspector) invocation.getArguments()[0]; - for (StorageDirectory sd : mockDirs) { - inspector.inspectDirectory(sd); - } - return null; + for (StorageDirectory sd : mockDirs) { + inspector.inspectDirectory(sd); } + return null; }).when(mockStorage).inspectStorageDirs(any()); return mockStorage; }