From 8a3e64c77f73998326e9d72df75597bb0ad7b857 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Wed, 19 Sep 2012 23:40:59 +0000
Subject: [PATCH] HDFS-3956. QJM: purge temporary files when no longer within retention period. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1387817 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES.HDFS-3077.txt         |  2 +
 .../hdfs/qjournal/server/JNStorage.java       | 57 +++++++++++++++++++
 .../hadoop/hdfs/qjournal/server/Journal.java  | 29 +---------
 .../qjournal/client/TestQJMWithFaults.java    | 10 ++--
 .../client/TestQuorumJournalManager.java      |  6 ++
 5 files changed, 74 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
index 00dac9e346..13a2fd045e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
@@ -80,3 +80,5 @@ HDFS-3943. QJM: remove currently-unused md5sum field (todd)
 HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
 
 HDFS-3955. QJM: Make acceptRecovery() atomic. (todd)
+
+HDFS-3956. QJM: purge temporary files when no longer within retention period (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index c949f20ae7..bb28e62ab0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -19,7 +19,11 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -28,6 +32,8 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * A {@link Storage} implementation for the {@link JournalNode}.
  * 
@@ -39,6 +45,15 @@ class JNStorage extends Storage {
   private final FileJournalManager fjm;
   private final StorageDirectory sd;
   private StorageState state;
+
+
+  private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
+      ImmutableList.of(
+        Pattern.compile("edits_\\d+-(\\d+)"),
+        Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
+
+  private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
+      ImmutableList.of(Pattern.compile("(\\d+)"));
 
   /**
    * @param logDir the path to the directory in which data will be stored
@@ -111,6 +126,48 @@ File getPaxosFile(long segmentTxId) {
   File getPaxosDir() {
     return new File(sd.getCurrentDir(), "paxos");
   }
+
+  /**
+   * Remove any log files and associated paxos files which are older than
+   * the given txid.
+   */
+  void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
+    purgeMatching(sd.getCurrentDir(),
+        CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
+    purgeMatching(getPaxosDir(), PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
+  }
+
+  /**
+   * Purge files in the given directory which match any of the set of patterns.
+   * The patterns must have a single numeric capture group which determines
+   * the associated transaction ID of the file. Only those files for which
+   * the transaction ID is less than the minTxIdToKeep parameter
+   * are removed.
+   */
+  private static void purgeMatching(File dir, List<Pattern> patterns,
+      long minTxIdToKeep) throws IOException {
+
+    for (File f : FileUtil.listFiles(dir)) {
+      if (!f.isFile()) continue;
+
+      for (Pattern p : patterns) {
+        Matcher matcher = p.matcher(f.getName());
+        if (matcher.matches()) {
+          // This parsing will always succeed since the group(1) is
+          // /\d+/ in the regex itself.
+          long txid = Long.valueOf(matcher.group(1));
+          if (txid < minTxIdToKeep) {
+            LOG.info("Purging no-longer needed file " + txid);
+            if (!f.delete()) {
+              LOG.warn("Unable to delete no-longer-needed data " +
+                  f);
+            }
+            break;
+          }
+        }
+      }
+    }
+  }
 
   void format(NamespaceInfo nsInfo) throws IOException {
     setStorageInfo(nsInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 498ca7bbc4..9a15fc1b59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -27,6 +27,8 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -595,8 +597,7 @@ public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
     checkFormatted();
     checkRequest(reqInfo);
 
-    fjm.purgeLogsOlderThan(minTxIdToKeep);
-    purgePaxosDecisionsOlderThan(minTxIdToKeep);
+    storage.purgeDataOlderThan(minTxIdToKeep);
   }
 
   /**
@@ -614,30 +615,6 @@ private void purgePaxosDecision(long segmentTxId) throws IOException {
     }
   }
 
-  private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
-      throws IOException {
-    File dir = storage.getPaxosDir();
-    for (File f : FileUtil.listFiles(dir)) {
-      if (!f.isFile()) continue;
-
-      long txid;
-      try {
-        txid = Long.valueOf(f.getName());
-      } catch (NumberFormatException nfe) {
-        LOG.warn("Unexpected non-numeric file name for " + f.getAbsolutePath());
-        continue;
-      }
-
-      if (txid < minTxIdToKeep) {
-        if (!f.delete()) {
-          LOG.warn("Unable to delete no-longer-needed paxos decision record " +
-              f);
-        }
-      }
-    }
-  }
-
-
   /**
    * @see QJournalProtocol#getEditLogManifest(String, long)
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
index 7a1ea69e1d..74daa58c77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
@@ -234,10 +234,6 @@ public void testRandomized() throws Exception {
       QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
       try {
-        if (txid > 100) {
-          qjm.purgeLogsOlderThan(txid - 100);
-        }
-
         long recovered;
         try {
           recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
@@ -252,6 +248,12 @@
         txid = recovered + 1;
 
+        // Periodically purge old data on disk so it's easier to look
+        // at failure cases.
+        if (txid > 100 && i % 10 == 1) {
+          qjm.purgeLogsOlderThan(txid - 100);
+        }
+
         Holder<Throwable> thrown = new Holder<Throwable>(null);
         for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
           lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 7c13e4b2bd..7c2bb29d40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -854,6 +854,12 @@ public void testPurgeLogs() throws Exception {
     GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
         "1", "3");
 
+    // Create some temporary files of the sort that are used during recovery.
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000001.epoch=140").createNewFile());
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000002.empty").createNewFile());
+
     qjm.purgeLogsOlderThan(3);
 
     // Log purging is asynchronous, so we have to wait for the calls
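
For readers studying the patch outside the Hadoop tree, the sketch below replays the regex-driven purge that purgeMatching introduces, against a scratch directory. The two patterns and the match-then-parse loop follow the patch; everything else is an illustrative assumption: the PurgeSketch class and its main fixture are invented for this example, plain File.listFiles stands in for Hadoop's FileUtil.listFiles, and Long.parseLong/System.err replace the patch's Long.valueOf and commons-logging calls.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PurgeSketch {
  // Finalized segments edits_N-M are keyed by their last txid M, so a
  // segment is purged only once every txid it contains is below the cutoff.
  // In-progress segments, including recovery leftovers such as ".epoch=140"
  // and ".empty" files, are keyed by their starting txid.
  static final List<Pattern> CURRENT_DIR_PURGE_REGEXES = Arrays.asList(
      Pattern.compile("edits_\\d+-(\\d+)"),
      Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));

  static void purgeMatching(File dir, List<Pattern> patterns,
      long minTxIdToKeep) throws IOException {
    File[] files = dir.listFiles();
    if (files == null) {
      throw new IOException("Could not list contents of " + dir);
    }
    for (File f : files) {
      if (!f.isFile()) continue;
      for (Pattern p : patterns) {
        Matcher matcher = p.matcher(f.getName());
        if (matcher.matches()) {
          // group(1) is \d+ in every pattern, so parsing cannot fail.
          long txid = Long.parseLong(matcher.group(1));
          if (txid < minTxIdToKeep && !f.delete()) {
            System.err.println("Unable to delete " + f);
          }
          break;
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("purge-sketch").toFile();
    String[] names = {
        "edits_1-100",                   // last txid 100 < 101: purged
        "edits_101-200",                 // last txid 200 >= 101: kept
        "edits_inprogress_90.epoch=140", // stale recovery file: purged
        "edits_inprogress_150"           // still within retention: kept
    };
    for (String name : names) {
      new File(dir, name).createNewFile();
    }
    purgeMatching(dir, CURRENT_DIR_PURGE_REGEXES, 101);
    // Prints [edits_101-200, edits_inprogress_150] (order may vary).
    System.out.println(Arrays.toString(dir.list()));
  }
}

Note the retention semantics the first pattern encodes: a finalized segment survives until every transaction it contains falls below the cutoff, because its key is the segment's last txid, while stale in-progress leftovers are keyed by their start txid; the temporary .epoch= and .empty files created in testPurgeLogs exercise exactly that second case.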