HADOOP-7460. Support pluggable trash policies. Contributed by Usman Masoon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1149760 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
233a7aa34f
commit
184ff33de5
@ -58,6 +58,8 @@ Trunk (unreleased changes)
|
|||||||
HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
|
HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
|
||||||
(atm via eli)
|
(atm via eli)
|
||||||
|
|
||||||
|
HADOOP-7460. Support pluggable trash policies. (Usman Masoon via suresh)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-7042. Updates to test-patch.sh to include failed test names and
|
HADOOP-7042. Updates to test-patch.sh to include failed test names and
|
||||||
|
@ -17,60 +17,26 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs;
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.text.DateFormat;
|
|
||||||
import java.text.ParseException;
|
|
||||||
import java.text.SimpleDateFormat;
|
|
||||||
import java.util.Date;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
/** Provides a <i>trash</i> feature. Files are moved to a user's trash
|
/**
|
||||||
* directory, a subdirectory of their home directory named ".Trash". Files are
|
* Provides a trash facility which supports pluggable Trash policies.
|
||||||
* initially moved to a <i>current</i> sub-directory of the trash directory.
|
*
|
||||||
* Within that sub-directory their original path is preserved. Periodically
|
* See the implementation of the configured TrashPolicy for more
|
||||||
* one may checkpoint the current trash and remove older checkpoints. (This
|
* details.
|
||||||
* design permits trash management without enumeration of the full trash
|
|
||||||
* content, without date support in the filesystem, and without clock
|
|
||||||
* synchronization.)
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class Trash extends Configured {
|
public class Trash extends Configured {
|
||||||
private static final Log LOG =
|
private TrashPolicy trashPolicy; // configured trash policy instance
|
||||||
LogFactory.getLog(Trash.class);
|
|
||||||
|
|
||||||
private static final Path CURRENT = new Path("Current");
|
/**
|
||||||
private static final Path TRASH = new Path(".Trash/");
|
* Construct a trash can accessor.
|
||||||
|
|
||||||
|
|
||||||
private static final FsPermission PERMISSION =
|
|
||||||
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
|
|
||||||
|
|
||||||
private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
|
|
||||||
private static final int MSECS_PER_MINUTE = 60*1000;
|
|
||||||
|
|
||||||
private final FileSystem fs;
|
|
||||||
private final Path trash;
|
|
||||||
private final Path current;
|
|
||||||
private final long deletionInterval;
|
|
||||||
private final Path homesParent;
|
|
||||||
|
|
||||||
/** Construct a trash can accessor.
|
|
||||||
* @param conf a Configuration
|
* @param conf a Configuration
|
||||||
*/
|
*/
|
||||||
public Trash(Configuration conf) throws IOException {
|
public Trash(Configuration conf) throws IOException {
|
||||||
@ -79,22 +45,18 @@ public class Trash extends Configured {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a trash can accessor for the FileSystem provided.
|
* Construct a trash can accessor for the FileSystem provided.
|
||||||
|
* @param fs the FileSystem
|
||||||
|
* @param conf a Configuration
|
||||||
*/
|
*/
|
||||||
public Trash(FileSystem fs, Configuration conf) throws IOException {
|
public Trash(FileSystem fs, Configuration conf) throws IOException {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.fs = fs;
|
trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
|
||||||
this.trash = new Path(fs.getHomeDirectory(), TRASH);
|
|
||||||
this.homesParent = fs.getHomeDirectory().getParent();
|
|
||||||
this.current = new Path(trash, CURRENT);
|
|
||||||
this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
|
|
||||||
FS_TRASH_INTERVAL_DEFAULT) *
|
|
||||||
MSECS_PER_MINUTE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In case of the symlinks or mount points, one has to move the appropriate
|
* In case of the symlinks or mount points, one has to move the appropriate
|
||||||
* trashbin in the actual volume of the path p being deleted.
|
* trashbin in the actual volume of the path p being deleted.
|
||||||
*
|
*
|
||||||
* Hence we get the file system of the fully-qualified resolved-path and
|
* Hence we get the file system of the fully-qualified resolved-path and
|
||||||
* then move the path p to the trashbin in that volume,
|
* then move the path p to the trashbin in that volume,
|
||||||
* @param fs - the filesystem of path p
|
* @param fs - the filesystem of path p
|
||||||
@ -115,240 +77,49 @@ public class Trash extends Configured {
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Trash(Path home, Configuration conf) throws IOException {
|
|
||||||
super(conf);
|
|
||||||
this.fs = home.getFileSystem(conf);
|
|
||||||
this.trash = new Path(home, TRASH);
|
|
||||||
this.homesParent = home.getParent();
|
|
||||||
this.current = new Path(trash, CURRENT);
|
|
||||||
this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
|
|
||||||
FS_TRASH_INTERVAL_DEFAULT) *
|
|
||||||
MSECS_PER_MINUTE);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
|
||||||
return new Path(basePath + rmFilePath.toUri().getPath());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether the trash is enabled for this filesystem
|
* Returns whether the trash is enabled for this filesystem
|
||||||
*/
|
*/
|
||||||
public boolean isEnabled() {
|
public boolean isEnabled() {
|
||||||
return (deletionInterval != 0);
|
return trashPolicy.isEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Move a file or directory to the current trash directory.
|
/** Move a file or directory to the current trash directory.
|
||||||
* @return false if the item is already in the trash or trash is disabled
|
* @return false if the item is already in the trash or trash is disabled
|
||||||
*/
|
*/
|
||||||
public boolean moveToTrash(Path path) throws IOException {
|
public boolean moveToTrash(Path path) throws IOException {
|
||||||
if (!isEnabled())
|
return trashPolicy.moveToTrash(path);
|
||||||
return false;
|
|
||||||
|
|
||||||
if (!path.isAbsolute()) // make path absolute
|
|
||||||
path = new Path(fs.getWorkingDirectory(), path);
|
|
||||||
|
|
||||||
if (!fs.exists(path)) // check that path exists
|
|
||||||
throw new FileNotFoundException(path.toString());
|
|
||||||
|
|
||||||
String qpath = fs.makeQualified(path).toString();
|
|
||||||
|
|
||||||
if (qpath.startsWith(trash.toString())) {
|
|
||||||
return false; // already in trash
|
|
||||||
}
|
|
||||||
|
|
||||||
if (trash.getParent().toString().startsWith(qpath)) {
|
|
||||||
throw new IOException("Cannot move \"" + path +
|
|
||||||
"\" to the trash, as it contains the trash");
|
|
||||||
}
|
|
||||||
|
|
||||||
Path trashPath = makeTrashRelativePath(current, path);
|
|
||||||
Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
|
|
||||||
|
|
||||||
IOException cause = null;
|
|
||||||
|
|
||||||
// try twice, in case checkpoint between the mkdirs() & rename()
|
|
||||||
for (int i = 0; i < 2; i++) {
|
|
||||||
try {
|
|
||||||
if (!fs.mkdirs(baseTrashPath, PERMISSION)) { // create current
|
|
||||||
LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Can't create trash directory: "+baseTrashPath);
|
|
||||||
cause = e;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
//
|
|
||||||
// if the target path in Trash already exists, then append with
|
|
||||||
// a current time in millisecs.
|
|
||||||
//
|
|
||||||
String orig = trashPath.toString();
|
|
||||||
|
|
||||||
while(fs.exists(trashPath)) {
|
|
||||||
trashPath = new Path(orig + System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fs.rename(path, trashPath)) // move to current trash
|
|
||||||
return true;
|
|
||||||
} catch (IOException e) {
|
|
||||||
cause = e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw (IOException)
|
|
||||||
new IOException("Failed to move to trash: "+path).initCause(cause);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a trash checkpoint. */
|
/** Create a trash checkpoint. */
|
||||||
public void checkpoint() throws IOException {
|
public void checkpoint() throws IOException {
|
||||||
if (!fs.exists(current)) // no trash, no checkpoint
|
trashPolicy.createCheckpoint();
|
||||||
return;
|
|
||||||
|
|
||||||
Path checkpoint;
|
|
||||||
synchronized (CHECKPOINT) {
|
|
||||||
checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fs.rename(current, checkpoint)) {
|
|
||||||
LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
|
|
||||||
} else {
|
|
||||||
throw new IOException("Failed to checkpoint trash: "+checkpoint);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Delete old checkpoints. */
|
/** Delete old checkpoint(s). */
|
||||||
public void expunge() throws IOException {
|
public void expunge() throws IOException {
|
||||||
FileStatus[] dirs = null;
|
trashPolicy.deleteCheckpoint();
|
||||||
|
|
||||||
try {
|
|
||||||
dirs = fs.listStatus(trash); // scan trash sub-directories
|
|
||||||
} catch (FileNotFoundException fnfe) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
for (int i = 0; i < dirs.length; i++) {
|
|
||||||
Path path = dirs[i].getPath();
|
|
||||||
String dir = path.toUri().getPath();
|
|
||||||
String name = path.getName();
|
|
||||||
if (name.equals(CURRENT.getName())) // skip current
|
|
||||||
continue;
|
|
||||||
|
|
||||||
long time;
|
|
||||||
try {
|
|
||||||
synchronized (CHECKPOINT) {
|
|
||||||
time = CHECKPOINT.parse(name).getTime();
|
|
||||||
}
|
|
||||||
} catch (ParseException e) {
|
|
||||||
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((now - deletionInterval) > time) {
|
|
||||||
if (fs.delete(path, true)) {
|
|
||||||
LOG.info("Deleted trash checkpoint: "+dir);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
/** Get the current trash directory of the configured trash policy. */
|
||||||
// get the current working directory
|
|
||||||
//
|
|
||||||
Path getCurrentTrashDir() {
|
Path getCurrentTrashDir() {
|
||||||
return current;
|
return trashPolicy.getCurrentTrashDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** get the configured trash policy */
|
||||||
|
TrashPolicy getTrashPolicy() {
|
||||||
|
return trashPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return a {@link Runnable} that periodically empties the trash of all
|
/** Return a {@link Runnable} that periodically empties the trash of all
|
||||||
* users, intended to be run by the superuser. Only one checkpoint is kept
|
* users, intended to be run by the superuser.
|
||||||
* at a time.
|
|
||||||
*/
|
*/
|
||||||
public Runnable getEmptier() throws IOException {
|
public Runnable getEmptier() throws IOException {
|
||||||
return new Emptier(getConf());
|
return trashPolicy.getEmptier();
|
||||||
}
|
|
||||||
|
|
||||||
private class Emptier implements Runnable {
|
|
||||||
|
|
||||||
private Configuration conf;
|
|
||||||
private long emptierInterval;
|
|
||||||
|
|
||||||
Emptier(Configuration conf) throws IOException {
|
|
||||||
this.conf = conf;
|
|
||||||
this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
|
|
||||||
FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
|
|
||||||
MSECS_PER_MINUTE);
|
|
||||||
if (this.emptierInterval > deletionInterval ||
|
|
||||||
this.emptierInterval == 0) {
|
|
||||||
LOG.warn("The configured interval for checkpoint is " +
|
|
||||||
this.emptierInterval + " minutes." +
|
|
||||||
" Using interval of " + deletionInterval +
|
|
||||||
" minutes that is used for deletion instead");
|
|
||||||
this.emptierInterval = deletionInterval;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
if (emptierInterval == 0)
|
|
||||||
return; // trash disabled
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
long end;
|
|
||||||
while (true) {
|
|
||||||
end = ceiling(now, emptierInterval);
|
|
||||||
try { // sleep for interval
|
|
||||||
Thread.sleep(end - now);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
break; // exit on interrupt
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
now = System.currentTimeMillis();
|
|
||||||
if (now >= end) {
|
|
||||||
|
|
||||||
FileStatus[] homes = null;
|
|
||||||
try {
|
|
||||||
homes = fs.listStatus(homesParent); // list all home dirs
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (FileStatus home : homes) { // dump each trash
|
|
||||||
if (!home.isDirectory())
|
|
||||||
continue;
|
|
||||||
try {
|
|
||||||
Trash trash = new Trash(home.getPath(), conf);
|
|
||||||
trash.expunge();
|
|
||||||
trash.checkpoint();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
fs.close();
|
|
||||||
} catch(IOException e) {
|
|
||||||
LOG.warn("Trash cannot close FileSystem: ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private long ceiling(long time, long interval) {
|
|
||||||
return floor(time, interval) + interval;
|
|
||||||
}
|
|
||||||
private long floor(long time, long interval) {
|
|
||||||
return (time / interval) * interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Run an emptier.*/
|
/** Run an emptier.*/
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new Trash(new Configuration()).getEmptier().run();
|
new Trash(new Configuration()).getEmptier().run();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
98
common/src/java/org/apache/hadoop/fs/TrashPolicy.java
Normal file
98
common/src/java/org/apache/hadoop/fs/TrashPolicy.java
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This abstract class is the base for implementing different Trash policies.
|
||||||
|
* Provides factory method to create instances of the configured Trash policy.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public abstract class TrashPolicy extends Configured {
|
||||||
|
protected FileSystem fs; // the FileSystem
|
||||||
|
protected Path trash; // path to trash directory
|
||||||
|
protected long deletionInterval; // deletion interval for Emptier
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to set up the trash policy. Must be implemented by all TrashPolicy
|
||||||
|
* implementations
|
||||||
|
* @param conf the configuration to be used
|
||||||
|
* @param fs the filesystem to be used
|
||||||
|
* @param home the home directory
|
||||||
|
*/
|
||||||
|
public abstract void initialize(Configuration conf, FileSystem fs, Path home);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the Trash Policy is enabled for this filesystem
|
||||||
|
*/
|
||||||
|
public abstract boolean isEnabled();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move a file or directory to the current trash directory.
|
||||||
|
* @return false if the item is already in the trash or trash is disabled
|
||||||
|
*/
|
||||||
|
public abstract boolean moveToTrash(Path path) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a trash checkpoint.
|
||||||
|
*/
|
||||||
|
public abstract void createCheckpoint() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete old trash checkpoint(s).
|
||||||
|
*/
|
||||||
|
public abstract void deleteCheckpoint() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current trash directory of the Trash Policy
|
||||||
|
*/
|
||||||
|
public abstract Path getCurrentTrashDir();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a {@link Runnable} that periodically empties the trash of all
|
||||||
|
* users, intended to be run by the superuser.
|
||||||
|
*/
|
||||||
|
public abstract Runnable getEmptier() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an instance of the configured TrashPolicy based on the value
|
||||||
|
* of the configuration parameter fs.trash.classname.
|
||||||
|
*
|
||||||
|
* @param conf the configuration to be used
|
||||||
|
* @param fs the file system to be used
|
||||||
|
* @param home the home directory
|
||||||
|
* @return an instance of TrashPolicy
|
||||||
|
*/
|
||||||
|
public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path home)
|
||||||
|
throws IOException {
|
||||||
|
Class<? extends TrashPolicy> trashClass = conf.getClass("fs.trash.classname",
|
||||||
|
TrashPolicyDefault.class,
|
||||||
|
TrashPolicy.class);
|
||||||
|
TrashPolicy trash = (TrashPolicy) ReflectionUtils.newInstance(trashClass, conf);
|
||||||
|
trash.initialize(conf, fs, home); // initialize TrashPolicy
|
||||||
|
return trash;
|
||||||
|
}
|
||||||
|
}
|
291
common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
Normal file
291
common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
Normal file
@ -0,0 +1,291 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
|
||||||
|
/** Provides a <i>trash</i> feature. Files are moved to a user's trash
|
||||||
|
* directory, a subdirectory of their home directory named ".Trash". Files are
|
||||||
|
* initially moved to a <i>current</i> sub-directory of the trash directory.
|
||||||
|
* Within that sub-directory their original path is preserved. Periodically
|
||||||
|
* one may checkpoint the current trash and remove older checkpoints. (This
|
||||||
|
* design permits trash management without enumeration of the full trash
|
||||||
|
* content, without date support in the filesystem, and without clock
|
||||||
|
* synchronization.)
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class TrashPolicyDefault extends TrashPolicy {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TrashPolicyDefault.class);
|
||||||
|
|
||||||
|
private static final Path CURRENT = new Path("Current");
|
||||||
|
private static final Path TRASH = new Path(".Trash/");
|
||||||
|
|
||||||
|
private static final FsPermission PERMISSION =
|
||||||
|
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
|
||||||
|
|
||||||
|
private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
|
||||||
|
private static final int MSECS_PER_MINUTE = 60*1000;
|
||||||
|
|
||||||
|
private Path current;
|
||||||
|
private Path homesParent;
|
||||||
|
|
||||||
|
public TrashPolicyDefault() { }
|
||||||
|
|
||||||
|
private TrashPolicyDefault(Path home, Configuration conf) throws IOException {
|
||||||
|
initialize(conf, home.getFileSystem(conf), home);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.trash = new Path(home, TRASH);
|
||||||
|
this.homesParent = home.getParent();
|
||||||
|
this.current = new Path(trash, CURRENT);
|
||||||
|
this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
|
||||||
|
FS_TRASH_INTERVAL_DEFAULT) * MSECS_PER_MINUTE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
||||||
|
return new Path(basePath + rmFilePath.toUri().getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return (deletionInterval != 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean moveToTrash(Path path) throws IOException {
|
||||||
|
if (!isEnabled())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (!path.isAbsolute()) // make path absolute
|
||||||
|
path = new Path(fs.getWorkingDirectory(), path);
|
||||||
|
|
||||||
|
if (!fs.exists(path)) // check that path exists
|
||||||
|
throw new FileNotFoundException(path.toString());
|
||||||
|
|
||||||
|
String qpath = fs.makeQualified(path).toString();
|
||||||
|
|
||||||
|
if (qpath.startsWith(trash.toString())) {
|
||||||
|
return false; // already in trash
|
||||||
|
}
|
||||||
|
|
||||||
|
if (trash.getParent().toString().startsWith(qpath)) {
|
||||||
|
throw new IOException("Cannot move \"" + path +
|
||||||
|
"\" to the trash, as it contains the trash");
|
||||||
|
}
|
||||||
|
|
||||||
|
Path trashPath = makeTrashRelativePath(current, path);
|
||||||
|
Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
|
||||||
|
|
||||||
|
IOException cause = null;
|
||||||
|
|
||||||
|
// try twice, in case checkpoint between the mkdirs() & rename()
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
try {
|
||||||
|
if (!fs.mkdirs(baseTrashPath, PERMISSION)) { // create current
|
||||||
|
LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Can't create trash directory: "+baseTrashPath);
|
||||||
|
cause = e;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// if the target path in Trash already exists, then append with
|
||||||
|
// a current time in millisecs.
|
||||||
|
String orig = trashPath.toString();
|
||||||
|
|
||||||
|
while(fs.exists(trashPath)) {
|
||||||
|
trashPath = new Path(orig + System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fs.rename(path, trashPath)) // move to current trash
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
cause = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw (IOException)
|
||||||
|
new IOException("Failed to move to trash: "+path).initCause(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void createCheckpoint() throws IOException {
|
||||||
|
if (!fs.exists(current)) // no trash, no checkpoint
|
||||||
|
return;
|
||||||
|
|
||||||
|
Path checkpoint;
|
||||||
|
synchronized (CHECKPOINT) {
|
||||||
|
checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fs.rename(current, checkpoint)) {
|
||||||
|
LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
|
||||||
|
} else {
|
||||||
|
throw new IOException("Failed to checkpoint trash: "+checkpoint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCheckpoint() throws IOException {
|
||||||
|
FileStatus[] dirs = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
dirs = fs.listStatus(trash); // scan trash sub-directories
|
||||||
|
} catch (FileNotFoundException fnfe) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
for (int i = 0; i < dirs.length; i++) {
|
||||||
|
Path path = dirs[i].getPath();
|
||||||
|
String dir = path.toUri().getPath();
|
||||||
|
String name = path.getName();
|
||||||
|
if (name.equals(CURRENT.getName())) // skip current
|
||||||
|
continue;
|
||||||
|
|
||||||
|
long time;
|
||||||
|
try {
|
||||||
|
synchronized (CHECKPOINT) {
|
||||||
|
time = CHECKPOINT.parse(name).getTime();
|
||||||
|
}
|
||||||
|
} catch (ParseException e) {
|
||||||
|
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((now - deletionInterval) > time) {
|
||||||
|
if (fs.delete(path, true)) {
|
||||||
|
LOG.info("Deleted trash checkpoint: "+dir);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getCurrentTrashDir() {
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable getEmptier() throws IOException {
|
||||||
|
return new Emptier(getConf());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Emptier implements Runnable {
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private long emptierInterval;
|
||||||
|
|
||||||
|
Emptier(Configuration conf) throws IOException {
|
||||||
|
this.conf = conf;
|
||||||
|
this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
|
||||||
|
FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
|
||||||
|
MSECS_PER_MINUTE);
|
||||||
|
if (this.emptierInterval > deletionInterval ||
|
||||||
|
this.emptierInterval == 0) {
|
||||||
|
LOG.warn("The configured interval for checkpoint is " +
|
||||||
|
this.emptierInterval + " minutes." +
|
||||||
|
" Using interval of " + deletionInterval +
|
||||||
|
" minutes that is used for deletion instead");
|
||||||
|
this.emptierInterval = deletionInterval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
if (emptierInterval == 0)
|
||||||
|
return; // trash disabled
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
long end;
|
||||||
|
while (true) {
|
||||||
|
end = ceiling(now, emptierInterval);
|
||||||
|
try { // sleep for interval
|
||||||
|
Thread.sleep(end - now);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
break; // exit on interrupt
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
now = System.currentTimeMillis();
|
||||||
|
if (now >= end) {
|
||||||
|
|
||||||
|
FileStatus[] homes = null;
|
||||||
|
try {
|
||||||
|
homes = fs.listStatus(homesParent); // list all home dirs
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (FileStatus home : homes) { // dump each trash
|
||||||
|
if (!home.isDirectory())
|
||||||
|
continue;
|
||||||
|
try {
|
||||||
|
TrashPolicyDefault trash = new TrashPolicyDefault(home.getPath(), conf);
|
||||||
|
trash.deleteCheckpoint();
|
||||||
|
trash.createCheckpoint();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
fs.close();
|
||||||
|
} catch(IOException e) {
|
||||||
|
LOG.warn("Trash cannot close FileSystem: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long ceiling(long time, long interval) {
|
||||||
|
return floor(time, interval) + interval;
|
||||||
|
}
|
||||||
|
private long floor(long time, long interval) {
|
||||||
|
return (time / interval) * interval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -480,6 +480,15 @@ public class TestTrash extends TestCase {
|
|||||||
trashNonDefaultFS(conf);
|
trashNonDefaultFS(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPluggableTrash() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
// Test plugged TrashPolicy
|
||||||
|
conf.setClass("fs.trash.classname", TestTrashPolicy.class, TrashPolicy.class);
|
||||||
|
Trash trash = new Trash(conf);
|
||||||
|
assertTrue(trash.getTrashPolicy().getClass().equals(TestTrashPolicy.class));
|
||||||
|
}
|
||||||
|
|
||||||
public void testTrashEmptier() throws Exception {
|
public void testTrashEmptier() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// Trash with 12 second deletes and 6 seconds checkpoints
|
// Trash with 12 second deletes and 6 seconds checkpoints
|
||||||
@ -638,4 +647,41 @@ public class TestTrash extends TestCase {
|
|||||||
// run performance piece as a separate test
|
// run performance piece as a separate test
|
||||||
performanceTestDeleteSameFile();
|
performanceTestDeleteSameFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test TrashPolicy. Don't care about implementation.
|
||||||
|
public static class TestTrashPolicy extends TrashPolicy {
|
||||||
|
public TestTrashPolicy() { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean moveToTrash(Path path) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void createCheckpoint() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCheckpoint() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getCurrentTrashDir() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable getEmptier() throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user