From 686c0141eff0886c285b8e52fddade43c1ce4570 Mon Sep 17 00:00:00 2001
From: Stephen O'Donnell
Date: Tue, 5 Mar 2019 14:09:00 +0000
Subject: [PATCH] HADOOP-16140. hadoop fs expunge to add -immediate option to
 purge trash immediately. Contributed by Stephen O'Donnell.

Signed-off-by: Steve Loughran
---
 .../main/java/org/apache/hadoop/fs/Trash.java |  6 ++
 .../org/apache/hadoop/fs/TrashPolicy.java     |  5 ++
 .../apache/hadoop/fs/TrashPolicyDefault.java  | 18 +++--
 .../org/apache/hadoop/fs/shell/Delete.java    | 25 +++++--
 .../src/site/markdown/FileSystemShell.md      |  5 +-
 .../java/org/apache/hadoop/fs/TestTrash.java  | 65 +++++++++++++++++--
 6 files changed, 107 insertions(+), 17 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Trash.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Trash.java
index 49cd600628..e29cb9a4e0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Trash.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Trash.java
@@ -120,6 +120,12 @@ public void expunge() throws IOException {
     trashPolicy.deleteCheckpoint();
   }
 
+  /** Delete all trash immediately. */
+  public void expungeImmediately() throws IOException {
+    trashPolicy.createCheckpoint();
+    trashPolicy.deleteCheckpointsImmediately();
+  }
+
   /** get the current working directory */
   Path getCurrentTrashDir() throws IOException {
     return trashPolicy.getCurrentTrashDir();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
index 2fe3fd1a87..64fb81be99 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
@@ -79,6 +79,11 @@ public void initialize(Configuration conf, FileSystem fs) {
    */
   public abstract void deleteCheckpoint() throws IOException;
 
+  /**
+   * Delete all checkpoints immediately, i.e. empty the trash.
+   */
+  public abstract void deleteCheckpointsImmediately() throws IOException;
+
   /**
    * Get the current working directory of the Trash Policy
    * This API does not work with files deleted from encryption zone when HDFS
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
index 39d5e73f0b..9f07d3de1e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
@@ -213,11 +213,20 @@ public void createCheckpoint(Date date) throws IOException {
 
   @Override
   public void deleteCheckpoint() throws IOException {
+    deleteCheckpoint(false);
+  }
+
+  @Override
+  public void deleteCheckpointsImmediately() throws IOException {
+    deleteCheckpoint(true);
+  }
+
+  private void deleteCheckpoint(boolean deleteImmediately) throws IOException {
     Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
     for (FileStatus trashRoot : trashRoots) {
       LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
           trashRoot.getPath());
-      deleteCheckpoint(trashRoot.getPath());
+      deleteCheckpoint(trashRoot.getPath(), deleteImmediately);
     }
   }
 
@@ -283,7 +292,7 @@ public void run() {
                 continue;
               try {
                 TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
-                trash.deleteCheckpoint(trashRoot.getPath());
+                trash.deleteCheckpoint(trashRoot.getPath(), false);
                 trash.createCheckpoint(trashRoot.getPath(), new Date(now));
               } catch (IOException e) {
                 LOG.warn("Trash caught: "+e+". Skipping " +
@@ -341,7 +350,8 @@ private void createCheckpoint(Path trashRoot, Date date) throws IOException {
     }
   }
 
-  private void deleteCheckpoint(Path trashRoot) throws IOException {
+  private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
+      throws IOException {
     LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
 
     FileStatus[] dirs = null;
@@ -368,7 +378,7 @@ private void deleteCheckpoint(Path trashRoot) throws IOException {
         continue;
       }
 
-      if ((now - deletionInterval) > time) {
+      if (((now - deletionInterval) > time) || deleteImmediately) {
         if (fs.delete(path, true)) {
           LOG.info("Deleted trash checkpoint: "+dir);
         } else {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Delete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Delete.java
index a066395015..57b543acc2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Delete.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Delete.java
@@ -219,16 +219,20 @@ protected void processPath(PathData item) throws IOException {
   // than the retention threshold.
   static class Expunge extends FsCommand {
     public static final String NAME = "expunge";
-    public static final String USAGE = "";
+    public static final String USAGE =
+        "[-immediate]";
     public static final String DESCRIPTION =
         "Delete files from the trash that are older " +
             "than the retention threshold";
 
+    private boolean emptyImmediately = false;
+
     // TODO: should probably allow path arguments for the filesystems
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
-      CommandFormat cf = new CommandFormat(0, 0);
+      CommandFormat cf = new CommandFormat(0, 1, "immediate");
       cf.parse(args);
+      emptyImmediately = cf.getOpt("immediate");
     }
 
     @Override
@@ -239,14 +243,23 @@ protected void processArguments(LinkedList<PathData> args)
       if (null != childFileSystems) {
         for (FileSystem fs : childFileSystems) {
           Trash trash = new Trash(fs, getConf());
-          trash.expunge();
-          trash.checkpoint();
+          if (emptyImmediately) {
+            trash.expungeImmediately();
+          } else {
+            trash.expunge();
+            trash.checkpoint();
+          }
         }
       } else {
         Trash trash = new Trash(getConf());
-        trash.expunge();
-        trash.checkpoint();
+        if (emptyImmediately) {
+          trash.expungeImmediately();
+        } else {
+          trash.expunge();
+          trash.checkpoint();
+        }
       }
     }
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
index d9567b9a9d..f4a37ea036 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
@@ -264,7 +264,7 @@ Displays a summary of file lengths.
 expunge
 -------
 
-Usage: `hadoop fs -expunge`
+Usage: `hadoop fs -expunge [-immediate]`
 
 Permanently delete files in checkpoints older than the retention threshold
 from trash directory, and create new checkpoint.
@@ -279,6 +279,9 @@ users can configure to create and delete checkpoints periodically by the parameter
 stored as `fs.trash.checkpoint.interval` (in core-site.xml). This value should
 be smaller or equal to `fs.trash.interval`.
 
+If the `-immediate` option is passed, all files in the trash for the current
+user are immediately deleted, ignoring the `fs.trash.interval` setting.
+
 Refer to the
 [HDFS Architecture guide](../hadoop-hdfs/HdfsDesign.html#File_Deletes_and_Undeletes)
 for more information about trash feature of HDFS.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java
index 04f56fb075..cf22f3b10b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java
@@ -36,6 +36,8 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 import org.apache.hadoop.conf.Configuration;
@@ -486,6 +488,41 @@ public static void trashShell(final Configuration conf, final Path base,
           trashRootFs.exists(dirToKeep));
     }
 
+    // Verify expunge -immediate removes all checkpoints and current folder
+    {
+      // Setup an old checkpoint, a recent checkpoint and a Current folder.
+      // All three should be removed by expunge -immediate.
+      long trashInterval = conf.getLong(FS_TRASH_INTERVAL_KEY,
+          FS_TRASH_INTERVAL_DEFAULT);
+      long now = Time.now();
+      DateFormat checkpointFormat = new SimpleDateFormat("yyMMddHHmm");
+      Path oldCheckpoint = new Path(trashRoot.getParent(),
+          checkpointFormat.format(now - (trashInterval * 60 * 1000) - 1));
+      Path recentCheckpoint = new Path(trashRoot.getParent(),
+          checkpointFormat.format(now));
+      Path currentFolder = new Path(trashRoot.getParent(), "Current");
+      mkdir(trashRootFs, oldCheckpoint);
+      mkdir(trashRootFs, recentCheckpoint);
+      mkdir(trashRootFs, currentFolder);
+
+      // Clear out trash
+      int rc = -1;
+      try {
+        rc = shell.run(new String[] {"-expunge", "-immediate"});
+      } catch (Exception e) {
+        fail("Unexpected exception running the trash shell: " +
+            e.getLocalizedMessage());
+      }
+      assertEquals("Expunge immediate should return zero", 0, rc);
+      assertFalse("Old checkpoint should be removed",
+          trashRootFs.exists(oldCheckpoint));
+      assertFalse("Recent checkpoint should be removed",
+          trashRootFs.exists(recentCheckpoint));
+      assertFalse("Current folder should be removed",
+          trashRootFs.exists(currentFolder));
+      assertEquals("Ensure trash folder is empty", 0,
+          trashRootFs.listStatus(trashRoot.getParent()).length);
+    }
   }
 
   public static void trashNonDefaultFS(Configuration conf) throws IOException {
@@ -1000,6 +1037,10 @@ public void createCheckpoint() throws IOException {
     public void deleteCheckpoint() throws IOException {
     }
 
+    @Override
+    public void deleteCheckpointsImmediately() throws IOException {
+    }
+
     @Override
     public Path getCurrentTrashDir() {
       return null;
@@ -1059,6 +1100,11 @@ public void deleteCheckpoint() throws IOException {
       AuditableCheckpoints.delete();
     }
 
+    @Override
+    public void deleteCheckpointsImmediately() throws IOException {
+      AuditableCheckpoints.deleteAll();
+    }
+
     @Override
     public Path getCurrentTrashDir() {
       return null;
@@ -1115,25 +1161,32 @@ public boolean isEnabled() {
    */
   private static class AuditableCheckpoints {
 
+    private static final Logger LOG =
+        LoggerFactory.getLogger(AuditableCheckpoints.class);
+
     private static AtomicInteger numOfCheckpoint = new AtomicInteger(0);
 
     private static void add() {
       numOfCheckpoint.incrementAndGet();
-      System.out.println(String
-          .format("Create a checkpoint, current number of checkpoints %d",
-              numOfCheckpoint.get()));
+      LOG.info("Create a checkpoint, current number of checkpoints {}",
+          numOfCheckpoint.get());
     }
 
     private static void delete() {
       if(numOfCheckpoint.get() > 0) {
         numOfCheckpoint.decrementAndGet();
-        System.out.println(String
-            .format("Delete a checkpoint, current number of checkpoints %d",
-                numOfCheckpoint.get()));
+        LOG.info("Delete a checkpoint, current number of checkpoints {}",
+            numOfCheckpoint.get());
       }
     }
 
+    private static void deleteAll() {
+      numOfCheckpoint.set(0);
+      LOG.info("Delete all checkpoints, current number of checkpoints {}",
+          numOfCheckpoint.get());
+    }
+
    private static int get() {
      return numOfCheckpoint.get();
    }
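
Reviewer note: besides the documented shell usage (hadoop fs -expunge -immediate), the new public Trash#expungeImmediately() API added by this patch can be driven directly. The following is a minimal sketch, assuming the patch is applied; the class name EmptyTrashNow is illustrative only and is not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Trash;

    public class EmptyTrashNow {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolve the default filesystem; per-user trash lives under the home directory.
        FileSystem fs = FileSystem.get(conf);
        Trash trash = new Trash(fs, conf);
        // Rolls the Current trash directory into a checkpoint, then deletes all
        // checkpoints regardless of fs.trash.interval -- same effect as
        // `hadoop fs -expunge -immediate` for this filesystem.
        trash.expungeImmediately();
      }
    }

This mirrors what Delete.Expunge does when the -immediate flag is set, minus the iteration over child filesystems.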