diff --git a/common/CHANGES.txt b/common/CHANGES.txt
index db9c9940d6..952540dcf2 100644
--- a/common/CHANGES.txt
+++ b/common/CHANGES.txt
@@ -58,6 +58,8 @@ Trunk (unreleased changes)
HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
(atm via eli)
+ HADOOP-7460. Support pluggable trash policies. (Usman Masoon via suresh)
+
IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and
diff --git a/common/src/java/org/apache/hadoop/fs/Trash.java b/common/src/java/org/apache/hadoop/fs/Trash.java
index d90a2e1989..76033eaa6f 100644
--- a/common/src/java/org/apache/hadoop/fs/Trash.java
+++ b/common/src/java/org/apache/hadoop/fs/Trash.java
@@ -17,60 +17,26 @@
*/
package org.apache.hadoop.fs;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.StringUtils;
-/** Provides a trash feature. Files are moved to a user's trash
- * directory, a subdirectory of their home directory named ".Trash". Files are
- * initially moved to a current sub-directory of the trash directory.
- * Within that sub-directory their original path is preserved. Periodically
- * one may checkpoint the current trash and remove older checkpoints. (This
- * design permits trash management without enumeration of the full trash
- * content, without date support in the filesystem, and without clock
- * synchronization.)
+/**
+ * Provides a trash facility which supports pluggable Trash policies.
+ *
+ * See the implementation of the configured TrashPolicy for more
+ * details.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Trash extends Configured {
- private static final Log LOG =
- LogFactory.getLog(Trash.class);
+ private TrashPolicy trashPolicy; // configured trash policy instance
- private static final Path CURRENT = new Path("Current");
- private static final Path TRASH = new Path(".Trash/");
-
-
- private static final FsPermission PERMISSION =
- new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
- private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
- private static final int MSECS_PER_MINUTE = 60*1000;
-
- private final FileSystem fs;
- private final Path trash;
- private final Path current;
- private final long deletionInterval;
- private final Path homesParent;
-
- /** Construct a trash can accessor.
+ /**
+ * Construct a trash can accessor.
* @param conf a Configuration
*/
public Trash(Configuration conf) throws IOException {
@@ -79,22 +45,18 @@ public class Trash extends Configured {
/**
* Construct a trash can accessor for the FileSystem provided.
+ * @param fs the FileSystem
+ * @param conf a Configuration
*/
public Trash(FileSystem fs, Configuration conf) throws IOException {
super(conf);
- this.fs = fs;
- this.trash = new Path(fs.getHomeDirectory(), TRASH);
- this.homesParent = fs.getHomeDirectory().getParent();
- this.current = new Path(trash, CURRENT);
- this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
- FS_TRASH_INTERVAL_DEFAULT) *
- MSECS_PER_MINUTE);
+ trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
}
-
+
/**
* In case of the symlinks or mount points, one has to move the appropriate
* trashbin in the actual volume of the path p being deleted.
- *
+ *
* Hence we get the file system of the fully-qualified resolved-path and
* then move the path p to the trashbin in that volume,
* @param fs - the filesystem of path p
@@ -115,240 +77,49 @@ public class Trash extends Configured {
return success;
}
- private Trash(Path home, Configuration conf) throws IOException {
- super(conf);
- this.fs = home.getFileSystem(conf);
- this.trash = new Path(home, TRASH);
- this.homesParent = home.getParent();
- this.current = new Path(trash, CURRENT);
- this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
- FS_TRASH_INTERVAL_DEFAULT) *
- MSECS_PER_MINUTE);
- }
-
- private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
- return new Path(basePath + rmFilePath.toUri().getPath());
- }
-
/**
* Returns whether the trash is enabled for this filesystem
*/
public boolean isEnabled() {
- return (deletionInterval != 0);
+ return trashPolicy.isEnabled();
}
/** Move a file or directory to the current trash directory.
* @return false if the item is already in the trash or trash is disabled
*/
public boolean moveToTrash(Path path) throws IOException {
- if (!isEnabled())
- return false;
-
- if (!path.isAbsolute()) // make path absolute
- path = new Path(fs.getWorkingDirectory(), path);
-
- if (!fs.exists(path)) // check that path exists
- throw new FileNotFoundException(path.toString());
-
- String qpath = fs.makeQualified(path).toString();
-
- if (qpath.startsWith(trash.toString())) {
- return false; // already in trash
- }
-
- if (trash.getParent().toString().startsWith(qpath)) {
- throw new IOException("Cannot move \"" + path +
- "\" to the trash, as it contains the trash");
- }
-
- Path trashPath = makeTrashRelativePath(current, path);
- Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
-
- IOException cause = null;
-
- // try twice, in case checkpoint between the mkdirs() & rename()
- for (int i = 0; i < 2; i++) {
- try {
- if (!fs.mkdirs(baseTrashPath, PERMISSION)) { // create current
- LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
- return false;
- }
- } catch (IOException e) {
- LOG.warn("Can't create trash directory: "+baseTrashPath);
- cause = e;
- break;
- }
- try {
- //
- // if the target path in Trash already exists, then append with
- // a current time in millisecs.
- //
- String orig = trashPath.toString();
-
- while(fs.exists(trashPath)) {
- trashPath = new Path(orig + System.currentTimeMillis());
- }
-
- if (fs.rename(path, trashPath)) // move to current trash
- return true;
- } catch (IOException e) {
- cause = e;
- }
- }
- throw (IOException)
- new IOException("Failed to move to trash: "+path).initCause(cause);
+ return trashPolicy.moveToTrash(path);
}
/** Create a trash checkpoint. */
public void checkpoint() throws IOException {
- if (!fs.exists(current)) // no trash, no checkpoint
- return;
-
- Path checkpoint;
- synchronized (CHECKPOINT) {
- checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
- }
-
- if (fs.rename(current, checkpoint)) {
- LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
- } else {
- throw new IOException("Failed to checkpoint trash: "+checkpoint);
- }
+ trashPolicy.createCheckpoint();
}
- /** Delete old checkpoints. */
+ /** Delete old checkpoint(s). */
public void expunge() throws IOException {
- FileStatus[] dirs = null;
-
- try {
- dirs = fs.listStatus(trash); // scan trash sub-directories
- } catch (FileNotFoundException fnfe) {
- return;
- }
-
- long now = System.currentTimeMillis();
- for (int i = 0; i < dirs.length; i++) {
- Path path = dirs[i].getPath();
- String dir = path.toUri().getPath();
- String name = path.getName();
- if (name.equals(CURRENT.getName())) // skip current
- continue;
-
- long time;
- try {
- synchronized (CHECKPOINT) {
- time = CHECKPOINT.parse(name).getTime();
- }
- } catch (ParseException e) {
- LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
- continue;
- }
-
- if ((now - deletionInterval) > time) {
- if (fs.delete(path, true)) {
- LOG.info("Deleted trash checkpoint: "+dir);
- } else {
- LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
- }
- }
- }
+ trashPolicy.deleteCheckpoint();
}
- //
- // get the current working directory
- //
+ /** get the current working directory */
Path getCurrentTrashDir() {
- return current;
+ return trashPolicy.getCurrentTrashDir();
+ }
+
+ /** get the configured trash policy */
+ TrashPolicy getTrashPolicy() {
+ return trashPolicy;
}
/** Return a {@link Runnable} that periodically empties the trash of all
- * users, intended to be run by the superuser. Only one checkpoint is kept
- * at a time.
+ * users, intended to be run by the superuser.
*/
public Runnable getEmptier() throws IOException {
- return new Emptier(getConf());
- }
-
- private class Emptier implements Runnable {
-
- private Configuration conf;
- private long emptierInterval;
-
- Emptier(Configuration conf) throws IOException {
- this.conf = conf;
- this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
- FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
- MSECS_PER_MINUTE);
- if (this.emptierInterval > deletionInterval ||
- this.emptierInterval == 0) {
- LOG.warn("The configured interval for checkpoint is " +
- this.emptierInterval + " minutes." +
- " Using interval of " + deletionInterval +
- " minutes that is used for deletion instead");
- this.emptierInterval = deletionInterval;
- }
- }
-
- public void run() {
- if (emptierInterval == 0)
- return; // trash disabled
- long now = System.currentTimeMillis();
- long end;
- while (true) {
- end = ceiling(now, emptierInterval);
- try { // sleep for interval
- Thread.sleep(end - now);
- } catch (InterruptedException e) {
- break; // exit on interrupt
- }
-
- try {
- now = System.currentTimeMillis();
- if (now >= end) {
-
- FileStatus[] homes = null;
- try {
- homes = fs.listStatus(homesParent); // list all home dirs
- } catch (IOException e) {
- LOG.warn("Trash can't list homes: "+e+" Sleeping.");
- continue;
- }
-
- for (FileStatus home : homes) { // dump each trash
- if (!home.isDirectory())
- continue;
- try {
- Trash trash = new Trash(home.getPath(), conf);
- trash.expunge();
- trash.checkpoint();
- } catch (IOException e) {
- LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
- }
- }
- }
- } catch (Exception e) {
- LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
- }
- }
- try {
- fs.close();
- } catch(IOException e) {
- LOG.warn("Trash cannot close FileSystem: ", e);
- }
- }
-
- private long ceiling(long time, long interval) {
- return floor(time, interval) + interval;
- }
- private long floor(long time, long interval) {
- return (time / interval) * interval;
- }
-
+ return trashPolicy.getEmptier();
}
/** Run an emptier.*/
public static void main(String[] args) throws Exception {
new Trash(new Configuration()).getEmptier().run();
}
-
}
diff --git a/common/src/java/org/apache/hadoop/fs/TrashPolicy.java b/common/src/java/org/apache/hadoop/fs/TrashPolicy.java
new file mode 100644
index 0000000000..a168f7012e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/fs/TrashPolicy.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * This interface is used for implementing different Trash policies.
+ * Provides factory method to create instances of the configured Trash policy.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TrashPolicy extends Configured {
+ protected FileSystem fs; // the FileSystem
+ protected Path trash; // path to trash directory
+ protected long deletionInterval; // deletion interval for Emptier
+
+ /**
+ * Used to setup the trash policy. Must be implemented by all TrashPolicy
+ * implementations
+ * @param conf the configuration to be used
+ * @param fs the filesystem to be used
+ * @param home the home directory
+ */
+ public abstract void initialize(Configuration conf, FileSystem fs, Path home);
+
+ /**
+ * Returns whether the Trash Policy is enabled for this filesystem
+ */
+ public abstract boolean isEnabled();
+
+ /**
+ * Move a file or directory to the current trash directory.
+ * @return false if the item is already in the trash or trash is disabled
+ */
+ public abstract boolean moveToTrash(Path path) throws IOException;
+
+ /**
+ * Create a trash checkpoint.
+ */
+ public abstract void createCheckpoint() throws IOException;
+
+ /**
+ * Delete old trash checkpoint(s).
+ */
+ public abstract void deleteCheckpoint() throws IOException;
+
+ /**
+ * Get the current working directory of the Trash Policy
+ */
+ public abstract Path getCurrentTrashDir();
+
+ /**
+ * Return a {@link Runnable} that periodically empties the trash of all
+ * users, intended to be run by the superuser.
+ */
+ public abstract Runnable getEmptier() throws IOException;
+
+ /**
+ * Get an instance of the configured TrashPolicy based on the value
+ * of the configuration paramater fs.trash.classname.
+ *
+ * @param conf the configuration to be used
+ * @param fs the file system to be used
+ * @param home the home directory
+ * @return an instance of TrashPolicy
+ */
+ public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path home)
+ throws IOException {
+ Class extends TrashPolicy> trashClass = conf.getClass("fs.trash.classname",
+ TrashPolicyDefault.class,
+ TrashPolicy.class);
+ TrashPolicy trash = (TrashPolicy) ReflectionUtils.newInstance(trashClass, conf);
+ trash.initialize(conf, fs, home); // initialize TrashPolicy
+ return trash;
+ }
+}
diff --git a/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java b/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
new file mode 100644
index 0000000000..6bb4454db6
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/** Provides a trash feature. Files are moved to a user's trash
+ * directory, a subdirectory of their home directory named ".Trash". Files are
+ * initially moved to a current sub-directory of the trash directory.
+ * Within that sub-directory their original path is preserved. Periodically
+ * one may checkpoint the current trash and remove older checkpoints. (This
+ * design permits trash management without enumeration of the full trash
+ * content, without date support in the filesystem, and without clock
+ * synchronization.)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TrashPolicyDefault extends TrashPolicy {
+ private static final Log LOG =
+ LogFactory.getLog(TrashPolicyDefault.class);
+
+ private static final Path CURRENT = new Path("Current");
+ private static final Path TRASH = new Path(".Trash/");
+
+ private static final FsPermission PERMISSION =
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+ private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
+ private static final int MSECS_PER_MINUTE = 60*1000;
+
+ private Path current;
+ private Path homesParent;
+
+ public TrashPolicyDefault() { }
+
+ private TrashPolicyDefault(Path home, Configuration conf) throws IOException {
+ initialize(conf, home.getFileSystem(conf), home);
+ }
+
+ @Override
+ public void initialize(Configuration conf, FileSystem fs, Path home) {
+ this.fs = fs;
+ this.trash = new Path(home, TRASH);
+ this.homesParent = home.getParent();
+ this.current = new Path(trash, CURRENT);
+ this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
+ FS_TRASH_INTERVAL_DEFAULT) * MSECS_PER_MINUTE);
+ }
+
+ private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
+ return new Path(basePath + rmFilePath.toUri().getPath());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return (deletionInterval != 0);
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+ if (!isEnabled())
+ return false;
+
+ if (!path.isAbsolute()) // make path absolute
+ path = new Path(fs.getWorkingDirectory(), path);
+
+ if (!fs.exists(path)) // check that path exists
+ throw new FileNotFoundException(path.toString());
+
+ String qpath = fs.makeQualified(path).toString();
+
+ if (qpath.startsWith(trash.toString())) {
+ return false; // already in trash
+ }
+
+ if (trash.getParent().toString().startsWith(qpath)) {
+ throw new IOException("Cannot move \"" + path +
+ "\" to the trash, as it contains the trash");
+ }
+
+ Path trashPath = makeTrashRelativePath(current, path);
+ Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
+
+ IOException cause = null;
+
+ // try twice, in case checkpoint between the mkdirs() & rename()
+ for (int i = 0; i < 2; i++) {
+ try {
+ if (!fs.mkdirs(baseTrashPath, PERMISSION)) { // create current
+ LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.warn("Can't create trash directory: "+baseTrashPath);
+ cause = e;
+ break;
+ }
+ try {
+ // if the target path in Trash already exists, then append with
+ // a current time in millisecs.
+ String orig = trashPath.toString();
+
+ while(fs.exists(trashPath)) {
+ trashPath = new Path(orig + System.currentTimeMillis());
+ }
+
+ if (fs.rename(path, trashPath)) // move to current trash
+ return true;
+ } catch (IOException e) {
+ cause = e;
+ }
+ }
+ throw (IOException)
+ new IOException("Failed to move to trash: "+path).initCause(cause);
+ }
+
+ @Override
+ public void createCheckpoint() throws IOException {
+ if (!fs.exists(current)) // no trash, no checkpoint
+ return;
+
+ Path checkpoint;
+ synchronized (CHECKPOINT) {
+ checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
+ }
+
+ if (fs.rename(current, checkpoint)) {
+ LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
+ } else {
+ throw new IOException("Failed to checkpoint trash: "+checkpoint);
+ }
+ }
+
+ @Override
+ public void deleteCheckpoint() throws IOException {
+ FileStatus[] dirs = null;
+
+ try {
+ dirs = fs.listStatus(trash); // scan trash sub-directories
+ } catch (FileNotFoundException fnfe) {
+ return;
+ }
+
+ long now = System.currentTimeMillis();
+ for (int i = 0; i < dirs.length; i++) {
+ Path path = dirs[i].getPath();
+ String dir = path.toUri().getPath();
+ String name = path.getName();
+ if (name.equals(CURRENT.getName())) // skip current
+ continue;
+
+ long time;
+ try {
+ synchronized (CHECKPOINT) {
+ time = CHECKPOINT.parse(name).getTime();
+ }
+ } catch (ParseException e) {
+ LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
+ continue;
+ }
+
+ if ((now - deletionInterval) > time) {
+ if (fs.delete(path, true)) {
+ LOG.info("Deleted trash checkpoint: "+dir);
+ } else {
+ LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public Path getCurrentTrashDir() {
+ return current;
+ }
+
+ @Override
+ public Runnable getEmptier() throws IOException {
+ return new Emptier(getConf());
+ }
+
+ private class Emptier implements Runnable {
+
+ private Configuration conf;
+ private long emptierInterval;
+
+ Emptier(Configuration conf) throws IOException {
+ this.conf = conf;
+ this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+ FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
+ MSECS_PER_MINUTE);
+ if (this.emptierInterval > deletionInterval ||
+ this.emptierInterval == 0) {
+ LOG.warn("The configured interval for checkpoint is " +
+ this.emptierInterval + " minutes." +
+ " Using interval of " + deletionInterval +
+ " minutes that is used for deletion instead");
+ this.emptierInterval = deletionInterval;
+ }
+ }
+
+ public void run() {
+ if (emptierInterval == 0)
+ return; // trash disabled
+ long now = System.currentTimeMillis();
+ long end;
+ while (true) {
+ end = ceiling(now, emptierInterval);
+ try { // sleep for interval
+ Thread.sleep(end - now);
+ } catch (InterruptedException e) {
+ break; // exit on interrupt
+ }
+
+ try {
+ now = System.currentTimeMillis();
+ if (now >= end) {
+
+ FileStatus[] homes = null;
+ try {
+ homes = fs.listStatus(homesParent); // list all home dirs
+ } catch (IOException e) {
+ LOG.warn("Trash can't list homes: "+e+" Sleeping.");
+ continue;
+ }
+
+ for (FileStatus home : homes) { // dump each trash
+ if (!home.isDirectory())
+ continue;
+ try {
+ TrashPolicyDefault trash = new TrashPolicyDefault(home.getPath(), conf);
+ trash.deleteCheckpoint();
+ trash.createCheckpoint();
+ } catch (IOException e) {
+ LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
+ }
+ }
+ try {
+ fs.close();
+ } catch(IOException e) {
+ LOG.warn("Trash cannot close FileSystem: ", e);
+ }
+ }
+
+ private long ceiling(long time, long interval) {
+ return floor(time, interval) + interval;
+ }
+ private long floor(long time, long interval) {
+ return (time / interval) * interval;
+ }
+ }
+}
diff --git a/common/src/test/core/org/apache/hadoop/fs/TestTrash.java b/common/src/test/core/org/apache/hadoop/fs/TestTrash.java
index 3f8c645c4f..4149af3c9b 100644
--- a/common/src/test/core/org/apache/hadoop/fs/TestTrash.java
+++ b/common/src/test/core/org/apache/hadoop/fs/TestTrash.java
@@ -480,6 +480,15 @@ public class TestTrash extends TestCase {
trashNonDefaultFS(conf);
}
+ public void testPluggableTrash() throws IOException {
+ Configuration conf = new Configuration();
+
+ // Test plugged TrashPolicy
+ conf.setClass("fs.trash.classname", TestTrashPolicy.class, TrashPolicy.class);
+ Trash trash = new Trash(conf);
+ assertTrue(trash.getTrashPolicy().getClass().equals(TestTrashPolicy.class));
+ }
+
public void testTrashEmptier() throws Exception {
Configuration conf = new Configuration();
// Trash with 12 second deletes and 6 seconds checkpoints
@@ -638,4 +647,41 @@ public class TestTrash extends TestCase {
// run performance piece as a separate test
performanceTestDeleteSameFile();
}
+
+ // Test TrashPolicy. Don't care about implementation.
+ public static class TestTrashPolicy extends TrashPolicy {
+ public TestTrashPolicy() { }
+
+ @Override
+ public void initialize(Configuration conf, FileSystem fs, Path home) {
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return false;
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void createCheckpoint() throws IOException {
+ }
+
+ @Override
+ public void deleteCheckpoint() throws IOException {
+ }
+
+ @Override
+ public Path getCurrentTrashDir() {
+ return null;
+ }
+
+ @Override
+ public Runnable getEmptier() throws IOException {
+ return null;
+ }
+ }
}