HADOOP-6761. The Trash Emptier has the ability to run more frequently.
(Dmytro Molkov via dhruba) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@945439 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69693b6a86
commit
175a92850d
@ -10,6 +10,9 @@ Trunk (unreleased changes)
|
||||
HADOOP-6623. Add StringUtils.split for non-escaped single-character
|
||||
separator. (Todd Lipcon via tomwhite)
|
||||
|
||||
HADOOP-6761. The Trash Emptier has the ability to run more frequently.
|
||||
(Dmytro Molkov via dhruba)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-6612. Protocols RefreshUserToGroupMappingsProtocol and
|
||||
|
@ -162,11 +162,23 @@
|
||||
<property>
|
||||
<name>fs.trash.interval</name>
|
||||
<value>0</value>
|
||||
<description>Number of minutes between trash checkpoints.
|
||||
<description>Number of minutes after which the checkpoint
|
||||
gets deleted.
|
||||
If zero, the trash feature is disabled.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.trash.checkpoint.interval</name>
|
||||
<value>0</value>
|
||||
<description>Number of minutes between trash checkpoints.
|
||||
Should be smaller than or equal to fs.trash.interval. If zero, the value of fs.trash.interval is used.
|
||||
Every time the checkpointer runs it creates a new checkpoint
|
||||
out of current and removes checkpoints created more than
|
||||
fs.trash.interval minutes ago.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.file.impl</name>
|
||||
<value>org.apache.hadoop.fs.LocalFileSystem</value>
|
||||
|
@ -55,6 +55,8 @@ public class CommonConfigurationKeys {
|
||||
|
||||
public static final String FS_CLIENT_BUFFER_DIR_KEY =
|
||||
"fs.client.buffer.dir";
|
||||
public static final String FS_TRASH_CHECKPOINT_INTERVAL_KEY = "fs.trash.checkpoint.interval";
|
||||
public static final long FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT = 0;
|
||||
|
||||
//TBD: Code is not updated to use following keys.
|
||||
//These keys will be used in later versions
|
||||
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
@ -47,18 +48,19 @@ public class Trash extends Configured {
|
||||
|
||||
private static final Path CURRENT = new Path("Current");
|
||||
private static final Path TRASH = new Path(".Trash/");
|
||||
private static final Path HOMES = new Path("/user/");
|
||||
|
||||
|
||||
private static final FsPermission PERMISSION =
|
||||
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
|
||||
|
||||
private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm");
|
||||
private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
|
||||
private static final int MSECS_PER_MINUTE = 60*1000;
|
||||
|
||||
private final FileSystem fs;
|
||||
private final Path trash;
|
||||
private final Path current;
|
||||
private final long interval;
|
||||
private final long deletionInterval;
|
||||
private final Path homesParent;
|
||||
|
||||
/** Construct a trash can accessor.
|
||||
* @param conf a Configuration
|
||||
@ -74,16 +76,22 @@ public class Trash extends Configured {
|
||||
super(conf);
|
||||
this.fs = fs;
|
||||
this.trash = new Path(fs.getHomeDirectory(), TRASH);
|
||||
this.homesParent = fs.getHomeDirectory().getParent();
|
||||
this.current = new Path(trash, CURRENT);
|
||||
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
|
||||
this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
|
||||
FS_TRASH_INTERVAL_DEFAULT) *
|
||||
MSECS_PER_MINUTE);
|
||||
}
|
||||
|
||||
private Trash(Path home, Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
this.fs = home.getFileSystem(conf);
|
||||
this.trash = new Path(home, TRASH);
|
||||
this.homesParent = home.getParent();
|
||||
this.current = new Path(trash, CURRENT);
|
||||
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
|
||||
this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
|
||||
FS_TRASH_INTERVAL_DEFAULT) *
|
||||
MSECS_PER_MINUTE);
|
||||
}
|
||||
|
||||
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
||||
@ -94,7 +102,7 @@ public class Trash extends Configured {
|
||||
* @return false if the item is already in the trash or trash is disabled
|
||||
*/
|
||||
public boolean moveToTrash(Path path) throws IOException {
|
||||
if (interval == 0)
|
||||
if (deletionInterval == 0)
|
||||
return false;
|
||||
|
||||
if (!path.isAbsolute()) // make path absolute
|
||||
@ -197,7 +205,7 @@ public class Trash extends Configured {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((now - interval) > time) {
|
||||
if ((now - deletionInterval) > time) {
|
||||
if (fs.delete(path, true)) {
|
||||
LOG.info("Deleted trash checkpoint: "+dir);
|
||||
} else {
|
||||
@ -225,34 +233,43 @@ public class Trash extends Configured {
|
||||
private class Emptier implements Runnable {
|
||||
|
||||
private Configuration conf;
|
||||
private long interval;
|
||||
private long emptierInterval;
|
||||
|
||||
Emptier(Configuration conf) throws IOException {
|
||||
this.conf = conf;
|
||||
this.interval = conf.getLong("fs.trash.interval", 0) * MSECS_PER_MINUTE;
|
||||
this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
|
||||
FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
|
||||
MSECS_PER_MINUTE);
|
||||
if (this.emptierInterval > deletionInterval ||
|
||||
this.emptierInterval == 0) {
|
||||
LOG.warn("The configured interval for checkpoint is " +
|
||||
this.emptierInterval + " minutes." +
|
||||
" Using interval of " + deletionInterval +
|
||||
" minutes that is used for deletion instead");
|
||||
this.emptierInterval = deletionInterval;
|
||||
}
|
||||
}
|
||||
|
||||
public void run() {
|
||||
if (interval == 0)
|
||||
if (emptierInterval == 0)
|
||||
return; // trash disabled
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long end;
|
||||
while (true) {
|
||||
end = ceiling(now, interval);
|
||||
end = ceiling(now, emptierInterval);
|
||||
try { // sleep for interval
|
||||
Thread.sleep(end - now);
|
||||
} catch (InterruptedException e) {
|
||||
break; // exit on interrupt
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
now = System.currentTimeMillis();
|
||||
if (now >= end) {
|
||||
|
||||
FileStatus[] homes = null;
|
||||
try {
|
||||
homes = fs.listStatus(HOMES); // list all home dirs
|
||||
homes = fs.listStatus(homesParent); // list all home dirs
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
|
||||
continue;
|
||||
|
@ -18,11 +18,15 @@
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
@ -89,7 +93,7 @@ public class TestTrash extends TestCase {
|
||||
protected static void trashShell(final FileSystem fs, final Path base)
|
||||
throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.trash.interval", "10"); // 10 minute
|
||||
conf.set(FS_TRASH_INTERVAL_KEY, "10"); // 10 minute
|
||||
conf.set("fs.default.name", fs.getUri().toString());
|
||||
FsShell shell = new FsShell();
|
||||
shell.setConf(conf);
|
||||
@ -376,7 +380,7 @@ public class TestTrash extends TestCase {
|
||||
}
|
||||
|
||||
public static void trashNonDefaultFS(Configuration conf) throws IOException {
|
||||
conf.set("fs.trash.interval", "10"); // 10 minute
|
||||
conf.set(FS_TRASH_INTERVAL_KEY, "10"); // 10 minute
|
||||
// attempt non-default FileSystem trash
|
||||
{
|
||||
final FileSystem lfs = FileSystem.getLocal(conf);
|
||||
@ -414,6 +418,67 @@ public class TestTrash extends TestCase {
|
||||
trashNonDefaultFS(conf);
|
||||
}
|
||||
|
||||
public void testTrashEmptier() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
// Trash with a 12-second deletion interval and a 6-second checkpoint interval
|
||||
conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
|
||||
conf.setClass("fs.file.impl", TestLFS.class, FileSystem.class);
|
||||
conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
|
||||
Trash trash = new Trash(conf);
|
||||
|
||||
// Start Emptier in background
|
||||
Runnable emptier = trash.getEmptier();
|
||||
Thread emptierThread = new Thread(emptier);
|
||||
emptierThread.start();
|
||||
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
conf.set("fs.default.name", fs.getUri().toString());
|
||||
FsShell shell = new FsShell();
|
||||
shell.setConf(conf);
|
||||
shell.init();
|
||||
// First create a new directory with mkdirs
|
||||
Path myPath = new Path(TEST_DIR, "test/mkdirs");
|
||||
mkdir(fs, myPath);
|
||||
int fileIndex = 0;
|
||||
Set<String> checkpoints = new HashSet<String>();
|
||||
while (true) {
|
||||
// Create a file with a new name
|
||||
Path myFile = new Path(TEST_DIR, "test/mkdirs/myFile" + fileIndex++);
|
||||
writeFile(fs, myFile);
|
||||
|
||||
// Delete the file to trash
|
||||
String[] args = new String[2];
|
||||
args[0] = "-rm";
|
||||
args[1] = myFile.toString();
|
||||
int val = -1;
|
||||
try {
|
||||
val = shell.run(args);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Exception raised from Trash.run " +
|
||||
e.getLocalizedMessage());
|
||||
}
|
||||
assertTrue(val == 0);
|
||||
|
||||
Path trashDir = shell.getCurrentTrashDir();
|
||||
FileStatus files[] = fs.listStatus(trashDir.getParent());
|
||||
// Scan files in .Trash and add them to set of checkpoints
|
||||
for (FileStatus file : files) {
|
||||
String fileName = file.getPath().getName();
|
||||
checkpoints.add(fileName);
|
||||
}
|
||||
// If checkpoints has 4 objects it is Current + 3 checkpoint directories
|
||||
if (checkpoints.size() == 4) {
|
||||
// The actual contents should be smaller since the last checkpoint
|
||||
// should've been deleted and Current might not have been recreated yet
|
||||
assertTrue(checkpoints.size() > files.length);
|
||||
break;
|
||||
}
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
emptierThread.interrupt();
|
||||
emptierThread.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see TestCase#tearDown()
|
||||
*/
|
||||
@ -428,7 +493,7 @@ public class TestTrash extends TestCase {
|
||||
static class TestLFS extends LocalFileSystem {
|
||||
Path home;
|
||||
TestLFS() {
|
||||
this(TEST_DIR);
|
||||
this(new Path(TEST_DIR, "user/test"));
|
||||
}
|
||||
TestLFS(Path home) {
|
||||
super();
|
||||
@ -451,7 +516,7 @@ public class TestTrash extends TestCase {
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
|
||||
conf.set("fs.default.name", fs.getUri().toString());
|
||||
conf.set("fs.trash.interval", "10"); //minutes..
|
||||
conf.set(FS_TRASH_INTERVAL_KEY, "10"); //minutes..
|
||||
FsShell shell = new FsShell();
|
||||
shell.setConf(conf);
|
||||
//Path trashRoot = null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user