HDFS-2225. Refactor file management so it's not in classes which should be generic. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1154029 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-08-04 21:56:17 +00:00
parent 53190cfa1d
commit 23762da4fa
13 changed files with 286 additions and 270 deletions

View File

@ -632,6 +632,9 @@ Trunk (unreleased changes)
HDFS-2187. Make EditLogInputStream act like an iterator over FSEditLogOps HDFS-2187. Make EditLogInputStream act like an iterator over FSEditLogOps
(Ivan Kelly and todd via todd) (Ivan Kelly and todd via todd)
HDFS-2225. Refactor file management so it's not in classes which should
be generic. (Ivan Kelly via todd)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -54,7 +54,7 @@ public void setOutputBufferCapacity(int size) {
} }
@Override @Override
public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger) public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException { throws IOException {
} }

View File

@ -862,8 +862,7 @@ public void apply(JournalAndStream jas) throws IOException {
/** /**
* Archive any log files that are older than the given txid. * Archive any log files that are older than the given txid.
*/ */
public void purgeLogsOlderThan( public void purgeLogsOlderThan(final long minTxIdToKeep) {
final long minTxIdToKeep, final StoragePurger purger) {
synchronized (this) { synchronized (this) {
// synchronized to prevent findbugs warning about inconsistent // synchronized to prevent findbugs warning about inconsistent
// synchronization. This will be JIT-ed out if asserts are // synchronization. This will be JIT-ed out if asserts are
@ -877,7 +876,7 @@ public void purgeLogsOlderThan(
mapJournalsAndReportErrors(new JournalClosure() { mapJournalsAndReportErrors(new JournalClosure() {
@Override @Override
public void apply(JournalAndStream jas) throws IOException { public void apply(JournalAndStream jas) throws IOException {
jas.manager.purgeLogsOlderThan(minTxIdToKeep, purger); jas.manager.purgeLogsOlderThan(minTxIdToKeep);
} }
}, "purging logs older than " + minTxIdToKeep); }, "purging logs older than " + minTxIdToKeep);
} }

View File

@ -96,4 +96,36 @@ public String toString() {
return sb.toString(); return sb.toString();
} }
} }
/**
* Record of an image that has been located and had its filename parsed.
*/
static class FSImageFile {
final StorageDirectory sd;
final long txId;
private final File file;
FSImageFile(StorageDirectory sd, File file, long txId) {
assert txId >= 0 : "Invalid txid on " + file +": " + txId;
this.sd = sd;
this.txId = txId;
this.file = file;
}
File getFile() {
return file;
}
public long getCheckpointTxId() {
return txId;
}
@Override
public String toString() {
return String.format("FSImageFile(file=%s, cpktTxId=%019d)",
file.toString(), txId);
}
}
} }

View File

@ -37,9 +37,9 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -54,17 +54,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
private boolean needToSave = false; private boolean needToSave = false;
private boolean isUpgradeFinalized = true; private boolean isUpgradeFinalized = true;
List<FoundFSImage> foundImages = new ArrayList<FoundFSImage>(); List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
List<FoundEditLog> foundEditLogs = new ArrayList<FoundEditLog>(); List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>(); SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
long maxSeenTxId = 0; long maxSeenTxId = 0;
private static final Pattern IMAGE_REGEX = Pattern.compile( private static final Pattern IMAGE_REGEX = Pattern.compile(
NameNodeFile.IMAGE.getName() + "_(\\d+)"); NameNodeFile.IMAGE.getName() + "_(\\d+)");
private static final Pattern EDITS_REGEX = Pattern.compile(
NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
@Override @Override
public void inspectDirectory(StorageDirectory sd) throws IOException { public void inspectDirectory(StorageDirectory sd) throws IOException {
@ -95,7 +91,7 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
try { try {
long txid = Long.valueOf(imageMatch.group(1)); long txid = Long.valueOf(imageMatch.group(1));
foundImages.add(new FoundFSImage(sd, f, txid)); foundImages.add(new FSImageFile(sd, f, txid));
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
LOG.error("Image file " + f + " has improperly formatted " + LOG.error("Image file " + f + " has improperly formatted " +
"transaction ID"); "transaction ID");
@ -117,9 +113,10 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
} }
List<FoundEditLog> editLogs = matchEditLogs(filesInStorage); List<EditLogFile> editLogs
= FileJournalManager.matchEditLogs(filesInStorage);
if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
for (FoundEditLog log : editLogs) { for (EditLogFile log : editLogs) {
addEditLog(log); addEditLog(log);
} }
} else if (!editLogs.isEmpty()){ } else if (!editLogs.isEmpty()){
@ -133,47 +130,12 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
} }
static List<FoundEditLog> matchEditLogs(File[] filesInStorage) { private void addEditLog(EditLogFile foundEditLog) {
List<FoundEditLog> ret = Lists.newArrayList();
for (File f : filesInStorage) {
String name = f.getName();
// Check for edits
Matcher editsMatch = EDITS_REGEX.matcher(name);
if (editsMatch.matches()) {
try {
long startTxId = Long.valueOf(editsMatch.group(1));
long endTxId = Long.valueOf(editsMatch.group(2));
ret.add(new FoundEditLog(f, startTxId, endTxId));
} catch (NumberFormatException nfe) {
LOG.error("Edits file " + f + " has improperly formatted " +
"transaction ID");
// skip
}
}
// Check for in-progress edits
Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
if (inProgressEditsMatch.matches()) {
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
new FoundEditLog(f, startTxId, FoundEditLog.UNKNOWN_END));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
}
}
}
return ret;
}
private void addEditLog(FoundEditLog foundEditLog) {
foundEditLogs.add(foundEditLog); foundEditLogs.add(foundEditLog);
LogGroup group = logGroups.get(foundEditLog.startTxId); LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
if (group == null) { if (group == null) {
group = new LogGroup(foundEditLog.startTxId); group = new LogGroup(foundEditLog.getFirstTxId());
logGroups.put(foundEditLog.startTxId, group); logGroups.put(foundEditLog.getFirstTxId(), group);
} }
group.add(foundEditLog); group.add(foundEditLog);
} }
@ -191,9 +153,9 @@ public boolean isUpgradeFinalized() {
* *
* Returns null if no images were found. * Returns null if no images were found.
*/ */
FoundFSImage getLatestImage() { FSImageFile getLatestImage() {
FoundFSImage ret = null; FSImageFile ret = null;
for (FoundFSImage img : foundImages) { for (FSImageFile img : foundImages) {
if (ret == null || img.txId > ret.txId) { if (ret == null || img.txId > ret.txId) {
ret = img; ret = img;
} }
@ -201,11 +163,11 @@ FoundFSImage getLatestImage() {
return ret; return ret;
} }
public List<FoundFSImage> getFoundImages() { public List<FSImageFile> getFoundImages() {
return ImmutableList.copyOf(foundImages); return ImmutableList.copyOf(foundImages);
} }
public List<FoundEditLog> getFoundEditLogs() { public List<EditLogFile> getEditLogFiles() {
return ImmutableList.copyOf(foundEditLogs); return ImmutableList.copyOf(foundEditLogs);
} }
@ -215,7 +177,7 @@ public LoadPlan createLoadPlan() throws IOException {
throw new FileNotFoundException("No valid image files found"); throw new FileNotFoundException("No valid image files found");
} }
FoundFSImage recoveryImage = getLatestImage(); FSImageFile recoveryImage = getLatestImage();
LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE); LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
return new TransactionalLoadPlan(recoveryImage, return new TransactionalLoadPlan(recoveryImage,
@ -233,7 +195,7 @@ public LoadPlan createLoadPlan() throws IOException {
LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException { LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
long expectedTxId = sinceTxId + 1; long expectedTxId = sinceTxId + 1;
List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>(); List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId); SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
if (logGroups.size() > tailGroups.size()) { if (logGroups.size() > tailGroups.size()) {
@ -312,10 +274,10 @@ RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
for (LogGroup g : logGroups.values()) { for (LogGroup g : logGroups.values()) {
if (!g.hasFinalized) continue; if (!g.hasFinalized) continue;
FoundEditLog fel = g.getBestNonCorruptLog(); EditLogFile fel = g.getBestNonCorruptLog();
if (fel.getLastTxId() < sinceTxId) continue; if (fel.getLastTxId() < sinceTxId) continue;
logs.add(new RemoteEditLog(fel.getStartTxId(), logs.add(new RemoteEditLog(fel.getFirstTxId(),
fel.getLastTxId())); fel.getLastTxId()));
} }
@ -330,7 +292,7 @@ RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
*/ */
static class LogGroup { static class LogGroup {
long startTxId; long startTxId;
List<FoundEditLog> logs = new ArrayList<FoundEditLog>();; List<EditLogFile> logs = new ArrayList<EditLogFile>();;
private Set<Long> endTxIds = new TreeSet<Long>(); private Set<Long> endTxIds = new TreeSet<Long>();
private boolean hasInProgress = false; private boolean hasInProgress = false;
private boolean hasFinalized = false; private boolean hasFinalized = false;
@ -339,15 +301,15 @@ static class LogGroup {
this.startTxId = startTxId; this.startTxId = startTxId;
} }
FoundEditLog getBestNonCorruptLog() { EditLogFile getBestNonCorruptLog() {
// First look for non-corrupt finalized logs // First look for non-corrupt finalized logs
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (!log.isCorrupt() && !log.isInProgress()) { if (!log.isCorrupt() && !log.isInProgress()) {
return log; return log;
} }
} }
// Then look for non-corrupt in-progress logs // Then look for non-corrupt in-progress logs
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (!log.isCorrupt()) { if (!log.isCorrupt()) {
return log; return log;
} }
@ -364,7 +326,7 @@ FoundEditLog getBestNonCorruptLog() {
* @return true if we can determine the last txid in this log group. * @return true if we can determine the last txid in this log group.
*/ */
boolean hasKnownLastTxId() { boolean hasKnownLastTxId() {
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (!log.isInProgress()) { if (!log.isInProgress()) {
return true; return true;
} }
@ -378,24 +340,24 @@ boolean hasKnownLastTxId() {
* {@see #hasKnownLastTxId()} * {@see #hasKnownLastTxId()}
*/ */
long getLastTxId() { long getLastTxId() {
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (!log.isInProgress()) { if (!log.isInProgress()) {
return log.lastTxId; return log.getLastTxId();
} }
} }
throw new IllegalStateException("LogGroup only has in-progress logs"); throw new IllegalStateException("LogGroup only has in-progress logs");
} }
void add(FoundEditLog log) { void add(EditLogFile log) {
assert log.getStartTxId() == startTxId; assert log.getFirstTxId() == startTxId;
logs.add(log); logs.add(log);
if (log.isInProgress()) { if (log.isInProgress()) {
hasInProgress = true; hasInProgress = true;
} else { } else {
hasFinalized = true; hasFinalized = true;
endTxIds.add(log.lastTxId); endTxIds.add(log.getLastTxId());
} }
} }
@ -422,7 +384,7 @@ void planRecovery() throws IOException {
* The in-progress logs in this case should be considered corrupt. * The in-progress logs in this case should be considered corrupt.
*/ */
private void planMixedLogRecovery() throws IOException { private void planMixedLogRecovery() throws IOException {
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (log.isInProgress()) { if (log.isInProgress()) {
LOG.warn("Log at " + log.getFile() + " is in progress, but " + LOG.warn("Log at " + log.getFile() + " is in progress, but " +
"other logs starting at the same txid " + startTxId + "other logs starting at the same txid " + startTxId +
@ -446,7 +408,7 @@ private void planAllInProgressRecovery() throws IOException {
"crash)"); "crash)");
if (logs.size() == 1) { if (logs.size() == 1) {
// Only one log, it's our only choice! // Only one log, it's our only choice!
FoundEditLog log = logs.get(0); EditLogFile log = logs.get(0);
if (log.validateLog().numTransactions == 0) { if (log.validateLog().numTransactions == 0) {
// If it has no transactions, we should consider it corrupt just // If it has no transactions, we should consider it corrupt just
// to be conservative. // to be conservative.
@ -459,7 +421,7 @@ private void planAllInProgressRecovery() throws IOException {
} }
long maxValidTxnCount = Long.MIN_VALUE; long maxValidTxnCount = Long.MIN_VALUE;
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
long validTxnCount = log.validateLog().numTransactions; long validTxnCount = log.validateLog().numTransactions;
LOG.warn(" Log " + log.getFile() + LOG.warn(" Log " + log.getFile() +
" valid txns=" + validTxnCount + " valid txns=" + validTxnCount +
@ -467,7 +429,7 @@ private void planAllInProgressRecovery() throws IOException {
maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount); maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
} }
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
long txns = log.validateLog().numTransactions; long txns = log.validateLog().numTransactions;
if (txns < maxValidTxnCount) { if (txns < maxValidTxnCount) {
LOG.warn("Marking log at " + log.getFile() + " as corrupt since " + LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
@ -499,7 +461,7 @@ private void checkConsistentEndTxIds() throws IOException {
} }
void recover() throws IOException { void recover() throws IOException {
for (FoundEditLog log : logs) { for (EditLogFile log : logs) {
if (log.isCorrupt()) { if (log.isCorrupt()) {
log.moveAsideCorruptFile(); log.moveAsideCorruptFile();
} else if (log.isInProgress()) { } else if (log.isInProgress()) {
@ -509,130 +471,11 @@ void recover() throws IOException {
} }
} }
/**
* Record of an image that has been located and had its filename parsed.
*/
static class FoundFSImage {
final StorageDirectory sd;
final long txId;
private final File file;
FoundFSImage(StorageDirectory sd, File file, long txId) {
assert txId >= 0 : "Invalid txid on " + file +": " + txId;
this.sd = sd;
this.txId = txId;
this.file = file;
}
File getFile() {
return file;
}
public long getTxId() {
return txId;
}
@Override
public String toString() {
return file.toString();
}
}
/**
* Record of an edit log that has been located and had its filename parsed.
*/
static class FoundEditLog {
File file;
final long startTxId;
long lastTxId;
private EditLogValidation cachedValidation = null;
private boolean isCorrupt = false;
static final long UNKNOWN_END = -1;
FoundEditLog(File file,
long startTxId, long endTxId) {
assert endTxId == UNKNOWN_END || endTxId >= startTxId;
assert startTxId > 0;
assert file != null;
this.startTxId = startTxId;
this.lastTxId = endTxId;
this.file = file;
}
public void finalizeLog() throws IOException {
long numTransactions = validateLog().numTransactions;
long lastTxId = startTxId + numTransactions - 1;
File dst = new File(file.getParentFile(),
NNStorage.getFinalizedEditsFileName(startTxId, lastTxId));
LOG.info("Finalizing edits log " + file + " by renaming to "
+ dst.getName());
if (!file.renameTo(dst)) {
throw new IOException("Couldn't finalize log " +
file + " to " + dst);
}
this.lastTxId = lastTxId;
file = dst;
}
long getStartTxId() {
return startTxId;
}
long getLastTxId() {
return lastTxId;
}
EditLogValidation validateLog() throws IOException {
if (cachedValidation == null) {
cachedValidation = EditLogFileInputStream.validateEditLog(file);
}
return cachedValidation;
}
boolean isInProgress() {
return (lastTxId == UNKNOWN_END);
}
File getFile() {
return file;
}
void markCorrupt() {
isCorrupt = true;
}
boolean isCorrupt() {
return isCorrupt;
}
void moveAsideCorruptFile() throws IOException {
assert isCorrupt;
File src = file;
File dst = new File(src.getParent(), src.getName() + ".corrupt");
boolean success = src.renameTo(dst);
if (!success) {
throw new IOException(
"Couldn't rename corrupt log " + src + " to " + dst);
}
file = dst;
}
@Override
public String toString() {
return file.toString();
}
}
static class TransactionalLoadPlan extends LoadPlan { static class TransactionalLoadPlan extends LoadPlan {
final FoundFSImage image; final FSImageFile image;
final LogLoadPlan logPlan; final LogLoadPlan logPlan;
public TransactionalLoadPlan(FoundFSImage image, public TransactionalLoadPlan(FSImageFile image,
LogLoadPlan logPlan) { LogLoadPlan logPlan) {
super(); super();
this.image = image; this.image = image;
@ -662,10 +505,10 @@ StorageDirectory getStorageDirectoryForProperties() {
} }
static class LogLoadPlan { static class LogLoadPlan {
final List<FoundEditLog> editLogs; final List<EditLogFile> editLogs;
final List<LogGroup> logGroupsToRecover; final List<LogGroup> logGroupsToRecover;
LogLoadPlan(List<FoundEditLog> editLogs, LogLoadPlan(List<EditLogFile> editLogs,
List<LogGroup> logGroupsToRecover) { List<LogGroup> logGroupsToRecover) {
this.editLogs = editLogs; this.editLogs = editLogs;
this.logGroupsToRecover = logGroupsToRecover; this.logGroupsToRecover = logGroupsToRecover;
@ -679,7 +522,7 @@ public void doRecovery() throws IOException {
public List<File> getEditsFiles() { public List<File> getEditsFiles() {
List<File> ret = new ArrayList<File>(); List<File> ret = new ArrayList<File>();
for (FoundEditLog log : editLogs) { for (EditLogFile log : editLogs) {
ret.add(log.getFile()); ret.add(log.getFile());
} }
return ret; return ret;

View File

@ -23,14 +23,20 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Comparator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger; import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
/** /**
* Journal manager for the common case of edits files being written * Journal manager for the common case of edits files being written
@ -45,6 +51,15 @@ class FileJournalManager implements JournalManager {
private final StorageDirectory sd; private final StorageDirectory sd;
private int outputBufferCapacity = 512*1024; private int outputBufferCapacity = 512*1024;
private static final Pattern EDITS_REGEX = Pattern.compile(
NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
@VisibleForTesting
StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
public FileJournalManager(StorageDirectory sd) { public FileJournalManager(StorageDirectory sd) {
this.sd = sd; this.sd = sd;
} }
@ -91,13 +106,13 @@ public void setOutputBufferCapacity(int size) {
} }
@Override @Override
public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger) public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException { throws IOException {
File[] files = FileUtil.listFiles(sd.getCurrentDir()); File[] files = FileUtil.listFiles(sd.getCurrentDir());
List<FoundEditLog> editLogs = List<EditLogFile> editLogs =
FSImageTransactionalStorageInspector.matchEditLogs(files); FileJournalManager.matchEditLogs(files);
for (FoundEditLog log : editLogs) { for (EditLogFile log : editLogs) {
if (log.getStartTxId() < minTxIdToKeep && if (log.getFirstTxId() < minTxIdToKeep &&
log.getLastTxId() < minTxIdToKeep) { log.getLastTxId() < minTxIdToKeep) {
purger.purgeLog(log); purger.purgeLog(log);
} }
@ -111,4 +126,139 @@ public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
return new EditLogFileInputStream(f); return new EditLogFileInputStream(f);
} }
static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
List<EditLogFile> ret = Lists.newArrayList();
for (File f : filesInStorage) {
String name = f.getName();
// Check for edits
Matcher editsMatch = EDITS_REGEX.matcher(name);
if (editsMatch.matches()) {
try {
long startTxId = Long.valueOf(editsMatch.group(1));
long endTxId = Long.valueOf(editsMatch.group(2));
ret.add(new EditLogFile(f, startTxId, endTxId));
} catch (NumberFormatException nfe) {
LOG.error("Edits file " + f + " has improperly formatted " +
"transaction ID");
// skip
}
}
// Check for in-progress edits
Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
if (inProgressEditsMatch.matches()) {
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
}
}
}
return ret;
}
/**
* Record of an edit log that has been located and had its filename parsed.
*/
static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
private EditLogValidation cachedValidation = null;
private boolean isCorrupt = false;
static final long UNKNOWN_END = -1;
final static Comparator<EditLogFile> COMPARE_BY_START_TXID
= new Comparator<EditLogFile>() {
public int compare(EditLogFile a, EditLogFile b) {
return ComparisonChain.start()
.compare(a.getFirstTxId(), b.getFirstTxId())
.compare(a.getLastTxId(), b.getLastTxId())
.result();
}
};
EditLogFile(File file,
long firstTxId, long lastTxId) {
assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
assert firstTxId > 0;
assert file != null;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
}
public void finalizeLog() throws IOException {
long numTransactions = validateLog().numTransactions;
long lastTxId = firstTxId + numTransactions - 1;
File dst = new File(file.getParentFile(),
NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId));
LOG.info("Finalizing edits log " + file + " by renaming to "
+ dst.getName());
if (!file.renameTo(dst)) {
throw new IOException("Couldn't finalize log " +
file + " to " + dst);
}
this.lastTxId = lastTxId;
file = dst;
}
long getFirstTxId() {
return firstTxId;
}
long getLastTxId() {
return lastTxId;
}
EditLogValidation validateLog() throws IOException {
if (cachedValidation == null) {
cachedValidation = EditLogFileInputStream.validateEditLog(file);
}
return cachedValidation;
}
boolean isInProgress() {
return (lastTxId == UNKNOWN_END);
}
File getFile() {
return file;
}
void markCorrupt() {
isCorrupt = true;
}
boolean isCorrupt() {
return isCorrupt;
}
void moveAsideCorruptFile() throws IOException {
assert isCorrupt;
File src = file;
File dst = new File(src.getParent(), src.getName() + ".corrupt");
boolean success = src.renameTo(dst);
if (!success) {
throw new IOException(
"Couldn't rename corrupt log " + src + " to " + dst);
}
file = dst;
}
@Override
public String toString() {
return String.format("EditLogFile(file=%s,first=%019d,last=%019d,"
+"inProgress=%b,corrupt=%b)", file.toString(),
firstTxId, lastTxId, isInProgress(), isCorrupt);
}
}
} }

View File

@ -55,7 +55,7 @@ interface JournalManager {
* @param purger the purging implementation to use * @param purger the purging implementation to use
* @throws IOException if purging fails * @throws IOException if purging fails
*/ */
void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger) void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException; throws IOException;
/** /**

View File

@ -27,8 +27,8 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -80,14 +80,14 @@ public void purgeOldStorage() throws IOException {
// If fsimage_N is the image we want to keep, then we need to keep // If fsimage_N is the image we want to keep, then we need to keep
// all txns > N. We can remove anything < N+1, since fsimage_N // all txns > N. We can remove anything < N+1, since fsimage_N
// reflects the state up to and including N. // reflects the state up to and including N.
editLog.purgeLogsOlderThan(minImageTxId + 1, purger); editLog.purgeLogsOlderThan(minImageTxId + 1);
} }
private void purgeCheckpointsOlderThan( private void purgeCheckpointsOlderThan(
FSImageTransactionalStorageInspector inspector, FSImageTransactionalStorageInspector inspector,
long minTxId) { long minTxId) {
for (FoundFSImage image : inspector.getFoundImages()) { for (FSImageFile image : inspector.getFoundImages()) {
if (image.getTxId() < minTxId) { if (image.getCheckpointTxId() < minTxId) {
LOG.info("Purging old image " + image); LOG.info("Purging old image " + image);
purger.purgeImage(image); purger.purgeImage(image);
} }
@ -101,10 +101,10 @@ private void purgeCheckpointsOlderThan(
*/ */
private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) { private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
List<FoundFSImage> images = inspector.getFoundImages(); List<FSImageFile> images = inspector.getFoundImages();
TreeSet<Long> imageTxIds = Sets.newTreeSet(); TreeSet<Long> imageTxIds = Sets.newTreeSet();
for (FoundFSImage image : images) { for (FSImageFile image : images) {
imageTxIds.add(image.getTxId()); imageTxIds.add(image.getCheckpointTxId());
} }
List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds); List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
@ -124,18 +124,18 @@ private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector
* Interface responsible for disposing of old checkpoints and edit logs. * Interface responsible for disposing of old checkpoints and edit logs.
*/ */
static interface StoragePurger { static interface StoragePurger {
void purgeLog(FoundEditLog log); void purgeLog(EditLogFile log);
void purgeImage(FoundFSImage image); void purgeImage(FSImageFile image);
} }
static class DeletionStoragePurger implements StoragePurger { static class DeletionStoragePurger implements StoragePurger {
@Override @Override
public void purgeLog(FoundEditLog log) { public void purgeLog(EditLogFile log) {
deleteOrWarn(log.getFile()); deleteOrWarn(log.getFile());
} }
@Override @Override
public void purgeImage(FoundFSImage image) { public void purgeImage(FSImageFile image) {
deleteOrWarn(image.getFile()); deleteOrWarn(image.getFile());
deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile())); deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
} }

View File

@ -25,7 +25,6 @@
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -34,15 +33,14 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -154,9 +152,9 @@ public static void assertSameNewestImage(List<File> dirs) throws Exception {
for (File dir : dirs) { for (File dir : dirs) {
FSImageTransactionalStorageInspector inspector = FSImageTransactionalStorageInspector inspector =
inspectStorageDirectory(dir, NameNodeDirType.IMAGE); inspectStorageDirectory(dir, NameNodeDirType.IMAGE);
FoundFSImage latestImage = inspector.getLatestImage(); FSImageFile latestImage = inspector.getLatestImage();
assertNotNull("No image in " + dir, latestImage); assertNotNull("No image in " + dir, latestImage);
long thisTxId = latestImage.getTxId(); long thisTxId = latestImage.getCheckpointTxId();
if (imageTxId != -1 && thisTxId != imageTxId) { if (imageTxId != -1 && thisTxId != imageTxId) {
fail("Storage directory " + dir + " does not have the same " + fail("Storage directory " + dir + " does not have the same " +
"last image index " + imageTxId + " as another"); "last image index " + imageTxId + " as another");
@ -283,7 +281,7 @@ public static File findNewestImageFile(String currentDirPath) throws IOException
new FSImageTransactionalStorageInspector(); new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(sd); inspector.inspectDirectory(sd);
FoundFSImage latestImage = inspector.getLatestImage(); FSImageFile latestImage = inspector.getLatestImage();
return (latestImage == null) ? null : latestImage.getFile(); return (latestImage == null) ? null : latestImage.getFile();
} }
@ -316,23 +314,15 @@ static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
* @return the latest edits log, finalized or otherwise, from the given * @return the latest edits log, finalized or otherwise, from the given
* storage directory. * storage directory.
*/ */
public static FoundEditLog findLatestEditsLog(StorageDirectory sd) public static EditLogFile findLatestEditsLog(StorageDirectory sd)
throws IOException { throws IOException {
FSImageTransactionalStorageInspector inspector = FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(); new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(sd); inspector.inspectDirectory(sd);
List<FoundEditLog> foundEditLogs = Lists.newArrayList( List<EditLogFile> foundEditLogs = Lists.newArrayList(
inspector.getFoundEditLogs()); inspector.getEditLogFiles());
return Collections.max(foundEditLogs, new Comparator<FoundEditLog>() { return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID);
@Override
public int compare(FoundEditLog a, FoundEditLog b) {
return ComparisonChain.start()
.compare(a.getStartTxId(), b.getStartTxId())
.compare(a.getLastTxId(), b.getLastTxId())
.result();
}
});
} }
/** /**

View File

@ -33,7 +33,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -163,8 +163,8 @@ public void testBackupNodeTailsEdits() throws Exception {
// When shutting down the BN, it shouldn't finalize logs that are // When shutting down the BN, it shouldn't finalize logs that are
// still open on the NN // still open on the NN
FoundEditLog editsLog = FSImageTestUtil.findLatestEditsLog(sd); EditLogFile editsLog = FSImageTestUtil.findLatestEditsLog(sd);
assertEquals(editsLog.getStartTxId(), assertEquals(editsLog.getFirstTxId(),
nn.getFSImage().getEditLog().getCurSegmentTxId()); nn.getFSImage().getEditLog().getCurSegmentTxId());
assertTrue("Should not have finalized " + editsLog, assertTrue("Should not have finalized " + editsLog,
editsLog.isInProgress()); editsLog.isInProgress());

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -82,7 +82,7 @@ public void testSaveNamespace() throws IOException {
// verify that the edits file is NOT empty // verify that the edits file is NOT empty
NameNode nn = cluster.getNameNode(); NameNode nn = cluster.getNameNode();
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd); EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress()); assertTrue(log.isInProgress());
assertEquals("In-progress log " + log + " should have 5 transactions", assertEquals("In-progress log " + log + " should have 5 transactions",
5, log.validateLog().numTransactions); 5, log.validateLog().numTransactions);
@ -97,7 +97,7 @@ public void testSaveNamespace() throws IOException {
} }
// verify that the edits file is empty except for the START txn // verify that the edits file is empty except for the START txn
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd); EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress()); assertTrue(log.isInProgress());
assertEquals("In-progress log " + log + " should only have START txn", assertEquals("In-progress log " + log + " should only have START txn",
1, log.validateLog().numTransactions); 1, log.validateLog().numTransactions);

View File

@ -36,8 +36,8 @@
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan; import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup; import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
@ -72,7 +72,7 @@ public void testCurrentStorageInspector() throws IOException {
assertEquals(2, inspector.foundImages.size()); assertEquals(2, inspector.foundImages.size());
assertTrue(inspector.foundEditLogs.get(1).isInProgress()); assertTrue(inspector.foundEditLogs.get(1).isInProgress());
FoundFSImage latestImage = inspector.getLatestImage(); FSImageFile latestImage = inspector.getLatestImage();
assertEquals(456, latestImage.txId); assertEquals(456, latestImage.txId);
assertSame(mockDir, latestImage.sd); assertSame(mockDir, latestImage.sd);
assertTrue(inspector.isUpgradeFinalized()); assertTrue(inspector.isUpgradeFinalized());
@ -203,7 +203,7 @@ public void testLogGroupRecoveryMixed() throws IOException {
LogGroup lg = inspector.logGroups.get(123L); LogGroup lg = inspector.logGroups.get(123L);
assertEquals(3, lg.logs.size()); assertEquals(3, lg.logs.size());
FoundEditLog inProgressLog = lg.logs.get(2); EditLogFile inProgressLog = lg.logs.get(2);
assertTrue(inProgressLog.isInProgress()); assertTrue(inProgressLog.isInProgress());
LoadPlan plan = inspector.createLoadPlan(); LoadPlan plan = inspector.createLoadPlan();
@ -282,7 +282,7 @@ public void testLogGroupRecoveryInProgress() throws IOException {
assertTrue(lg.logs.get(2).isCorrupt()); assertTrue(lg.logs.get(2).isCorrupt());
// Calling recover should move it aside // Calling recover should move it aside
FoundEditLog badLog = lg.logs.get(2); EditLogFile badLog = lg.logs.get(2);
Mockito.doNothing().when(badLog).moveAsideCorruptFile(); Mockito.doNothing().when(badLog).moveAsideCorruptFile();
Mockito.doNothing().when(lg.logs.get(0)).finalizeLog(); Mockito.doNothing().when(lg.logs.get(0)).finalizeLog();
Mockito.doNothing().when(lg.logs.get(1)).finalizeLog(); Mockito.doNothing().when(lg.logs.get(1)).finalizeLog();
@ -303,12 +303,12 @@ private void mockLogValidation(
String path, int numValidTransactions) throws IOException { String path, int numValidTransactions) throws IOException {
for (LogGroup lg : inspector.logGroups.values()) { for (LogGroup lg : inspector.logGroups.values()) {
List<FoundEditLog> logs = lg.logs; List<EditLogFile> logs = lg.logs;
for (int i = 0; i < logs.size(); i++) { for (int i = 0; i < logs.size(); i++) {
FoundEditLog log = logs.get(i); EditLogFile log = logs.get(i);
if (log.file.getPath().equals(path)) { if (log.getFile().getPath().equals(path)) {
// mock out its validation // mock out its validation
FoundEditLog spyLog = spy(log); EditLogFile spyLog = spy(log);
doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions)) doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions))
.when(spyLog).validateLog(); .when(spyLog).validateLog();
logs.set(i, spyLog); logs.set(i, spyLog);
@ -356,7 +356,7 @@ public void testCurrentSplitEditsAndImage() throws IOException {
// Check plan // Check plan
TransactionalLoadPlan plan = TransactionalLoadPlan plan =
(TransactionalLoadPlan)inspector.createLoadPlan(); (TransactionalLoadPlan)inspector.createLoadPlan();
FoundFSImage pickedImage = plan.image; FSImageFile pickedImage = plan.image;
assertEquals(456, pickedImage.txId); assertEquals(456, pickedImage.txId);
assertSame(mockImageDir2, pickedImage.sd); assertSame(mockImageDir2, pickedImage.sd);
assertEquals(new File("/foo2/current/" + getImageFileName(456)), assertEquals(new File("/foo2/current/" + getImageFileName(456)),

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName; import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
@ -168,14 +168,14 @@ private void runTest(TestCaseDescription tc) throws IOException {
StoragePurger mockPurger = StoragePurger mockPurger =
Mockito.mock(NNStorageRetentionManager.StoragePurger.class); Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
ArgumentCaptor<FoundFSImage> imagesPurgedCaptor = ArgumentCaptor<FSImageFile> imagesPurgedCaptor =
ArgumentCaptor.forClass(FoundFSImage.class); ArgumentCaptor.forClass(FSImageFile.class);
ArgumentCaptor<FoundEditLog> logsPurgedCaptor = ArgumentCaptor<EditLogFile> logsPurgedCaptor =
ArgumentCaptor.forClass(FoundEditLog.class); ArgumentCaptor.forClass(EditLogFile.class);
// Ask the manager to purge files we don't need any more // Ask the manager to purge files we don't need any more
new NNStorageRetentionManager(conf, new NNStorageRetentionManager(conf,
tc.mockStorage(), tc.mockEditLog(), mockPurger) tc.mockStorage(), tc.mockEditLog(mockPurger), mockPurger)
.purgeOldStorage(); .purgeOldStorage();
// Verify that it asked the purger to remove the correct files // Verify that it asked the purger to remove the correct files
@ -186,7 +186,7 @@ private void runTest(TestCaseDescription tc) throws IOException {
// Check images // Check images
Set<String> purgedPaths = Sets.newHashSet(); Set<String> purgedPaths = Sets.newHashSet();
for (FoundFSImage purged : imagesPurgedCaptor.getAllValues()) { for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) {
purgedPaths.add(purged.getFile().toString()); purgedPaths.add(purged.getFile().toString());
} }
Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedImages), Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedImages),
@ -194,7 +194,7 @@ private void runTest(TestCaseDescription tc) throws IOException {
// Check images // Check images
purgedPaths.clear(); purgedPaths.clear();
for (FoundEditLog purged : logsPurgedCaptor.getAllValues()) { for (EditLogFile purged : logsPurgedCaptor.getAllValues()) {
purgedPaths.add(purged.getFile().toString()); purgedPaths.add(purged.getFile().toString());
} }
Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedLogs), Assert.assertEquals(Joiner.on(",").join(tc.expectedPurgedLogs),
@ -256,13 +256,14 @@ NNStorage mockStorage() throws IOException {
return mockStorageForDirs(sds.toArray(new StorageDirectory[0])); return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
} }
public FSEditLog mockEditLog() { public FSEditLog mockEditLog(StoragePurger purger) {
final List<JournalManager> jms = Lists.newArrayList(); final List<JournalManager> jms = Lists.newArrayList();
for (FakeRoot root : dirRoots.values()) { for (FakeRoot root : dirRoots.values()) {
if (!root.type.isOfType(NameNodeDirType.EDITS)) continue; if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
FileJournalManager fjm = new FileJournalManager( FileJournalManager fjm = new FileJournalManager(
root.mockStorageDir()); root.mockStorageDir());
fjm.purger = purger;
jms.add(fjm); jms.add(fjm);
} }
@ -272,17 +273,15 @@ public FSEditLog mockEditLog() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments(); Object[] args = invocation.getArguments();
assert args.length == 2; assert args.length == 1;
long txId = (Long) args[0]; long txId = (Long) args[0];
StoragePurger purger = (StoragePurger) args[1];
for (JournalManager jm : jms) { for (JournalManager jm : jms) {
jm.purgeLogsOlderThan(txId, purger); jm.purgeLogsOlderThan(txId);
} }
return null; return null;
} }
}).when(mockLog).purgeLogsOlderThan( }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
Mockito.anyLong(), (StoragePurger) Mockito.anyObject());
return mockLog; return mockLog;
} }
} }