From 23762da4fa17ce6ea7b70722147977123a28a7e6 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Thu, 4 Aug 2011 21:56:17 +0000
Subject: [PATCH] HDFS-2225. Refactor file management so it's not in classes which should be generic. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1154029 13f79535-47bb-0310-9956-ffa450edef68
---
 hdfs/CHANGES.txt | 3 +
 .../server/namenode/BackupJournalManager.java | 2 +-
 .../hdfs/server/namenode/FSEditLog.java | 5 +-
 .../namenode/FSImageStorageInspector.java | 32 +++
 .../FSImageTransactionalStorageInspector.java | 237 +++---------------
 .../server/namenode/FileJournalManager.java | 162 +++++++++++-
 .../hdfs/server/namenode/JournalManager.java | 2 +-
 .../namenode/NNStorageRetentionManager.java | 24 +-
 .../hdfs/server/namenode/FSImageTestUtil.java | 28 +--
 .../hdfs/server/namenode/TestBackupNode.java | 6 +-
 .../TestCheckPointForSecurityTokens.java | 6 +-
 .../namenode/TestFSImageStorageInspector.java | 20 +-
 .../TestNNStorageRetentionManager.java | 29 ++-
 13 files changed, 286 insertions(+), 270 deletions(-)

diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 65973fdad3..e347c188c5 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -632,6 +632,9 @@ Trunk (unreleased changes)
     HDFS-2187. Make EditLogInputStream act like an iterator over
     FSEditLogOps (Ivan Kelly and todd via todd)
 
+    HDFS-2225. Refactor file management so it's not in classes which should
+    be generic. (Ivan Kelly via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
index 35c4b7384f..3cac6676f1 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
@@ -54,7 +54,7 @@ public void setOutputBufferCapacity(int size) {
   }
 
   @Override
-  public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+  public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
   }
 
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 04d96747ff..7f1047fa58 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -862,8 +862,7 @@ public void apply(JournalAndStream jas) throws IOException {
   /**
    * Archive any log files that are older than the given txid.
    */
-  public void purgeLogsOlderThan(
-      final long minTxIdToKeep, final StoragePurger purger) {
+  public void purgeLogsOlderThan(final long minTxIdToKeep) {
     synchronized (this) {
       // synchronized to prevent findbugs warning about inconsistent
      // synchronization.
This will be JIT-ed out if asserts are @@ -877,7 +876,7 @@ public void purgeLogsOlderThan( mapJournalsAndReportErrors(new JournalClosure() { @Override public void apply(JournalAndStream jas) throws IOException { - jas.manager.purgeLogsOlderThan(minTxIdToKeep, purger); + jas.manager.purgeLogsOlderThan(minTxIdToKeep); } }, "purging logs older than " + minTxIdToKeep); } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java index 6249f2f5d5..65bfa0ac55 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java @@ -96,4 +96,36 @@ public String toString() { return sb.toString(); } } + + /** + * Record of an image that has been located and had its filename parsed. + */ + static class FSImageFile { + final StorageDirectory sd; + final long txId; + private final File file; + + FSImageFile(StorageDirectory sd, File file, long txId) { + assert txId >= 0 : "Invalid txid on " + file +": " + txId; + + this.sd = sd; + this.txId = txId; + this.file = file; + } + + File getFile() { + return file; + } + + public long getCheckpointTxId() { + return txId; + } + + @Override + public String toString() { + return String.format("FSImageFile(file=%s, cpktTxId=%019d)", + file.toString(), txId); + } + } + } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java index 6fd1994ece..66be9fef27 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java @@ -37,9 +37,9 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; @@ -54,17 +54,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { private boolean needToSave = false; private boolean isUpgradeFinalized = true; - List foundImages = new ArrayList(); - List foundEditLogs = new ArrayList(); + List foundImages = new ArrayList(); + List foundEditLogs = new ArrayList(); SortedMap logGroups = new TreeMap(); long maxSeenTxId = 0; private static final Pattern IMAGE_REGEX = Pattern.compile( NameNodeFile.IMAGE.getName() + "_(\\d+)"); - private static final Pattern EDITS_REGEX = Pattern.compile( - NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)"); - private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile( - NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)"); @Override public void inspectDirectory(StorageDirectory sd) throws IOException { @@ -95,7 +91,7 @@ public void inspectDirectory(StorageDirectory sd) throws IOException { if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { try { long txid = Long.valueOf(imageMatch.group(1)); - foundImages.add(new 
FoundFSImage(sd, f, txid)); + foundImages.add(new FSImageFile(sd, f, txid)); } catch (NumberFormatException nfe) { LOG.error("Image file " + f + " has improperly formatted " + "transaction ID"); @@ -117,9 +113,10 @@ public void inspectDirectory(StorageDirectory sd) throws IOException { LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); } - List editLogs = matchEditLogs(filesInStorage); + List editLogs + = FileJournalManager.matchEditLogs(filesInStorage); if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - for (FoundEditLog log : editLogs) { + for (EditLogFile log : editLogs) { addEditLog(log); } } else if (!editLogs.isEmpty()){ @@ -133,47 +130,12 @@ public void inspectDirectory(StorageDirectory sd) throws IOException { isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); } - static List matchEditLogs(File[] filesInStorage) { - List ret = Lists.newArrayList(); - for (File f : filesInStorage) { - String name = f.getName(); - // Check for edits - Matcher editsMatch = EDITS_REGEX.matcher(name); - if (editsMatch.matches()) { - try { - long startTxId = Long.valueOf(editsMatch.group(1)); - long endTxId = Long.valueOf(editsMatch.group(2)); - ret.add(new FoundEditLog(f, startTxId, endTxId)); - } catch (NumberFormatException nfe) { - LOG.error("Edits file " + f + " has improperly formatted " + - "transaction ID"); - // skip - } - } - - // Check for in-progress edits - Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name); - if (inProgressEditsMatch.matches()) { - try { - long startTxId = Long.valueOf(inProgressEditsMatch.group(1)); - ret.add( - new FoundEditLog(f, startTxId, FoundEditLog.UNKNOWN_END)); - } catch (NumberFormatException nfe) { - LOG.error("In-progress edits file " + f + " has improperly " + - "formatted transaction ID"); - // skip - } - } - } - return ret; - } - - private void addEditLog(FoundEditLog foundEditLog) { + private void addEditLog(EditLogFile foundEditLog) { foundEditLogs.add(foundEditLog); - LogGroup group = logGroups.get(foundEditLog.startTxId); + LogGroup group = logGroups.get(foundEditLog.getFirstTxId()); if (group == null) { - group = new LogGroup(foundEditLog.startTxId); - logGroups.put(foundEditLog.startTxId, group); + group = new LogGroup(foundEditLog.getFirstTxId()); + logGroups.put(foundEditLog.getFirstTxId(), group); } group.add(foundEditLog); } @@ -191,9 +153,9 @@ public boolean isUpgradeFinalized() { * * Returns null if no images were found. 
*/ - FoundFSImage getLatestImage() { - FoundFSImage ret = null; - for (FoundFSImage img : foundImages) { + FSImageFile getLatestImage() { + FSImageFile ret = null; + for (FSImageFile img : foundImages) { if (ret == null || img.txId > ret.txId) { ret = img; } @@ -201,11 +163,11 @@ FoundFSImage getLatestImage() { return ret; } - public List getFoundImages() { + public List getFoundImages() { return ImmutableList.copyOf(foundImages); } - public List getFoundEditLogs() { + public List getEditLogFiles() { return ImmutableList.copyOf(foundEditLogs); } @@ -215,7 +177,7 @@ public LoadPlan createLoadPlan() throws IOException { throw new FileNotFoundException("No valid image files found"); } - FoundFSImage recoveryImage = getLatestImage(); + FSImageFile recoveryImage = getLatestImage(); LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE); return new TransactionalLoadPlan(recoveryImage, @@ -233,7 +195,7 @@ public LoadPlan createLoadPlan() throws IOException { LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException { long expectedTxId = sinceTxId + 1; - List recoveryLogs = new ArrayList(); + List recoveryLogs = new ArrayList(); SortedMap tailGroups = logGroups.tailMap(expectedTxId); if (logGroups.size() > tailGroups.size()) { @@ -312,10 +274,10 @@ RemoteEditLogManifest getEditLogManifest(long sinceTxId) { for (LogGroup g : logGroups.values()) { if (!g.hasFinalized) continue; - FoundEditLog fel = g.getBestNonCorruptLog(); + EditLogFile fel = g.getBestNonCorruptLog(); if (fel.getLastTxId() < sinceTxId) continue; - logs.add(new RemoteEditLog(fel.getStartTxId(), + logs.add(new RemoteEditLog(fel.getFirstTxId(), fel.getLastTxId())); } @@ -330,7 +292,7 @@ RemoteEditLogManifest getEditLogManifest(long sinceTxId) { */ static class LogGroup { long startTxId; - List logs = new ArrayList();; + List logs = new ArrayList();; private Set endTxIds = new TreeSet(); private boolean hasInProgress = false; private boolean hasFinalized = false; @@ -339,15 +301,15 @@ static class LogGroup { this.startTxId = startTxId; } - FoundEditLog getBestNonCorruptLog() { + EditLogFile getBestNonCorruptLog() { // First look for non-corrupt finalized logs - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (!log.isCorrupt() && !log.isInProgress()) { return log; } } // Then look for non-corrupt in-progress logs - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (!log.isCorrupt()) { return log; } @@ -364,7 +326,7 @@ FoundEditLog getBestNonCorruptLog() { * @return true if we can determine the last txid in this log group. */ boolean hasKnownLastTxId() { - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (!log.isInProgress()) { return true; } @@ -378,24 +340,24 @@ boolean hasKnownLastTxId() { * {@see #hasKnownLastTxId()} */ long getLastTxId() { - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (!log.isInProgress()) { - return log.lastTxId; + return log.getLastTxId(); } } throw new IllegalStateException("LogGroup only has in-progress logs"); } - void add(FoundEditLog log) { - assert log.getStartTxId() == startTxId; + void add(EditLogFile log) { + assert log.getFirstTxId() == startTxId; logs.add(log); if (log.isInProgress()) { hasInProgress = true; } else { hasFinalized = true; - endTxIds.add(log.lastTxId); + endTxIds.add(log.getLastTxId()); } } @@ -422,7 +384,7 @@ void planRecovery() throws IOException { * The in-progress logs in this case should be considered corrupt. 
*/ private void planMixedLogRecovery() throws IOException { - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (log.isInProgress()) { LOG.warn("Log at " + log.getFile() + " is in progress, but " + "other logs starting at the same txid " + startTxId + @@ -446,7 +408,7 @@ private void planAllInProgressRecovery() throws IOException { "crash)"); if (logs.size() == 1) { // Only one log, it's our only choice! - FoundEditLog log = logs.get(0); + EditLogFile log = logs.get(0); if (log.validateLog().numTransactions == 0) { // If it has no transactions, we should consider it corrupt just // to be conservative. @@ -459,7 +421,7 @@ private void planAllInProgressRecovery() throws IOException { } long maxValidTxnCount = Long.MIN_VALUE; - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { long validTxnCount = log.validateLog().numTransactions; LOG.warn(" Log " + log.getFile() + " valid txns=" + validTxnCount + @@ -467,7 +429,7 @@ private void planAllInProgressRecovery() throws IOException { maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount); } - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { long txns = log.validateLog().numTransactions; if (txns < maxValidTxnCount) { LOG.warn("Marking log at " + log.getFile() + " as corrupt since " + @@ -499,7 +461,7 @@ private void checkConsistentEndTxIds() throws IOException { } void recover() throws IOException { - for (FoundEditLog log : logs) { + for (EditLogFile log : logs) { if (log.isCorrupt()) { log.moveAsideCorruptFile(); } else if (log.isInProgress()) { @@ -508,131 +470,12 @@ void recover() throws IOException { } } } - - /** - * Record of an image that has been located and had its filename parsed. - */ - static class FoundFSImage { - final StorageDirectory sd; - final long txId; - private final File file; - - FoundFSImage(StorageDirectory sd, File file, long txId) { - assert txId >= 0 : "Invalid txid on " + file +": " + txId; - - this.sd = sd; - this.txId = txId; - this.file = file; - } - - File getFile() { - return file; - } - - public long getTxId() { - return txId; - } - - @Override - public String toString() { - return file.toString(); - } - } - /** - * Record of an edit log that has been located and had its filename parsed. 
- */ - static class FoundEditLog { - File file; - final long startTxId; - long lastTxId; - - private EditLogValidation cachedValidation = null; - private boolean isCorrupt = false; - - static final long UNKNOWN_END = -1; - - FoundEditLog(File file, - long startTxId, long endTxId) { - assert endTxId == UNKNOWN_END || endTxId >= startTxId; - assert startTxId > 0; - assert file != null; - - this.startTxId = startTxId; - this.lastTxId = endTxId; - this.file = file; - } - - public void finalizeLog() throws IOException { - long numTransactions = validateLog().numTransactions; - long lastTxId = startTxId + numTransactions - 1; - File dst = new File(file.getParentFile(), - NNStorage.getFinalizedEditsFileName(startTxId, lastTxId)); - LOG.info("Finalizing edits log " + file + " by renaming to " - + dst.getName()); - if (!file.renameTo(dst)) { - throw new IOException("Couldn't finalize log " + - file + " to " + dst); - } - this.lastTxId = lastTxId; - file = dst; - } - - long getStartTxId() { - return startTxId; - } - - long getLastTxId() { - return lastTxId; - } - - EditLogValidation validateLog() throws IOException { - if (cachedValidation == null) { - cachedValidation = EditLogFileInputStream.validateEditLog(file); - } - return cachedValidation; - } - - boolean isInProgress() { - return (lastTxId == UNKNOWN_END); - } - - File getFile() { - return file; - } - - void markCorrupt() { - isCorrupt = true; - } - - boolean isCorrupt() { - return isCorrupt; - } - - void moveAsideCorruptFile() throws IOException { - assert isCorrupt; - - File src = file; - File dst = new File(src.getParent(), src.getName() + ".corrupt"); - boolean success = src.renameTo(dst); - if (!success) { - throw new IOException( - "Couldn't rename corrupt log " + src + " to " + dst); - } - file = dst; - } - - @Override - public String toString() { - return file.toString(); - } - } - static class TransactionalLoadPlan extends LoadPlan { - final FoundFSImage image; + final FSImageFile image; final LogLoadPlan logPlan; - public TransactionalLoadPlan(FoundFSImage image, + public TransactionalLoadPlan(FSImageFile image, LogLoadPlan logPlan) { super(); this.image = image; @@ -662,10 +505,10 @@ StorageDirectory getStorageDirectoryForProperties() { } static class LogLoadPlan { - final List editLogs; + final List editLogs; final List logGroupsToRecover; - LogLoadPlan(List editLogs, + LogLoadPlan(List editLogs, List logGroupsToRecover) { this.editLogs = editLogs; this.logGroupsToRecover = logGroupsToRecover; @@ -679,7 +522,7 @@ public void doRecovery() throws IOException { public List getEditsFiles() { List ret = new ArrayList(); - for (FoundEditLog log : editLogs) { + for (EditLogFile log : editLogs) { ret.add(log.getFile()); } return ret; diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 360a118fa2..84eee71c23 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -23,14 +23,20 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.Comparator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import 
org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.ComparisonChain; /** * Journal manager for the common case of edits files being written @@ -45,6 +51,15 @@ class FileJournalManager implements JournalManager { private final StorageDirectory sd; private int outputBufferCapacity = 512*1024; + private static final Pattern EDITS_REGEX = Pattern.compile( + NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)"); + private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile( + NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)"); + + @VisibleForTesting + StoragePurger purger + = new NNStorageRetentionManager.DeletionStoragePurger(); + public FileJournalManager(StorageDirectory sd) { this.sd = sd; } @@ -91,13 +106,13 @@ public void setOutputBufferCapacity(int size) { } @Override - public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger) + public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { File[] files = FileUtil.listFiles(sd.getCurrentDir()); - List editLogs = - FSImageTransactionalStorageInspector.matchEditLogs(files); - for (FoundEditLog log : editLogs) { - if (log.getStartTxId() < minTxIdToKeep && + List editLogs = + FileJournalManager.matchEditLogs(files); + for (EditLogFile log : editLogs) { + if (log.getFirstTxId() < minTxIdToKeep && log.getLastTxId() < minTxIdToKeep) { purger.purgeLog(log); } @@ -111,4 +126,139 @@ public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) return new EditLogFileInputStream(f); } + static List matchEditLogs(File[] filesInStorage) { + List ret = Lists.newArrayList(); + for (File f : filesInStorage) { + String name = f.getName(); + // Check for edits + Matcher editsMatch = EDITS_REGEX.matcher(name); + if (editsMatch.matches()) { + try { + long startTxId = Long.valueOf(editsMatch.group(1)); + long endTxId = Long.valueOf(editsMatch.group(2)); + ret.add(new EditLogFile(f, startTxId, endTxId)); + } catch (NumberFormatException nfe) { + LOG.error("Edits file " + f + " has improperly formatted " + + "transaction ID"); + // skip + } + } + + // Check for in-progress edits + Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name); + if (inProgressEditsMatch.matches()) { + try { + long startTxId = Long.valueOf(inProgressEditsMatch.group(1)); + ret.add( + new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END)); + } catch (NumberFormatException nfe) { + LOG.error("In-progress edits file " + f + " has improperly " + + "formatted transaction ID"); + // skip + } + } + } + return ret; + } + + /** + * Record of an edit log that has been located and had its filename parsed. 
+ */ + static class EditLogFile { + private File file; + private final long firstTxId; + private long lastTxId; + + private EditLogValidation cachedValidation = null; + private boolean isCorrupt = false; + + static final long UNKNOWN_END = -1; + + final static Comparator COMPARE_BY_START_TXID + = new Comparator() { + public int compare(EditLogFile a, EditLogFile b) { + return ComparisonChain.start() + .compare(a.getFirstTxId(), b.getFirstTxId()) + .compare(a.getLastTxId(), b.getLastTxId()) + .result(); + } + }; + + EditLogFile(File file, + long firstTxId, long lastTxId) { + assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId; + assert firstTxId > 0; + assert file != null; + + this.firstTxId = firstTxId; + this.lastTxId = lastTxId; + this.file = file; + } + + public void finalizeLog() throws IOException { + long numTransactions = validateLog().numTransactions; + long lastTxId = firstTxId + numTransactions - 1; + File dst = new File(file.getParentFile(), + NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId)); + LOG.info("Finalizing edits log " + file + " by renaming to " + + dst.getName()); + if (!file.renameTo(dst)) { + throw new IOException("Couldn't finalize log " + + file + " to " + dst); + } + this.lastTxId = lastTxId; + file = dst; + } + + long getFirstTxId() { + return firstTxId; + } + + long getLastTxId() { + return lastTxId; + } + + EditLogValidation validateLog() throws IOException { + if (cachedValidation == null) { + cachedValidation = EditLogFileInputStream.validateEditLog(file); + } + return cachedValidation; + } + + boolean isInProgress() { + return (lastTxId == UNKNOWN_END); + } + + File getFile() { + return file; + } + + void markCorrupt() { + isCorrupt = true; + } + + boolean isCorrupt() { + return isCorrupt; + } + + void moveAsideCorruptFile() throws IOException { + assert isCorrupt; + + File src = file; + File dst = new File(src.getParent(), src.getName() + ".corrupt"); + boolean success = src.renameTo(dst); + if (!success) { + throw new IOException( + "Couldn't rename corrupt log " + src + " to " + dst); + } + file = dst; + } + + @Override + public String toString() { + return String.format("EditLogFile(file=%s,first=%019d,last=%019d," + +"inProgress=%b,corrupt=%b)", file.toString(), + firstTxId, lastTxId, isInProgress(), isCorrupt); + } + } } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java index 56ea5c2512..d62aaa7e5a 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java @@ -55,7 +55,7 @@ interface JournalManager { * @param purger the purging implementation to use * @throws IOException if purging fails */ - void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger) + void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; /** diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java index 4b5f9a9090..fe651001aa 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java @@ -27,8 +27,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import 
org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; +import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.util.MD5FileUtils; import com.google.common.collect.Lists; @@ -80,14 +80,14 @@ public void purgeOldStorage() throws IOException { // If fsimage_N is the image we want to keep, then we need to keep // all txns > N. We can remove anything < N+1, since fsimage_N // reflects the state up to and including N. - editLog.purgeLogsOlderThan(minImageTxId + 1, purger); + editLog.purgeLogsOlderThan(minImageTxId + 1); } private void purgeCheckpointsOlderThan( FSImageTransactionalStorageInspector inspector, long minTxId) { - for (FoundFSImage image : inspector.getFoundImages()) { - if (image.getTxId() < minTxId) { + for (FSImageFile image : inspector.getFoundImages()) { + if (image.getCheckpointTxId() < minTxId) { LOG.info("Purging old image " + image); purger.purgeImage(image); } @@ -101,10 +101,10 @@ private void purgeCheckpointsOlderThan( */ private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) { - List images = inspector.getFoundImages(); + List images = inspector.getFoundImages(); TreeSet imageTxIds = Sets.newTreeSet(); - for (FoundFSImage image : images) { - imageTxIds.add(image.getTxId()); + for (FSImageFile image : images) { + imageTxIds.add(image.getCheckpointTxId()); } List imageTxIdsList = Lists.newArrayList(imageTxIds); @@ -124,18 +124,18 @@ private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector * Interface responsible for disposing of old checkpoints and edit logs. 
*/ static interface StoragePurger { - void purgeLog(FoundEditLog log); - void purgeImage(FoundFSImage image); + void purgeLog(EditLogFile log); + void purgeImage(FSImageFile image); } static class DeletionStoragePurger implements StoragePurger { @Override - public void purgeLog(FoundEditLog log) { + public void purgeLog(EditLogFile log) { deleteOrWarn(log.getFile()); } @Override - public void purgeImage(FoundFSImage image) { + public void purgeImage(FSImageFile image) { deleteOrWarn(image.getFile()); deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile())); } diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 39d64f0686..6a35981e41 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -25,7 +25,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,15 +33,14 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.io.IOUtils; import org.mockito.Mockito; import com.google.common.base.Joiner; -import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -154,9 +152,9 @@ public static void assertSameNewestImage(List dirs) throws Exception { for (File dir : dirs) { FSImageTransactionalStorageInspector inspector = inspectStorageDirectory(dir, NameNodeDirType.IMAGE); - FoundFSImage latestImage = inspector.getLatestImage(); + FSImageFile latestImage = inspector.getLatestImage(); assertNotNull("No image in " + dir, latestImage); - long thisTxId = latestImage.getTxId(); + long thisTxId = latestImage.getCheckpointTxId(); if (imageTxId != -1 && thisTxId != imageTxId) { fail("Storage directory " + dir + " does not have the same " + "last image index " + imageTxId + " as another"); @@ -283,7 +281,7 @@ public static File findNewestImageFile(String currentDirPath) throws IOException new FSImageTransactionalStorageInspector(); inspector.inspectDirectory(sd); - FoundFSImage latestImage = inspector.getLatestImage(); + FSImageFile latestImage = inspector.getLatestImage(); return (latestImage == null) ? null : latestImage.getFile(); } @@ -316,23 +314,15 @@ static List getNameNodeCurrentDirs(MiniDFSCluster cluster) { * @return the latest edits log, finalized or otherwise, from the given * storage directory. 
*/ - public static FoundEditLog findLatestEditsLog(StorageDirectory sd) + public static EditLogFile findLatestEditsLog(StorageDirectory sd) throws IOException { FSImageTransactionalStorageInspector inspector = new FSImageTransactionalStorageInspector(); inspector.inspectDirectory(sd); - List foundEditLogs = Lists.newArrayList( - inspector.getFoundEditLogs()); - return Collections.max(foundEditLogs, new Comparator() { - @Override - public int compare(FoundEditLog a, FoundEditLog b) { - return ComparisonChain.start() - .compare(a.getStartTxId(), b.getStartTxId()) - .compare(a.getLastTxId(), b.getLastTxId()) - .result(); - } - }); + List foundEditLogs = Lists.newArrayList( + inspector.getEditLogFiles()); + return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID); } /** diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java index c869adc9fa..72e3bf7230 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -163,8 +163,8 @@ public void testBackupNodeTailsEdits() throws Exception { // When shutting down the BN, it shouldn't finalize logs that are // still open on the NN - FoundEditLog editsLog = FSImageTestUtil.findLatestEditsLog(sd); - assertEquals(editsLog.getStartTxId(), + EditLogFile editsLog = FSImageTestUtil.findLatestEditsLog(sd); + assertEquals(editsLog.getFirstTxId(), nn.getFSImage().getEditLog().getCurSegmentTxId()); assertTrue("Should not have finalized " + editsLog, editsLog.isInProgress()); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java index 44efa548e8..4f698c08d3 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; @@ -82,7 +82,7 @@ public void testSaveNamespace() throws IOException { // verify that the edits file is NOT empty NameNode nn = cluster.getNameNode(); for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { - FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd); + EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); 
assertTrue(log.isInProgress()); assertEquals("In-progress log " + log + " should have 5 transactions", 5, log.validateLog().numTransactions); @@ -97,7 +97,7 @@ public void testSaveNamespace() throws IOException { } // verify that the edits file is empty except for the START txn for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { - FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd); + EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); assertEquals("In-progress log " + log + " should only have START txn", 1, log.validateLog().numTransactions); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java index 297f737cde..05c03b7e72 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java @@ -36,8 +36,8 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan; import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan; @@ -72,7 +72,7 @@ public void testCurrentStorageInspector() throws IOException { assertEquals(2, inspector.foundImages.size()); assertTrue(inspector.foundEditLogs.get(1).isInProgress()); - FoundFSImage latestImage = inspector.getLatestImage(); + FSImageFile latestImage = inspector.getLatestImage(); assertEquals(456, latestImage.txId); assertSame(mockDir, latestImage.sd); assertTrue(inspector.isUpgradeFinalized()); @@ -203,7 +203,7 @@ public void testLogGroupRecoveryMixed() throws IOException { LogGroup lg = inspector.logGroups.get(123L); assertEquals(3, lg.logs.size()); - FoundEditLog inProgressLog = lg.logs.get(2); + EditLogFile inProgressLog = lg.logs.get(2); assertTrue(inProgressLog.isInProgress()); LoadPlan plan = inspector.createLoadPlan(); @@ -282,7 +282,7 @@ public void testLogGroupRecoveryInProgress() throws IOException { assertTrue(lg.logs.get(2).isCorrupt()); // Calling recover should move it aside - FoundEditLog badLog = lg.logs.get(2); + EditLogFile badLog = lg.logs.get(2); Mockito.doNothing().when(badLog).moveAsideCorruptFile(); Mockito.doNothing().when(lg.logs.get(0)).finalizeLog(); Mockito.doNothing().when(lg.logs.get(1)).finalizeLog(); @@ -303,12 +303,12 @@ private void mockLogValidation( String path, int numValidTransactions) throws IOException { for (LogGroup lg : inspector.logGroups.values()) { - List logs = lg.logs; + List logs = lg.logs; for (int i = 0; i < logs.size(); i++) { - FoundEditLog log = logs.get(i); - if (log.file.getPath().equals(path)) { + EditLogFile log = logs.get(i); + if (log.getFile().getPath().equals(path)) { // mock out its validation - FoundEditLog spyLog = spy(log); + EditLogFile spyLog = spy(log); doReturn(new 
FSEditLogLoader.EditLogValidation(-1, numValidTransactions)) .when(spyLog).validateLog(); logs.set(i, spyLog); @@ -356,7 +356,7 @@ public void testCurrentSplitEditsAndImage() throws IOException { // Check plan TransactionalLoadPlan plan = (TransactionalLoadPlan)inspector.createLoadPlan(); - FoundFSImage pickedImage = plan.image; + FSImageFile pickedImage = plan.image; assertEquals(456, pickedImage.txId); assertSame(mockImageDir2, pickedImage.sd); assertEquals(new File("/foo2/current/" + getImageFileName(456)), diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java index aa3ab82b40..60a7d477a4 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java @@ -24,8 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName; @@ -168,14 +168,14 @@ private void runTest(TestCaseDescription tc) throws IOException { StoragePurger mockPurger = Mockito.mock(NNStorageRetentionManager.StoragePurger.class); - ArgumentCaptor imagesPurgedCaptor = - ArgumentCaptor.forClass(FoundFSImage.class); - ArgumentCaptor logsPurgedCaptor = - ArgumentCaptor.forClass(FoundEditLog.class); + ArgumentCaptor imagesPurgedCaptor = + ArgumentCaptor.forClass(FSImageFile.class); + ArgumentCaptor logsPurgedCaptor = + ArgumentCaptor.forClass(EditLogFile.class); // Ask the manager to purge files we don't need any more new NNStorageRetentionManager(conf, - tc.mockStorage(), tc.mockEditLog(), mockPurger) + tc.mockStorage(), tc.mockEditLog(mockPurger), mockPurger) .purgeOldStorage(); // Verify that it asked the purger to remove the correct files @@ -186,7 +186,7 @@ private void runTest(TestCaseDescription tc) throws IOException { // Check images Set purgedPaths = Sets.newHashSet(); - for (FoundFSImage purged : imagesPurgedCaptor.getAllValues()) { + for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) { purgedPaths.add(purged.getFile().toString()); } Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedImages), @@ -194,7 +194,7 @@ private void runTest(TestCaseDescription tc) throws IOException { // Check images purgedPaths.clear(); - for (FoundEditLog purged : logsPurgedCaptor.getAllValues()) { + for (EditLogFile purged : logsPurgedCaptor.getAllValues()) { purgedPaths.add(purged.getFile().toString()); } Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedLogs), @@ -256,13 +256,14 @@ NNStorage mockStorage() throws IOException { return mockStorageForDirs(sds.toArray(new StorageDirectory[0])); } - public FSEditLog mockEditLog() { + public FSEditLog mockEditLog(StoragePurger purger) { final List jms = Lists.newArrayList(); for (FakeRoot root : dirRoots.values()) { if 
(!root.type.isOfType(NameNodeDirType.EDITS)) continue; FileJournalManager fjm = new FileJournalManager( root.mockStorageDir()); + fjm.purger = purger; jms.add(fjm); } @@ -272,17 +273,15 @@ public FSEditLog mockEditLog() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); - assert args.length == 2; + assert args.length == 1; long txId = (Long) args[0]; - StoragePurger purger = (StoragePurger) args[1]; for (JournalManager jm : jms) { - jm.purgeLogsOlderThan(txId, purger); + jm.purgeLogsOlderThan(txId); } return null; } - }).when(mockLog).purgeLogsOlderThan( - Mockito.anyLong(), (StoragePurger) Mockito.anyObject()); + }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong()); return mockLog; } }
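The hunks above move edit-log filename parsing out of FSImageTransactionalStorageInspector and into FileJournalManager.matchEditLogs(). Below is a minimal standalone sketch of that matching logic; the literal "edits" and "edits_inprogress" prefixes and the MatchEditLogsSketch/parse names are illustration-only stand-ins, while the real code uses the NameNodeFile constants and builds EditLogFile objects rather than long[] spans.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MatchEditLogsSketch {
  // Same regex shapes as FileJournalManager's EDITS_REGEX and
  // EDITS_INPROGRESS_REGEX, with literal prefixes instead of NameNodeFile.
  static final Pattern EDITS_REGEX = Pattern.compile("edits_(\\d+)-(\\d+)");
  static final Pattern EDITS_INPROGRESS_REGEX =
      Pattern.compile("edits_inprogress_(\\d+)");
  static final long UNKNOWN_END = -1;

  // Returns {firstTxId, lastTxId}, with UNKNOWN_END for an in-progress
  // segment, or null when the name is not an edits file at all.
  static long[] parse(File f) {
    String name = f.getName();
    Matcher finalized = EDITS_REGEX.matcher(name);
    if (finalized.matches()) {
      return new long[] { Long.parseLong(finalized.group(1)),
                          Long.parseLong(finalized.group(2)) };
    }
    Matcher inProgress = EDITS_INPROGRESS_REGEX.matcher(name);
    if (inProgress.matches()) {
      return new long[] { Long.parseLong(inProgress.group(1)), UNKNOWN_END };
    }
    return null;
  }

  public static void main(String[] args) {
    List<File> files = new ArrayList<File>();
    files.add(new File("edits_0000000000000000001-0000000000000000100"));
    files.add(new File("edits_inprogress_0000000000000000101"));
    files.add(new File("fsimage_0000000000000000100"));
    for (File f : files) {
      long[] span = parse(f);
      System.out.println(f.getName() + " -> "
          + (span == null ? "skipped" : span[0] + ".." + span[1]));
    }
  }
}

The fsimage file is skipped here, which mirrors how matchEditLogs() only collects edits segments and leaves image files to the inspector's IMAGE_REGEX.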
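FSImageTestUtil.findLatestEditsLog() previously built an anonymous Comparator inline; the patch replaces it with the shared EditLogFile.COMPARE_BY_START_TXID constant, which orders segments by first txid and breaks ties on last txid. The sketch below reproduces that ordering with plain Long.compare (Java 7+) instead of Guava's ComparisonChain, and LogSpan is a made-up stand-in for EditLogFile carrying only the two compared fields.

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class CompareByStartTxIdSketch {
  static class LogSpan {
    final long firstTxId;
    final long lastTxId;
    LogSpan(long firstTxId, long lastTxId) {
      this.firstTxId = firstTxId;
      this.lastTxId = lastTxId;
    }
    @Override
    public String toString() {
      return "[" + firstTxId + "," + lastTxId + "]";
    }
  }

  // Order by first txid, then by last txid, like COMPARE_BY_START_TXID.
  static final Comparator<LogSpan> COMPARE_BY_START_TXID =
      new Comparator<LogSpan>() {
        public int compare(LogSpan a, LogSpan b) {
          int c = Long.compare(a.firstTxId, b.firstTxId);
          return c != 0 ? c : Long.compare(a.lastTxId, b.lastTxId);
        }
      };

  public static void main(String[] args) {
    List<LogSpan> logs = Arrays.asList(
        new LogSpan(1, 100), new LogSpan(101, 200), new LogSpan(101, 250));
    // Collections.max picks the "latest" segment, as findLatestEditsLog does.
    System.out.println(Collections.max(logs, COMPARE_BY_START_TXID));
  }
}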
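BackupJournalManager, FileJournalManager, JournalManager and FSEditLog all lose the StoragePurger parameter from purgeLogsOlderThan(): each FileJournalManager now owns a package-private, @VisibleForTesting purger field defaulting to DeletionStoragePurger, and TestNNStorageRetentionManager.mockEditLog() swaps that field for a mock. The sketch below only shows the shape of that injection with simplified stand-in types (this StoragePurger takes a String, and FileJournalManagerSketch fakes the threshold check); it is not the real HDFS API.

interface StoragePurger {
  void purgeLog(String logName);
}

class FileJournalManagerSketch {
  // Defaults to "really delete"; a test overwrites the field with a recorder.
  StoragePurger purger = new StoragePurger() {
    public void purgeLog(String logName) {
      System.out.println("deleting " + logName);
    }
  };

  void purgeLogsOlderThan(long minTxIdToKeep) {
    // A segment wholly below the threshold is handed to the owned purger;
    // the caller no longer passes one in.
    if (minTxIdToKeep > 100) {
      purger.purgeLog("edits_1-100");
    }
  }
}

public class PurgerInjectionSketch {
  public static void main(String[] args) {
    FileJournalManagerSketch fjm = new FileJournalManagerSketch();
    // Analogue of "fjm.purger = purger;" in mockEditLog(): the test swaps
    // the purger so purging can be asserted on without touching disk.
    fjm.purger = new StoragePurger() {
      public void purgeLog(String logName) {
        System.out.println("recorded (not deleted): " + logName);
      }
    };
    fjm.purgeLogsOlderThan(201);
  }
}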
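FoundFSImage becomes FSImageFile on the generic FSImageStorageInspector, with getTxId() renamed to getCheckpointTxId(), but the selection rule in getLatestImage() is unchanged: the image with the highest checkpoint txid wins, and null means no image was found. A self-contained sketch of that selection, using a made-up ImageRec in place of FSImageFile and invented directory names:

import java.util.Arrays;
import java.util.List;

public class LatestImageSketch {
  static class ImageRec {
    final String dir;
    final long txId;
    ImageRec(String dir, long txId) { this.dir = dir; this.txId = txId; }
  }

  // Same loop shape as getLatestImage(): keep the record with the largest txid.
  static ImageRec getLatestImage(List<ImageRec> found) {
    ImageRec ret = null;
    for (ImageRec img : found) {
      if (ret == null || img.txId > ret.txId) {
        ret = img;
      }
    }
    return ret; // null when no images were found
  }

  public static void main(String[] args) {
    List<ImageRec> images = Arrays.asList(
        new ImageRec("/data/1/name", 123), new ImageRec("/data/2/name", 456));
    ImageRec latest = getLatestImage(images);
    System.out.println(latest.dir + "/current/fsimage_" + latest.txId);
  }
}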
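NNStorageRetentionManager keeps the same retention arithmetic, now expressed against FSImageFile and the one-argument purgeLogsOlderThan(): pick the Nth-newest checkpoint txid, purge images below it, and purge every log segment that both starts and ends before that txid plus one, since fsimage_N already reflects transactions up to and including N. A worked example with invented txids (100, 200, 300) and a retain count of 2, assumed here to match the usual dfs.namenode.num.checkpoints.retained default:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class RetentionSketch {
  public static void main(String[] args) {
    int numCheckpointsToRetain = 2;
    TreeSet<Long> imageTxIds = new TreeSet<Long>();
    Collections.addAll(imageTxIds, 100L, 200L, 300L);

    List<Long> sorted = new ArrayList<Long>(imageTxIds);
    int retainIdx = Math.max(0, sorted.size() - numCheckpointsToRetain);
    long minImageTxId = sorted.get(retainIdx); // 200 in this example

    System.out.println("images kept: " + sorted.subList(retainIdx, sorted.size()));
    System.out.println("purge images with txid < " + minImageTxId);
    System.out.println("purge logs entirely below txid " + (minImageTxId + 1));
    // e.g. edits_1-100 and edits_101-200 are purged, while edits_201-300 and
    // an in-progress segment starting at 301 must be kept.
  }
}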
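LogGroup.planAllInProgressRecovery() is only retyped from FoundEditLog to EditLogFile; its rule is unchanged: when every copy of a segment is in-progress (for example after a crash), the copy with the most valid transactions is kept and shorter copies are marked corrupt so recover() later moves them aside. The sketch below replays that rule with hard-coded validation counts and invented paths; the real code obtains the counts from EditLogFileInputStream.validateEditLog() via EditLogFile.validateLog().

import java.util.ArrayList;
import java.util.List;

public class InProgressRecoverySketch {
  static class Log {
    final String name;
    final long validTxns;
    boolean corrupt = false;
    Log(String name, long validTxns) {
      this.name = name;
      this.validTxns = validTxns;
    }
  }

  public static void main(String[] args) {
    List<Log> logs = new ArrayList<Log>();
    logs.add(new Log("dir1/edits_inprogress_101", 40));
    logs.add(new Log("dir2/edits_inprogress_101", 52));
    logs.add(new Log("dir3/edits_inprogress_101", 52));

    // First pass: find the longest valid prefix among the copies.
    long maxValid = Long.MIN_VALUE;
    for (Log l : logs) {
      maxValid = Math.max(maxValid, l.validTxns);
    }
    // Second pass: any copy shorter than the best one is considered corrupt.
    for (Log l : logs) {
      l.corrupt = l.validTxns < maxValid;
      System.out.println(l.name + " valid=" + l.validTxns
          + " corrupt=" + l.corrupt);
    }
  }
}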