HDFS-3678. Edit log files are never being purged from 2NN. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377046 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-08-24 18:52:59 +00:00
parent a95bbb107f
commit 92cb6b093c
8 changed files with 136 additions and 27 deletions

View File

@ -203,6 +203,8 @@ Trunk (unreleased changes)
HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from
Command. (Jing Zhao via suresh) Command. (Jing Zhao via suresh)
HDFS-3678. Edit log files are never being purged from 2NN. (atm)
Branch-2 ( Unreleased changes ) Branch-2 ( Unreleased changes )
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -83,7 +83,7 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class FSEditLog { public class FSEditLog implements LogsPurgeable {
static final Log LOG = LogFactory.getLog(FSEditLog.class); static final Log LOG = LogFactory.getLog(FSEditLog.class);
@ -1032,6 +1032,7 @@ synchronized void abortCurrentLogSegment() {
/** /**
* Archive any log files that are older than the given txid. * Archive any log files that are older than the given txid.
*/ */
@Override
public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) { public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
minTxIdToKeep <= curSegmentTxId : minTxIdToKeep <= curSegmentTxId :

View File

@ -91,7 +91,7 @@ public class FSImage implements Closeable {
final private Configuration conf; final private Configuration conf;
private final NNStorageRetentionManager archivalManager; protected NNStorageRetentionManager archivalManager;
/** /**
* Construct an FSImage * Construct an FSImage

View File

@ -35,7 +35,8 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface JournalManager extends Closeable, FormatConfirmable { public interface JournalManager extends Closeable, FormatConfirmable,
LogsPurgeable {
/** /**
* Format the underlying storage, removing any previously * Format the underlying storage, removing any previously
@ -73,17 +74,6 @@ void selectInputStreams(Collection<EditLogInputStream> streams,
*/ */
void setOutputBufferCapacity(int size); void setOutputBufferCapacity(int size);
/**
* The JournalManager may archive/purge any logs for transactions less than
* or equal to minImageTxId.
*
* @param minTxIdToKeep the earliest txid that must be retained after purging
* old logs
* @throws IOException if purging fails
*/
void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException;
/** /**
* Recover segments which have not been finalized. * Recover segments which have not been finalized.
*/ */

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
/**
* Interface used to abstract over classes which manage edit logs that may need
* to be purged.
*/
interface LogsPurgeable {
/**
* Remove all edit logs with transaction IDs lower than the given transaction
* ID.
*
* @param minTxIdToKeep the lowest transaction ID that should be retained
* @throws IOException in the event of error
*/
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
}

View File

@ -52,12 +52,12 @@ public class NNStorageRetentionManager {
NNStorageRetentionManager.class); NNStorageRetentionManager.class);
private final NNStorage storage; private final NNStorage storage;
private final StoragePurger purger; private final StoragePurger purger;
private final FSEditLog editLog; private final LogsPurgeable purgeableLogs;
public NNStorageRetentionManager( public NNStorageRetentionManager(
Configuration conf, Configuration conf,
NNStorage storage, NNStorage storage,
FSEditLog editLog, LogsPurgeable purgeableLogs,
StoragePurger purger) { StoragePurger purger) {
this.numCheckpointsToRetain = conf.getInt( this.numCheckpointsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
@ -72,13 +72,13 @@ public NNStorageRetentionManager(
" must not be negative"); " must not be negative");
this.storage = storage; this.storage = storage;
this.editLog = editLog; this.purgeableLogs = purgeableLogs;
this.purger = purger; this.purger = purger;
} }
public NNStorageRetentionManager(Configuration conf, NNStorage storage, public NNStorageRetentionManager(Configuration conf, NNStorage storage,
FSEditLog editLog) { LogsPurgeable purgeableLogs) {
this(conf, storage, editLog, new DeletionStoragePurger()); this(conf, storage, purgeableLogs, new DeletionStoragePurger());
} }
public void purgeOldStorage() throws IOException { public void purgeOldStorage() throws IOException {
@ -95,7 +95,7 @@ public void purgeOldStorage() throws IOException {
// handy for HA, where a remote node may not have as many // handy for HA, where a remote node may not have as many
// new images. // new images.
long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain); long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
editLog.purgeLogsOlderThan(purgeLogsFrom); purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
} }
private void purgeCheckpointsOlderThan( private void purgeCheckpointsOlderThan(
@ -103,7 +103,6 @@ private void purgeCheckpointsOlderThan(
long minTxId) { long minTxId) {
for (FSImageFile image : inspector.getFoundImages()) { for (FSImageFile image : inspector.getFoundImages()) {
if (image.getCheckpointTxId() < minTxId) { if (image.getCheckpointTxId() < minTxId) {
LOG.info("Purging old image " + image);
purger.purgeImage(image); purger.purgeImage(image);
} }
} }
@ -146,11 +145,13 @@ static interface StoragePurger {
static class DeletionStoragePurger implements StoragePurger { static class DeletionStoragePurger implements StoragePurger {
@Override @Override
public void purgeLog(EditLogFile log) { public void purgeLog(EditLogFile log) {
LOG.info("Purging old edit log " + log);
deleteOrWarn(log.getFile()); deleteOrWarn(log.getFile());
} }
@Override @Override
public void purgeImage(FSImageFile image) { public void purgeImage(FSImageFile image) {
LOG.info("Purging old image " + image);
deleteOrWarn(image.getFile()); deleteOrWarn(image.getFile());
deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile())); deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
} }

View File

@ -58,6 +58,8 @@
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -473,10 +475,6 @@ boolean doCheckpoint() throws IOException {
LOG.warn("Checkpoint done. New Image Size: " LOG.warn("Checkpoint done. New Image Size: "
+ dstStorage.getFsImageName(txid).length()); + dstStorage.getFsImageName(txid).length());
// Since we've successfully checkpointed, we can remove some old
// image files
checkpointImage.purgeOldStorage();
return loadImage; return loadImage;
} }
@ -703,6 +701,34 @@ private static CommandLineOpts parseArgs(String[] argv) {
} }
static class CheckpointStorage extends FSImage { static class CheckpointStorage extends FSImage {
private static class CheckpointLogPurger implements LogsPurgeable {
private NNStorage storage;
private StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
public CheckpointLogPurger(NNStorage storage) {
this.storage = storage;
}
@Override
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
Iterator<StorageDirectory> iter = storage.dirIterator();
while (iter.hasNext()) {
StorageDirectory dir = iter.next();
List<EditLogFile> editFiles = FileJournalManager.matchEditLogs(
dir.getCurrentDir());
for (EditLogFile f : editFiles) {
if (f.getLastTxId() < minTxIdToKeep) {
purger.purgeLog(f);
}
}
}
}
}
/** /**
* Construct a checkpoint image. * Construct a checkpoint image.
* @param conf Node configuration. * @param conf Node configuration.
@ -719,6 +745,11 @@ static class CheckpointStorage extends FSImage {
// we shouldn't have any editLog instance. Setting to null // we shouldn't have any editLog instance. Setting to null
// makes sure we don't accidentally depend on it. // makes sure we don't accidentally depend on it.
editLog = null; editLog = null;
// Replace the archival manager with one that can actually work on the
// 2NN's edits storage.
this.archivalManager = new NNStorageRetentionManager(conf, storage,
new CheckpointLogPurger(storage));
} }
/** /**
@ -815,6 +846,7 @@ static void doMerge(
} }
Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem); Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
// The following has the side effect of purging old fsimages/edit logs.
dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId()); dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
dstStorage.writeAll(); dstStorage.writeAll();
} }

View File

@ -28,6 +28,7 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -60,6 +61,7 @@
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage; import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -1836,6 +1838,50 @@ public void testSecondaryHasVeryOutOfDateImage() throws IOException {
} }
} }
/**
* Regression test for HDFS-3678 "Edit log files are never being purged from 2NN"
*/
@Test
public void testSecondaryPurgesEditLogs() throws IOException {
MiniDFSCluster cluster = null;
SecondaryNameNode secondary = null;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(true).build();
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path("/foo"));
secondary = startSecondaryNameNode(conf);
// Checkpoint a few times. Doing this will cause a log roll, and thus
// several edit log segments on the 2NN.
for (int i = 0; i < 5; i++) {
secondary.doCheckpoint();
}
// Make sure there are no more edit log files than there should be.
List<File> checkpointDirs = getCheckpointCurrentDirs(secondary);
for (File checkpointDir : checkpointDirs) {
List<EditLogFile> editsFiles = FileJournalManager.matchEditLogs(
checkpointDir);
assertEquals("Edit log files were not purged from 2NN", 1,
editsFiles.size());
}
} finally {
if (secondary != null) {
secondary.shutdown();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/** /**
* Regression test for HDFS-3835 - "Long-lived 2NN cannot perform a * Regression test for HDFS-3835 - "Long-lived 2NN cannot perform a
* checkpoint if security is enabled and the NN restarts without outstanding * checkpoint if security is enabled and the NN restarts without outstanding
@ -1940,7 +1986,7 @@ private void assertParallelFilesInvariant(MiniDFSCluster cluster,
ImmutableSet.of("VERSION")); ImmutableSet.of("VERSION"));
} }
private List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) { private static List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
List<File> ret = Lists.newArrayList(); List<File> ret = Lists.newArrayList();
for (URI u : secondary.getCheckpointDirs()) { for (URI u : secondary.getCheckpointDirs()) {
File checkpointDir = new File(u.getPath()); File checkpointDir = new File(u.getPath());
@ -1949,7 +1995,7 @@ private List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
return ret; return ret;
} }
private CheckpointStorage spyOnSecondaryImage(SecondaryNameNode secondary1) { private static CheckpointStorage spyOnSecondaryImage(SecondaryNameNode secondary1) {
CheckpointStorage spy = Mockito.spy((CheckpointStorage)secondary1.getFSImage());; CheckpointStorage spy = Mockito.spy((CheckpointStorage)secondary1.getFSImage());;
secondary1.setFSImage(spy); secondary1.setFSImage(spy);
return spy; return spy;