diff --git a/src/contrib/ec2/bin/hadoop-ec2-init-remote.sh b/src/contrib/ec2/bin/hadoop-ec2-init-remote.sh
index 2327d4e09d..a95f4fa1f3 100644
--- a/src/contrib/ec2/bin/hadoop-ec2-init-remote.sh
+++ b/src/contrib/ec2/bin/hadoop-ec2-init-remote.sh
@@ -17,7 +17,9 @@ MASTER_HOST=%MASTER_HOST% # Interpolated before being sent to EC2 node
SECURITY_GROUPS=`wget -q -O - http://169.254.169.254/latest/meta-data/security-groups`
IS_MASTER=`echo $SECURITY_GROUPS | awk '{ a = match ($0, "-master$"); if (a) print "true"; else print "false"; }'`
if [ "$IS_MASTER" == "true" ]; then
- MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname`
+ # use public hostnames for master. private hostnames can be used by substituting:
+ # MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname`
+ MASTER_HOST=`wget -q -O - 'http://169.254.169.254/latest/meta-data/public-hostname'`
fi
HADOOP_HOME=`ls -d /usr/local/hadoop-*`
@@ -78,6 +80,12 @@ cat > $HADOOP_HOME/conf/hadoop-site.xml <<EOF
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.StandardSocketFactory</value>
+  <final>true</final>
+</property>
+
EOF
diff --git a/src/contrib/ec2/bin/launch-hadoop-slaves b/src/contrib/ec2/bin/launch-hadoop-slaves
index c9289c52af..b40c1d7996 100644
--- a/src/contrib/ec2/bin/launch-hadoop-slaves
+++ b/src/contrib/ec2/bin/launch-hadoop-slaves
@@ -42,7 +42,9 @@ fi
# Finding Hadoop image
AMI_IMAGE=`ec2-describe-images -a | grep $S3_BUCKET | grep $HADOOP_VERSION | grep $ARCH |grep available | awk '{print $2}'`
-MASTER_HOST=`cat $MASTER_PRIVATE_IP_PATH`
+# to use private master hostname, substitute below with:
+# MASTER_HOST=`cat $MASTER_PRIVATE_IP_PATH`
+MASTER_HOST=`cat $MASTER_IP_PATH`
MASTER_ZONE=`cat $MASTER_ZONE_PATH`
# Substituting master hostname
diff --git a/src/contrib/eclipse-plugin/build.xml b/src/contrib/eclipse-plugin/build.xml
index 4a46e3362e..8fca990811 100644
--- a/src/contrib/eclipse-plugin/build.xml
+++ b/src/contrib/eclipse-plugin/build.xml
@@ -67,7 +67,8 @@
-
+
+
diff --git a/src/java/core-default.xml b/src/java/core-default.xml
index b56dda4235..8fc055069f 100644
--- a/src/java/core-default.xml
+++ b/src/java/core-default.xml
@@ -247,6 +247,23 @@
+<property>
+  <name>fs.automatic.close</name>
+  <value>true</value>
+  <description>By default, FileSystem instances are automatically closed at program
+  exit using a JVM shutdown hook. Setting this property to false disables this
+  behavior. This is an advanced option that should only be used by server applications
+  requiring a more carefully orchestrated shutdown sequence.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.block.size</name>
+  <value>67108864</value>
+  <description>Block size to use when reading files using the native S3
+  filesystem (s3n: URIs).</description>
+</property>
+
  <name>local.cache.size</name>
  <value>10737418240</value>
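As an illustration of the new fs.automatic.close switch (not part of the patch), a minimal sketch of a server-style process that opts out of the shutdown hook and closes its cached FileSystem instances itself; the class name and path are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManagedShutdownExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt out of the JVM shutdown hook installed by the FileSystem cache.
    conf.setBoolean("fs.automatic.close", false);

    FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(new Path("/tmp/managed-shutdown-example"));  // stand-in for real work

    // ... flush application state, stop RPC servers, then close explicitly ...
    FileSystem.closeAll();
  }
}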
diff --git a/src/java/org/apache/hadoop/filecache/DistributedCache.java b/src/java/org/apache/hadoop/filecache/DistributedCache.java
index 8070e07355..542e47dad1 100644
--- a/src/java/org/apache/hadoop/filecache/DistributedCache.java
+++ b/src/java/org/apache/hadoop/filecache/DistributedCache.java
@@ -34,11 +34,11 @@
* framework to cache files (text, archives, jars etc.) needed by applications.
*
*
- * Applications specify the files, via urls (hdfs:// or http://) to be
- * cached via the org.apache.hadoop.mapred.JobConf.
- * The DistributedCache assumes that the
- * files specified via hdfs:// urls are already present on the
- * {@link FileSystem} at the path specified by the url.
+ * Applications specify the files, via urls (hdfs:// or http://) to be cached
+ * via the org.apache.hadoop.mapred.JobConf. The
+ * DistributedCache assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.
*
* The framework will copy the necessary files on to the slave node before
* any tasks for the job are executed on that node. Its efficiency stems from
@@ -127,9 +127,7 @@ public class DistributedCache {
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
* @param baseDir The base cache Dir where you want to localize the files/archives
* @param fileStatus The file status on the dfs.
@@ -160,9 +158,7 @@ public static Path getLocalCache(URI cache, Configuration conf,
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
* @param baseDir The base cache Dir where you want to localize the files/archives
* @param fileStatus The file status on the dfs.
@@ -229,9 +225,7 @@ public static Path getLocalCache(URI cache, Configuration conf,
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
* @param baseDir The base cache Dir where you want to localize the files/archives
* @param isArchive if the cache is an archive or a file. In case it is an
@@ -348,7 +342,7 @@ private static Path localizeCache(Configuration conf,
if(cache.getFragment() == null) {
doSymlink = false;
}
- FileSystem fs = getFileSystem(cache, conf);
+ FileSystem fs = FileSystem.get(cache, conf);
String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -531,14 +525,6 @@ public static void createAllSymlink(Configuration conf, File jobCacheDir, File w
}
}
- private static FileSystem getFileSystem(URI cache, Configuration conf)
- throws IOException {
- if ("hdfs".equals(cache.getScheme()))
- return FileSystem.get(cache, conf);
- else
- return FileSystem.get(conf);
- }
-
/**
* Set the configuration with the given set of archives
* @param archives The list of archives that need to be localized
@@ -695,7 +681,7 @@ public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
String classpath = conf.get("mapred.job.classpath.files");
conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
- : classpath + System.getProperty("path.separator") + file.toString());
+ : classpath + "," + file.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
@@ -708,14 +694,14 @@ public static void addFileToClassPath(Path file, Configuration conf)
* @param conf Configuration that contains the classpath setting
*/
public static Path[] getFileClassPaths(Configuration conf) {
- String classpath = conf.get("mapred.job.classpath.files");
- if (classpath == null)
- return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
- .getProperty("path.separator")));
+ ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+ "mapred.job.classpath.files");
+ if (list.size() == 0) {
+ return null;
+ }
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path((String) list.get(i));
+ paths[i] = new Path(list.get(i));
}
return paths;
}
@@ -731,8 +717,7 @@ public static void addArchiveToClassPath(Path archive, Configuration conf)
throws IOException {
String classpath = conf.get("mapred.job.classpath.archives");
conf.set("mapred.job.classpath.archives", classpath == null ? archive
- .toString() : classpath + System.getProperty("path.separator")
- + archive.toString());
+ .toString() : classpath + "," + archive.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(archive).toUri();
@@ -745,14 +730,14 @@ public static void addArchiveToClassPath(Path archive, Configuration conf)
* @param conf Configuration that contains the classpath setting
*/
public static Path[] getArchiveClassPaths(Configuration conf) {
- String classpath = conf.get("mapred.job.classpath.archives");
- if (classpath == null)
- return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
- .getProperty("path.separator")));
+ ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+ "mapred.job.classpath.archives");
+ if (list.size() == 0) {
+ return null;
+ }
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path((String) list.get(i));
+ paths[i] = new Path(list.get(i));
}
return paths;
}
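Since the classpath entries are now stored comma separated instead of using the platform path separator, a short sketch of how the changed helpers round-trip entries may help; the jar paths are illustrative and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class ClassPathRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Each call appends one more comma-separated entry to mapred.job.classpath.files.
    DistributedCache.addFileToClassPath(new Path("/libs/first.jar"), conf);
    DistributedCache.addFileToClassPath(new Path("/libs/second.jar"), conf);

    // getFileClassPaths now splits on commas via Configuration.getStringCollection.
    for (Path p : DistributedCache.getFileClassPaths(conf)) {
      System.out.println(p);
    }
  }
}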
diff --git a/src/java/org/apache/hadoop/fs/CreateFlag.java b/src/java/org/apache/hadoop/fs/CreateFlag.java
new file mode 100644
index 0000000000..ab62a3b284
--- /dev/null
+++ b/src/java/org/apache/hadoop/fs/CreateFlag.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+/****************************************************************
+ * CreateFlag specifies the file create semantics. Users can combine flags, for example:
+ *
+ * EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND)
+ *
+ * and pass it to {@link org.apache.hadoop.fs.FileSystem#create(Path f, FsPermission permission,
+ * EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+ * Progressable progress)}.
+ *
+ *
+ * Combining {@link #OVERWRITE} with either {@link #CREATE}
+ * or {@link #APPEND} has the same effect as using
+ * {@link #OVERWRITE} alone.
+ * Combining {@link #CREATE} with {@link #APPEND} has the following semantics:
+ *
+ * - create the file if it does not exist;
+ * - append to the file if it already exists.
+ *
+ *****************************************************************/
+public enum CreateFlag {
+
+ /**
+ * create the file if it does not exist, and throw an IOException if it
+ * already exists
+ */
+ CREATE((short) 0x01),
+
+ /**
+ * create the file if it does not exist, if it exists, overwrite it.
+ */
+ OVERWRITE((short) 0x02),
+
+ /**
+ * append to a file, and throw an IOException if it does not exist
+ */
+ APPEND((short) 0x04);
+
+ private short mode;
+
+ private CreateFlag(short mode) {
+ this.mode = mode;
+ }
+
+ short getMode() {
+ return mode;
+ }
+}
\ No newline at end of file
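A usage sketch for the new flag-based create (not part of the patch; the target path and record contents are illustrative):

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class CreateFlagExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path log = new Path("/tmp/events.log");

    // CREATE + APPEND: create the file if it is missing, otherwise append to it.
    FSDataOutputStream out = fs.create(log, FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
        4096, fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
    out.writeBytes("one more record\n");
    out.close();
  }
}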
diff --git a/src/java/org/apache/hadoop/fs/FileSystem.java b/src/java/org/apache/hadoop/fs/FileSystem.java
index fcc5817d27..afa4d04ffb 100644
--- a/src/java/org/apache/hadoop/fs/FileSystem.java
+++ b/src/java/org/apache/hadoop/fs/FileSystem.java
@@ -24,7 +24,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
@@ -65,12 +67,12 @@
* implementation is DistributedFileSystem.
*****************************************************************/
public abstract class FileSystem extends Configured implements Closeable {
- private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+ public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
public static final Log LOG = LogFactory.getLog(FileSystem.class);
/** FileSystem cache */
- private static final Cache CACHE = new Cache();
+ static final Cache CACHE = new Cache();
/** The key this instance is stored under in the cache. */
private Cache.Key key;
@@ -224,17 +226,6 @@ public static LocalFileSystem newInstanceLocal(Configuration conf)
return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
}
- private static class ClientFinalizer extends Thread {
- public synchronized void run() {
- try {
- FileSystem.closeAll();
- } catch (IOException e) {
- LOG.info("FileSystem.closeAll() threw an exception:\n" + e);
- }
- }
- }
- private static final ClientFinalizer clientFinalizer = new ClientFinalizer();
-
/**
* Close all cached filesystems. Be sure those filesystems are not
* used anymore.
@@ -516,6 +507,7 @@ public FSDataOutputStream create(Path f,
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting.
+ * @deprecated Consider using {@link #create(Path, FsPermission, EnumSet, int, short, long, Progressable)} instead.
* @param f the file name to open
* @param permission
* @param overwrite if a file with this name already exists, then if true,
@@ -527,13 +519,36 @@ public FSDataOutputStream create(Path f,
* @throws IOException
* @see #setPermission(Path, FsPermission)
*/
- public abstract FSDataOutputStream create(Path f,
+ public FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
- Progressable progress) throws IOException;
+ Progressable progress) throws IOException{
+ return create(f, permission, overwrite ? EnumSet.of(CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, blockSize,
+ progress);
+ }
+
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * @param f the file name to open.
+ * @param permission
+ * @param flag determines the semantic of this create.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize
+ * @param progress
+ * @throws IOException
+ * @see #setPermission(Path, FsPermission)
+ * @see CreateFlag
+ */
+ public abstract FSDataOutputStream create(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException;
+
/**
* Creates the given Path as a brand-new zero-length file. If
@@ -1409,7 +1424,10 @@ private static FileSystem createFileSystem(URI uri, Configuration conf
/** Caching FileSystem objects */
static class Cache {
+ private final ClientFinalizer clientFinalizer = new ClientFinalizer();
+
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
+ private final Set<Key> toAutoClose = new HashSet<Key>();
/** A variable that makes all objects in the cache unique */
private static AtomicLong unique = new AtomicLong(1);
@@ -1434,6 +1452,10 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOEx
}
fs.key = key;
map.put(key, fs);
+
+ if (conf.getBoolean("fs.automatic.close", true)) {
+ toAutoClose.add(key);
+ }
}
return fs;
}
@@ -1441,6 +1463,7 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOEx
synchronized void remove(Key key, FileSystem fs) {
if (map.containsKey(key) && fs == map.get(key)) {
map.remove(key);
+ toAutoClose.remove(key);
if (map.isEmpty() && !clientFinalizer.isAlive()) {
if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
LOG.info("Could not cancel cleanup thread, though no " +
@@ -1451,11 +1474,27 @@ synchronized void remove(Key key, FileSystem fs) {
}
synchronized void closeAll() throws IOException {
+ closeAll(false);
+ }
+
+ /**
+ * Close all FileSystem instances in the Cache.
+ * @param onlyAutomatic only close those that are marked for automatic closing
+ */
+ synchronized void closeAll(boolean onlyAutomatic) throws IOException {
List<IOException> exceptions = new ArrayList<IOException>();
- for(; !map.isEmpty(); ) {
- Map.Entry e = map.entrySet().iterator().next();
- final Key key = e.getKey();
- final FileSystem fs = e.getValue();
+
+ // Make a copy of the keys in the map since we'll be modifying
+ // the map while iterating over it, which isn't safe.
+ List<Key> keys = new ArrayList<Key>();
+ keys.addAll(map.keySet());
+
+ for (Key key : keys) {
+ final FileSystem fs = map.get(key);
+
+ if (onlyAutomatic && !toAutoClose.contains(key)) {
+ continue;
+ }
//remove from cache
remove(key, fs);
@@ -1475,6 +1514,16 @@ synchronized void closeAll() throws IOException {
}
}
+ private class ClientFinalizer extends Thread {
+ public synchronized void run() {
+ try {
+ closeAll(true);
+ } catch (IOException e) {
+ LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
+ }
+ }
+ }
+
/** FileSystem.Cache.Key */
static class Key {
final String scheme;
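The cache changes above can be observed with a small sketch (not part of the patch): repeated lookups return the cached instance, and closeAll() does eagerly what the cache's shutdown hook now does only for instances marked for automatic closing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CacheLookupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem a = FileSystem.get(conf);
    FileSystem b = FileSystem.get(conf);
    // Both lookups hit FileSystem.CACHE, so the same instance comes back.
    System.out.println(a == b);  // true
    // With fs.automatic.close left at its default of true these would also be
    // closed by the shutdown hook; closing eagerly is always allowed.
    FileSystem.closeAll();
  }
}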
diff --git a/src/java/org/apache/hadoop/fs/FilterFileSystem.java b/src/java/org/apache/hadoop/fs/FilterFileSystem.java
index 2a2aa619af..0bcb322dc7 100644
--- a/src/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/src/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -20,6 +20,7 @@
import java.io.*;
import java.net.URI;
+import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -106,10 +107,10 @@ public FSDataOutputStream append(Path f, int bufferSize,
/** {@inheritDoc} */
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return fs.create(f, permission,
- overwrite, bufferSize, replication, blockSize, progress);
+ flag, bufferSize, replication, blockSize, progress);
}
/**
diff --git a/src/java/org/apache/hadoop/fs/FsShell.java b/src/java/org/apache/hadoop/fs/FsShell.java
index 987b499966..23a503fb84 100644
--- a/src/java/org/apache/hadoop/fs/FsShell.java
+++ b/src/java/org/apache/hadoop/fs/FsShell.java
@@ -61,6 +61,7 @@ public class FsShell extends Configured implements Tool {
static final String COPYTOLOCAL_SHORT_USAGE = GET_SHORT_USAGE.replace(
"-get", "-copyToLocal");
static final String TAIL_USAGE="-tail [-f] <file>";
+ static final String DU_USAGE="-du [-s] [-h] <path>";
/**
*/
@@ -670,58 +671,98 @@ void df(String path) throws IOException {
/**
* Show the size of all files that match the file pattern src
- * @param src a file pattern specifying source files
+ * @param cmd the command-line arguments
+ * @param pos ignore anything before this pos in cmd
* @throws IOException
* @see org.apache.hadoop.fs.FileSystem#globStatus(Path)
*/
- void du(String src) throws IOException {
- Path srcPath = new Path(src);
- FileSystem srcFs = srcPath.getFileSystem(getConf());
- Path[] pathItems = FileUtil.stat2Paths(srcFs.globStatus(srcPath),
- srcPath);
- FileStatus items[] = srcFs.listStatus(pathItems);
- if ((items == null) || ((items.length == 0) &&
- (!srcFs.exists(srcPath)))){
- throw new FileNotFoundException("Cannot access " + src
- + ": No such file or directory.");
- } else {
- System.out.println("Found " + items.length + " items");
- int maxLength = 10;
-
- long length[] = new long[items.length];
- for (int i = 0; i < items.length; i++) {
- length[i] = items[i].isDir() ?
- srcFs.getContentSummary(items[i].getPath()).getLength() :
- items[i].getLen();
- int len = String.valueOf(length[i]).length();
- if (len > maxLength) maxLength = len;
+ void du(String[] cmd, int pos) throws IOException {
+ CommandFormat c = new CommandFormat(
+ "du", 0, Integer.MAX_VALUE, "h", "s");
+ List<String> params;
+ try {
+ params = c.parse(cmd, pos);
+ } catch (IllegalArgumentException iae) {
+ System.err.println("Usage: java FsShell " + DU_USAGE);
+ throw iae;
+ }
+ boolean humanReadable = c.getOpt("h");
+ boolean summary = c.getOpt("s");
+
+ // Default to cwd
+ if (params.isEmpty()) {
+ params.add(".");
+ }
+
+ List<UsagePair> usages = new ArrayList<UsagePair>();
+
+ for (String src : params) {
+ Path srcPath = new Path(src);
+ FileSystem srcFs = srcPath.getFileSystem(getConf());
+ FileStatus globStatus[] = srcFs.globStatus(srcPath);
+ FileStatus statusToPrint[];
+
+ if (summary) {
+ statusToPrint = globStatus;
+ } else {
+ Path statPaths[] = FileUtil.stat2Paths(globStatus, srcPath);
+ statusToPrint = srcFs.listStatus(statPaths);
}
- for(int i = 0; i < items.length; i++) {
- System.out.printf("%-"+ (maxLength + BORDER) +"d", length[i]);
- System.out.println(items[i].getPath());
+ if ((statusToPrint == null) || ((statusToPrint.length == 0) &&
+ (!srcFs.exists(srcPath)))){
+ throw new FileNotFoundException("Cannot access " + src
+ + ": No such file or directory.");
+ }
+
+ if (!summary) {
+ System.out.println("Found " + statusToPrint.length + " items");
+ }
+
+ for (FileStatus stat : statusToPrint) {
+ long length;
+ if (summary || stat.isDir()) {
+ length = srcFs.getContentSummary(stat.getPath()).getLength();
+ } else {
+ length = stat.getLen();
+ }
+
+ usages.add(new UsagePair(String.valueOf(stat.getPath()), length));
}
}
+ printUsageSummary(usages, humanReadable);
}
/**
* Show the summary disk usage of each dir/file
* that matches the file pattern src
- * @param src a file pattern specifying source files
+ * @param cmd the command-line arguments
+ * @param pos ignore anything before this pos in cmd
* @throws IOException
* @see org.apache.hadoop.fs.FileSystem#globStatus(Path)
*/
- void dus(String src) throws IOException {
- Path srcPath = new Path(src);
- FileSystem srcFs = srcPath.getFileSystem(getConf());
- FileStatus status[] = srcFs.globStatus(new Path(src));
- if (status==null || status.length==0) {
- throw new FileNotFoundException("Cannot access " + src +
- ": No such file or directory.");
+ void dus(String[] cmd, int pos) throws IOException {
+ String newcmd[] = new String[cmd.length + 1];
+ System.arraycopy(cmd, 0, newcmd, 0, cmd.length);
+ newcmd[cmd.length] = "-s";
+ du(newcmd, pos);
+ }
+
+ private void printUsageSummary(List<UsagePair> usages,
+ boolean humanReadable) {
+ int maxColumnWidth = 0;
+ for (UsagePair usage : usages) {
+ String toPrint = humanReadable ?
+ StringUtils.humanReadableInt(usage.bytes) : String.valueOf(usage.bytes);
+ if (toPrint.length() > maxColumnWidth) {
+ maxColumnWidth = toPrint.length();
+ }
}
- for(int i=0; i
+ EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
+
+ if(flag.contains(CreateFlag.APPEND)){
+ if (!exists(f)){
+ if(flag.contains(CreateFlag.CREATE))
+ return create(f, false, bufferSize, replication, blockSize, progress);
+ }
+ return append(f, bufferSize, progress);
+ }
+
FSDataOutputStream out = create(f,
- overwrite, bufferSize, replication, blockSize, progress);
+ flag.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
setPermission(f, permission);
return out;
}
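The reworked -du/-dus handling above can also be driven programmatically; a sketch (not part of the patch, the path is illustrative):

import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class DuSummaryExample {
  public static void main(String[] args) throws Exception {
    // Equivalent of "hadoop fs -du -s -h /user/data": -s prints one summarized
    // line per argument, -h formats the byte counts in human readable form.
    int rc = ToolRunner.run(new FsShell(), new String[] {"-du", "-s", "-h", "/user/data"});
    System.exit(rc);
  }
}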
diff --git a/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
index ee91f1c899..bfc1abcb5b 100644
--- a/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
+++ b/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.EnumSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
@@ -28,6 +30,7 @@
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -193,19 +196,30 @@ public FSDataInputStream open(Path file, int bufferSize) throws IOException {
*/
@Override
public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
final FTPClient client = connect();
Path workDir = new Path(client.printWorkingDirectory());
Path absolute = makeAbsolute(workDir, file);
+
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean create = flag.contains(CreateFlag.CREATE);
+ boolean append= flag.contains(CreateFlag.APPEND);
+
if (exists(client, file)) {
if (overwrite) {
delete(client, file);
+ } else if(append){
+ return append(file, bufferSize, progress);
} else {
disconnect(client);
throw new IOException("File already exists: " + file);
}
+ } else {
+ if(append && !create)
+ throw new FileNotFoundException("File does not exist: "+ file);
}
+
Path parent = absolute.getParent();
if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
parent = (parent == null) ? new Path("/") : parent;
diff --git a/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java b/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
index f2a773663e..199264ab68 100644
--- a/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
+++ b/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
@@ -12,7 +12,6 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- * @author: Sriram Rao (Kosmix Corp.)
*
* We need to provide the ability to the code in fs/kfs without really
* having a KFS deployment. In particular, the glue code that wraps
diff --git a/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java b/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
index bc66ec2570..b08c0b2986 100644
--- a/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
+++ b/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
@@ -12,7 +12,6 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- * @author: Sriram Rao (Kosmix Corp.)
*
* Provide the implementation of KFS which turn into calls to KfsAccess.
*/
diff --git a/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java b/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
index bb2c32c31b..f1f4c3aec5 100644
--- a/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
+++ b/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
@@ -12,7 +12,6 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- * @author: Sriram Rao (Kosmix Corp.)
*
* Implements the Hadoop FSInputStream interfaces to allow applications to read
* files in Kosmos File System (KFS).
diff --git a/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java b/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
index e55f4205d8..978f696663 100644
--- a/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
+++ b/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
@@ -12,7 +12,6 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- * @author: Sriram Rao (Kosmix Corp.)
*
* Implements the Hadoop FSOutputStream interfaces to allow applications to write to
* files in Kosmos File System (KFS).
diff --git a/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java b/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
index 57b27a2a0e..40d076261c 100644
--- a/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
+++ b/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
@@ -12,7 +12,6 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- * @author: Sriram Rao (Kosmix Corp.)
*
* Implements the Hadoop FS interfaces to allow applications to store
*files in Kosmos File System (KFS).
@@ -23,9 +22,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -120,7 +121,6 @@ public boolean mkdirs(Path path, FsPermission permission
}
@Override
- @Deprecated
public boolean isDirectory(Path path) throws IOException {
Path absolute = makeAbsolute(path);
String srep = absolute.toUri().getPath();
@@ -131,7 +131,6 @@ public boolean isDirectory(Path path) throws IOException {
}
@Override
- @Deprecated
public boolean isFile(Path path) throws IOException {
Path absolute = makeAbsolute(path);
String srep = absolute.toUri().getPath();
@@ -186,16 +185,25 @@ public FSDataOutputStream append(Path f, int bufferSize,
@Override
public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize,
+ EnumSet<CreateFlag> flag, int bufferSize,
short replication, long blockSize, Progressable progress)
throws IOException {
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean create = flag.contains(CreateFlag.CREATE);
+ boolean append= flag.contains(CreateFlag.APPEND);
+
if (exists(file)) {
if (overwrite) {
delete(file, true);
+ } else if (append){
+ return append(file, bufferSize, progress);
} else {
throw new IOException("File already exists: " + file);
}
+ } else {
+ if(append && !create)
+ throw new FileNotFoundException("File does not exist: "+ file);
}
Path parent = file.getParent();
diff --git a/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java b/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
index b0013aa0a9..ea2030435b 100644
--- a/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
+++ b/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
@@ -22,12 +22,14 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -201,18 +203,24 @@ public FSDataOutputStream append(Path f, int bufferSize,
*/
@Override
public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize,
+ EnumSet<CreateFlag> flag, int bufferSize,
short replication, long blockSize, Progressable progress)
throws IOException {
INode inode = store.retrieveINode(makeAbsolute(file));
if (inode != null) {
- if (overwrite) {
+ if (flag.contains(CreateFlag.OVERWRITE)) {
delete(file, true);
+ } else if (flag.contains(CreateFlag.APPEND)){
+ return append(file, bufferSize, progress);
} else {
throw new IOException("File already exists: " + file);
}
} else {
+
+ if(flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+ throw new FileNotFoundException("File does not exist: "+ file);
+
Path parent = file.getParent();
if (parent != null) {
if (!mkdirs(parent)) {
@@ -324,6 +332,11 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
return new S3FileStatus(f.makeQualified(this), inode);
}
+
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
+ }
// diagnostic methods
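The new getDefaultBlockSize override reads fs.s3.block.size (and fs.s3n.block.size for the native filesystem). A sketch of overriding the 64 MB default, assuming S3 credentials are configured; the bucket name is illustrative:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3BlockSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong("fs.s3.block.size", 128 * 1024 * 1024);   // block-based s3: filesystem
    conf.setLong("fs.s3n.block.size", 128 * 1024 * 1024);  // native s3n: filesystem

    FileSystem s3 = FileSystem.get(new URI("s3://example-bucket/"), conf);
    System.out.println(s3.getDefaultBlockSize());          // 134217728
  }
}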
diff --git a/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java b/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
index b24a8e06b7..d4a79d8d83 100644
--- a/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
+++ b/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
@@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -53,10 +54,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
bucket = new S3Bucket(uri.getHost());
}
@@ -76,10 +74,7 @@ public void storeFile(String key, File file, byte[] md5Hash)
}
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
} finally {
if (in != null) {
try {
@@ -99,10 +94,7 @@ public void storeEmptyFile(String key) throws IOException {
object.setContentLength(0);
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
}
@@ -116,10 +108,8 @@ public FileMetadata retrieveMetadata(String key) throws IOException {
if (e.getMessage().contains("ResponseCode=404")) {
return null;
}
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
+ return null; //never returned - keep compiler happy
}
}
@@ -128,13 +118,8 @@ public InputStream retrieve(String key) throws IOException {
S3Object object = s3Service.getObject(bucket, key);
return object.getDataInputStream();
} catch (S3ServiceException e) {
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
- return null;
- }
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
+ return null; //never returned - keep compiler happy
}
}
@@ -145,32 +130,22 @@ public InputStream retrieve(String key, long byteRangeStart)
null, byteRangeStart, null);
return object.getDataInputStream();
} catch (S3ServiceException e) {
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
- return null;
- }
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
+ return null; //never returned - keep compiler happy
}
}
public PartialListing list(String prefix, int maxListingLength)
throws IOException {
- return list(prefix, maxListingLength, null);
+ return list(prefix, maxListingLength, null, false);
}
- public PartialListing list(String prefix, int maxListingLength,
- String priorLastKey) throws IOException {
+ public PartialListing list(String prefix, int maxListingLength, String priorLastKey,
+ boolean recurse) throws IOException {
- return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
+ return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
}
- public PartialListing listAll(String prefix, int maxListingLength,
- String priorLastKey) throws IOException {
-
- return list(prefix, null, maxListingLength, priorLastKey);
- }
private PartialListing list(String prefix, String delimiter,
int maxListingLength, String priorLastKey) throws IOException {
@@ -191,10 +166,8 @@ private PartialListing list(String prefix, String delimiter,
return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
chunk.getCommonPrefixes());
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
+ return null; //never returned - keep compiler happy
}
}
@@ -202,36 +175,27 @@ public void delete(String key) throws IOException {
try {
s3Service.deleteObject(bucket, key);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
}
}
- public void rename(String srcKey, String dstKey) throws IOException {
+ public void copy(String srcKey, String dstKey) throws IOException {
try {
- s3Service.moveObject(bucket.getName(), srcKey, bucket.getName(),
+ s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
new S3Object(dstKey), false);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(srcKey, e);
}
}
public void purge(String prefix) throws IOException {
try {
S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
- for (int i = 0; i < objects.length; i++) {
- s3Service.deleteObject(bucket, objects[i].getKey());
+ for (S3Object object : objects) {
+ s3Service.deleteObject(bucket, object.getKey());
}
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
}
@@ -240,16 +204,29 @@ public void dump() throws IOException {
sb.append(bucket.getName()).append("\n");
try {
S3Object[] objects = s3Service.listObjects(bucket);
- for (int i = 0; i < objects.length; i++) {
- sb.append(objects[i].getKey()).append("\n");
+ for (S3Object object : objects) {
+ sb.append(object.getKey()).append("\n");
}
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
System.out.println(sb);
}
-
+
+ private void handleServiceException(String key, S3ServiceException e) throws IOException {
+ if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+ throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
+ } else {
+ handleServiceException(e);
+ }
+ }
+
+ private void handleServiceException(S3ServiceException e) throws IOException {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ else {
+ throw new S3Exception(e);
+ }
+ }
}
diff --git a/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java b/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
index eb0a682486..7ee80f24f2 100644
--- a/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
+++ b/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
@@ -42,14 +42,12 @@ interface NativeFileSystemStore {
InputStream retrieve(String key, long byteRangeStart) throws IOException;
PartialListing list(String prefix, int maxListingLength) throws IOException;
- PartialListing list(String prefix, int maxListingLength, String priorLastKey)
+ PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive)
throws IOException;
- PartialListing listAll(String prefix, int maxListingLength,
- String priorLastKey) throws IOException;
void delete(String key) throws IOException;
- void rename(String srcKey, String dstKey) throws IOException;
+ void copy(String srcKey, String dstKey) throws IOException;
/**
* Delete all keys with the given prefix. Used for testing.
diff --git a/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
index 7ec60655dd..15d0af4c5a 100644
--- a/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
+++ b/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
@@ -30,6 +30,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
@@ -61,6 +63,17 @@
* Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
* stores files on S3 in their
* native form so they can be read by other S3 tools.
+ *
+ * A note about directories. S3 of course has no "native" support for them.
+ * The idiom we choose then is: for any directory created by this class,
+ * we use an empty object "#{dirpath}_$folder$" as a marker.
+ * Further, to interoperate with other S3 tools, we also accept the following:
+ * - an object "#{dirpath}/" denoting a directory marker
+ * - if there exist any objects with the prefix "#{dirpath}/", then the
+ * directory is said to exist
+ * - if both a file with the name of a directory and a marker for that
+ * directory exist, then the *file masks the directory*, and the directory
+ * is never returned.
*
* @see org.apache.hadoop.fs.s3.S3FileSystem
*/
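A small sketch of the marker convention described above (not part of the patch; bucket and paths are illustrative, and s3n credentials are assumed to be configured):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3nDirMarkerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem s3n = FileSystem.get(new URI("s3n://example-bucket/"), conf);

    // mkdirs records the directory as an empty object "logs/2009_$folder$";
    // a directory is also recognised if any object has the "logs/2009/" prefix.
    s3n.mkdirs(new Path("/logs/2009"));
    System.out.println(s3n.getFileStatus(new Path("/logs/2009")).isDir());  // true
  }
}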
@@ -70,7 +83,6 @@ public class NativeS3FileSystem extends FileSystem {
LogFactory.getLog(NativeS3FileSystem.class);
private static final String FOLDER_SUFFIX = "_$folder$";
- private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
static final String PATH_DELIMITER = Path.SEPARATOR;
private static final int S3_MAX_LISTING_LENGTH = 1000;
@@ -85,6 +97,7 @@ public NativeS3FsInputStream(InputStream in, String key) {
this.key = key;
}
+ @Override
public synchronized int read() throws IOException {
int result = in.read();
if (result != -1) {
@@ -95,6 +108,7 @@ public synchronized int read() throws IOException {
}
return result;
}
+ @Override
public synchronized int read(byte[] b, int off, int len)
throws IOException {
@@ -108,18 +122,23 @@ public synchronized int read(byte[] b, int off, int len)
return result;
}
+ @Override
public void close() throws IOException {
in.close();
}
+ @Override
public synchronized void seek(long pos) throws IOException {
in.close();
+ LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
in = store.retrieve(key, pos);
this.pos = pos;
}
+ @Override
public synchronized long getPos() throws IOException {
return pos;
}
+ @Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@@ -140,6 +159,7 @@ public NativeS3FsOutputStream(Configuration conf,
this.conf = conf;
this.key = key;
this.backupFile = newBackupFile();
+ LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
try {
this.digest = MessageDigest.getInstance("MD5");
this.backupStream = new BufferedOutputStream(new DigestOutputStream(
@@ -174,6 +194,7 @@ public synchronized void close() throws IOException {
}
backupStream.close();
+ LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
try {
byte[] md5Hash = digest == null ? null : digest.digest();
@@ -185,7 +206,7 @@ public synchronized void close() throws IOException {
super.close();
closed = true;
}
-
+ LOG.info("OutputStream for key '" + key + "' upload complete");
}
@Override
@@ -197,8 +218,6 @@ public void write(int b) throws IOException {
public void write(byte[] b, int off, int len) throws IOException {
backupStream.write(b, off, len);
}
-
-
}
private URI uri;
@@ -242,6 +261,7 @@ private static NativeFileSystemStore createDefaultStore(Configuration conf) {
Map<String, RetryPolicy> methodNameToPolicyMap =
  new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
+ methodNameToPolicyMap.put("rename", methodPolicy);
return (NativeFileSystemStore)
RetryProxy.create(NativeFileSystemStore.class, store,
@@ -249,10 +269,19 @@ private static NativeFileSystemStore createDefaultStore(Configuration conf) {
}
private static String pathToKey(Path path) {
+ if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
+ // allow uris without trailing slash after bucket to refer to root,
+ // like s3n://mybucket
+ return "";
+ }
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
- return path.toUri().getPath().substring(1); // remove initial slash
+ String ret = path.toUri().getPath().substring(1); // remove initial slash
+ if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
+ ret = ret.substring(0, ret.length() -1);
+ }
+ return ret;
}
private static Path keyToPath(String key) {
@@ -267,6 +296,7 @@ private Path makeAbsolute(Path path) {
}
/** This optional operation is not yet supported. */
+ @Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
@@ -274,12 +304,21 @@ public FSDataOutputStream append(Path f, int bufferSize,
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
- if (exists(f) && !overwrite) {
- throw new IOException("File already exists:"+f);
+ if(exists(f)) {
+ if(flag.contains(CreateFlag.APPEND)){
+ return append(f, bufferSize, progress);
+ } else if(!flag.contains(CreateFlag.OVERWRITE)) {
+ throw new IOException("File already exists: "+f);
+ }
+ } else {
+ if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+ throw new FileNotFoundException("File does not exist: " + f.toString());
}
+
+ LOG.debug("Creating new file '" + f + "' in S3");
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
@@ -287,27 +326,41 @@ public FSDataOutputStream create(Path f, FsPermission permission,
}
@Override
- public boolean delete(Path f, boolean recursive) throws IOException {
+ public boolean delete(Path f, boolean recurse) throws IOException {
FileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException e) {
+ LOG.debug("Delete called for '" + f + "' but file does not exist, so returning false");
return false;
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (status.isDir()) {
- FileStatus[] contents = listStatus(f);
- if (!recursive && contents.length > 0) {
- throw new IOException("Directory " + f.toString() + " is not empty.");
+ if (!recurse && listStatus(f).length > 0) {
+ throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
}
- for (FileStatus p : contents) {
- if (!delete(p.getPath(), recursive)) {
- return false;
+
+ createParent(f);
+
+ LOG.debug("Deleting directory '" + f + "'");
+ String priorLastKey = null;
+ do {
+ PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+ for (FileMetadata file : listing.getFiles()) {
+ store.delete(file.getKey());
}
+ priorLastKey = listing.getPriorLastKey();
+ } while (priorLastKey != null);
+
+ try {
+ store.delete(key + FOLDER_SUFFIX);
+ } catch (FileNotFoundException e) {
+ //this is fine, we don't require a marker
}
- store.delete(key + FOLDER_SUFFIX);
} else {
+ LOG.debug("Deleting file '" + f + "'");
+ createParent(f);
store.delete(key);
}
return true;
@@ -315,7 +368,6 @@ public boolean delete(Path f, boolean recursive) throws IOException {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
-
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
@@ -323,23 +375,28 @@ public FileStatus getFileStatus(Path f) throws IOException {
return newDirectory(absolutePath);
}
+ LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
+ LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
return newFile(meta, absolutePath);
}
if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
+ LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as '"
+ + key + FOLDER_SUFFIX + "' exists");
return newDirectory(absolutePath);
}
+ LOG.debug("getFileStatus listing key '" + key + "'");
PartialListing listing = store.list(key, 1);
if (listing.getFiles().length > 0 ||
listing.getCommonPrefixes().length > 0) {
+ LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as it has contents");
return newDirectory(absolutePath);
}
- throw new FileNotFoundException(absolutePath +
- ": No such file or directory.");
-
+ LOG.debug("getFileStatus could not find key '" + key + "'");
+ throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
}
@Override
@@ -372,16 +429,20 @@ public FileStatus[] listStatus(Path f) throws IOException {
Set<FileStatus> status = new TreeSet<FileStatus>();
String priorLastKey = null;
do {
- PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH,
- priorLastKey);
+ PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
for (FileMetadata fileMetadata : listing.getFiles()) {
Path subpath = keyToPath(fileMetadata.getKey());
String relativePath = pathUri.relativize(subpath.toUri()).getPath();
- if (relativePath.endsWith(FOLDER_SUFFIX)) {
- status.add(newDirectory(new Path(absolutePath,
- relativePath.substring(0,
- relativePath.indexOf(FOLDER_SUFFIX)))));
- } else {
+
+ if (fileMetadata.getKey().equals(key + "/")) {
+ // this is just the directory we have been asked to list
+ }
+ else if (relativePath.endsWith(FOLDER_SUFFIX)) {
+ status.add(newDirectory(new Path(
+ absolutePath,
+ relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
+ }
+ else {
status.add(newFile(fileMetadata, subpath));
}
}
@@ -398,17 +459,16 @@ public FileStatus[] listStatus(Path f) throws IOException {
return null;
}
- return status.toArray(new FileStatus[0]);
+ return status.toArray(new FileStatus[status.size()]);
}
private FileStatus newFile(FileMetadata meta, Path path) {
- return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
+ return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
meta.getLastModified(), path.makeQualified(this));
}
private FileStatus newDirectory(Path path) {
- return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
- path.makeQualified(this));
+ return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
}
@Override
@@ -432,10 +492,11 @@ private boolean mkdir(Path f) throws IOException {
FileStatus fileStatus = getFileStatus(f);
if (!fileStatus.isDir()) {
throw new IOException(String.format(
- "Can't make directory for path %s since it is a file.", f));
+ "Can't make directory for path '%s' since it is a file.", f));
}
} catch (FileNotFoundException e) {
+ LOG.debug("Making dir '" + f + "' in S3");
String key = pathToKey(f) + FOLDER_SUFFIX;
store.storeEmptyFile(key);
}
@@ -444,9 +505,11 @@ private boolean mkdir(Path f) throws IOException {
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- if (!exists(f)) {
- throw new FileNotFoundException(f.toString());
+ FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
+ if (fs.isDir()) {
+ throw new IOException("'" + f + "' is a directory");
}
+ LOG.info("Opening '" + f + "' for reading");
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
return new FSDataInputStream(new BufferedFSInputStream(
@@ -456,47 +519,16 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
// rename() and delete() use this method to ensure that the parent directory
// of the source does not vanish.
private void createParent(Path path) throws IOException {
- Path parent = path.getParent();
- if (parent != null) {
- String key = pathToKey(makeAbsolute(parent));
- if (key.length() > 0) {
- store.storeEmptyFile(key + FOLDER_SUFFIX);
- }
+ Path parent = path.getParent();
+ if (parent != null) {
+ String key = pathToKey(makeAbsolute(parent));
+ if (key.length() > 0) {
+ store.storeEmptyFile(key + FOLDER_SUFFIX);
}
+ }
}
- private boolean existsAndIsFile(Path f) throws IOException {
- Path absolutePath = makeAbsolute(f);
- String key = pathToKey(absolutePath);
-
- if (key.length() == 0) {
- return false;
- }
-
- FileMetadata meta = store.retrieveMetadata(key);
- if (meta != null) {
- // S3 object with given key exists, so this is a file
- return true;
- }
-
- if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
- // Signifies empty directory
- return false;
- }
-
- PartialListing listing = store.list(key, 1, null);
- if (listing.getFiles().length > 0 ||
- listing.getCommonPrefixes().length > 0) {
- // Non-empty directory
- return false;
- }
-
- throw new FileNotFoundException(absolutePath +
- ": No such file or directory");
-}
-
-
@Override
public boolean rename(Path src, Path dst) throws IOException {
@@ -507,60 +539,79 @@ public boolean rename(Path src, Path dst) throws IOException {
return false;
}
+ final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
+
// Figure out the final destination
String dstKey;
try {
- boolean dstIsFile = existsAndIsFile(dst);
+ boolean dstIsFile = !getFileStatus(dst).isDir();
if (dstIsFile) {
- // Attempting to overwrite a file using rename()
+ LOG.debug(debugPreamble + "returning false as dst is an already existing file");
return false;
} else {
- // Move to within the existent directory
+ LOG.debug(debugPreamble + "using dst as output directory");
dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
}
} catch (FileNotFoundException e) {
- // dst doesn't exist, so we can proceed
+ LOG.debug(debugPreamble + "using dst as output destination");
dstKey = pathToKey(makeAbsolute(dst));
try {
if (!getFileStatus(dst.getParent()).isDir()) {
- return false; // parent dst is a file
+ LOG.debug(debugPreamble + "returning false as dst parent exists and is a file");
+ return false;
}
} catch (FileNotFoundException ex) {
- return false; // parent dst does not exist
+ LOG.debug(debugPreamble + "returning false as dst parent does not exist");
+ return false;
}
}
+ boolean srcIsFile;
try {
- boolean srcIsFile = existsAndIsFile(src);
- if (srcIsFile) {
- store.rename(srcKey, dstKey);
- } else {
- // Move the folder object
- store.delete(srcKey + FOLDER_SUFFIX);
- store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
-
- // Move everything inside the folder
- String priorLastKey = null;
- do {
- PartialListing listing = store.listAll(srcKey, S3_MAX_LISTING_LENGTH,
- priorLastKey);
- for (FileMetadata file : listing.getFiles()) {
- store.rename(file.getKey(), dstKey
- + file.getKey().substring(srcKey.length()));
- }
- priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
- }
-
- createParent(src);
- return true;
-
+ srcIsFile = !getFileStatus(src).isDir();
} catch (FileNotFoundException e) {
- // Source file does not exist;
+ LOG.debug(debugPreamble + "returning false as src does not exist");
return false;
}
- }
+ if (srcIsFile) {
+ LOG.debug(debugPreamble + "src is file, so doing copy then delete in S3");
+ store.copy(srcKey, dstKey);
+ store.delete(srcKey);
+ } else {
+ LOG.debug(debugPreamble + "src is directory, so copying contents");
+ store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
+ List<String> keysToDelete = new ArrayList<String>();
+ String priorLastKey = null;
+ do {
+ PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+ for (FileMetadata file : listing.getFiles()) {
+ keysToDelete.add(file.getKey());
+ store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
+ }
+ priorLastKey = listing.getPriorLastKey();
+ } while (priorLastKey != null);
+
+ LOG.debug(debugPreamble + "all files in src copied, now removing src files");
+ for (String key: keysToDelete) {
+ store.delete(key);
+ }
+
+ try {
+ store.delete(srcKey + FOLDER_SUFFIX);
+ } catch (FileNotFoundException e) {
+ //this is fine, we don't require a marker
+ }
+ LOG.debug(debugPreamble + "done");
+ }
+
+ return true;
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
+ }
/**
* Set the working directory to the given directory.
@@ -574,5 +625,4 @@ public void setWorkingDirectory(Path newDir) {
public Path getWorkingDirectory() {
return workingDir;
}
-
}
diff --git a/src/java/org/apache/hadoop/http/HttpServer.java b/src/java/org/apache/hadoop/http/HttpServer.java
index a739ba69ac..62248e70f1 100644
--- a/src/java/org/apache/hadoop/http/HttpServer.java
+++ b/src/java/org/apache/hadoop/http/HttpServer.java
@@ -238,13 +238,15 @@ public void addServlet(String name, String pathSpec,
}
/**
- * Add an internal servlet in the server.
+ * Add an internal servlet in the server.
+ * Note: This method is to be used for adding servlets that facilitate
+ * internal communication and not for user facing functionality. For
+ * servlets added using this method, filters are not enabled.
+ *
* @param name The name of the servlet (can be passed as null)
* @param pathSpec The path spec for the servlet
* @param clazz The servlet class
- * @deprecated this is a temporary method
*/
- @Deprecated
public void addInternalServlet(String name, String pathSpec,
      Class<? extends HttpServlet> clazz) {
ServletHolder holder = new ServletHolder(clazz);
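
With the @deprecated tag removed, addInternalServlet is the intended entry point for daemon-to-daemon endpoints; filters registered on the server are deliberately not applied to servlets added this way. A hedged sketch below, with a made-up servlet and path, assuming a webapps/<name> resource directory is available on the classpath as HttpServer requires:

    import java.io.IOException;

    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.hadoop.http.HttpServer;

    public class InternalServletExample {
      // Hypothetical servlet serving daemon-to-daemon status.
      public static class PingServlet extends HttpServlet {
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
          resp.getWriter().print("pong");
        }
      }

      public static void main(String[] args) throws IOException {
        // Requires a webapps/example resource directory on the classpath.
        HttpServer server = new HttpServer("example", "0.0.0.0", 50070, true);
        // Internal endpoint: filters added via addFilter are not applied here.
        server.addInternalServlet("ping", "/ping", PingServlet.class);
        server.start();
      }
    }
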
diff --git a/src/java/org/apache/hadoop/io/DeprecatedUTF8.java b/src/java/org/apache/hadoop/io/DeprecatedUTF8.java
deleted file mode 100644
index b27973c180..0000000000
--- a/src/java/org/apache/hadoop/io/DeprecatedUTF8.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Wrapper for {@link UTF8}.
- * This class should be used only when it is absolutely necessary
- * to use {@link UTF8}. The only difference is that using this class
- * does not require "@SuppressWarning" annotation to avoid javac warning.
- * Instead the deprecation is implied in the class name.
- */
-@SuppressWarnings("deprecation")
-public class DeprecatedUTF8 extends UTF8 {
-
- public DeprecatedUTF8() {
- super();
- }
-
- /** Construct from a given string. */
- public DeprecatedUTF8(String string) {
- super(string);
- }
-
- /** Construct from a given string. */
- public DeprecatedUTF8(DeprecatedUTF8 utf8) {
- super(utf8);
- }
-
- /* The following two are the mostly commonly used methods.
- * wrapping them so that editors do not complain about the deprecation.
- */
-
- public static String readString(DataInput in) throws IOException {
- return UTF8.readString(in);
- }
-
- public static int writeString(DataOutput out, String s) throws IOException {
- return UTF8.writeString(out, s);
- }
-}
diff --git a/src/java/org/apache/hadoop/io/IOUtils.java b/src/java/org/apache/hadoop/io/IOUtils.java
index 44723f4c32..a55eaab3ed 100644
--- a/src/java/org/apache/hadoop/io/IOUtils.java
+++ b/src/java/org/apache/hadoop/io/IOUtils.java
@@ -41,17 +41,8 @@ public class IOUtils {
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
throws IOException {
- PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
- byte buf[] = new byte[buffSize];
try {
- int bytesRead = in.read(buf);
- while (bytesRead >= 0) {
- out.write(buf, 0, bytesRead);
- if ((ps != null) && ps.checkError()) {
- throw new IOException("Unable to write to output stream.");
- }
- bytesRead = in.read(buf);
- }
+ copyBytes(in, out, buffSize);
} finally {
if(close) {
out.close();
@@ -60,6 +51,27 @@ public static void copyBytes(InputStream in, OutputStream out, int buffSize, boo
}
}
+ /**
+ * Copies from one stream to another.
+ *
+ * @param in InputStream to read from
+ * @param out OutputStream to write to
+ * @param buffSize the size of the buffer
+ */
+ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+
+ PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ if ((ps != null) && ps.checkError()) {
+ throw new IOException("Unable to write to output stream.");
+ }
+ bytesRead = in.read(buf);
+ }
+ }
/**
* Copies from one stream to another. closes the input and output streams
* at the end.
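
The new three-argument copyBytes overload copies without closing either stream, so a caller can keep writing to the same output afterwards; the existing boolean variant keeps the close-on-finish behavior. A small illustrative sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IOUtils;

    public class CopyBytesExample {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // New overload: copies the bytes but leaves both streams open.
        IOUtils.copyBytes(new ByteArrayInputStream("header ".getBytes()), out, 4096);
        // The output stream is still usable; the boolean variant closes it when done.
        IOUtils.copyBytes(new ByteArrayInputStream("body".getBytes()), out, 4096, true);
        System.out.println(out.toString()); // header body
      }
    }
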
diff --git a/src/java/org/apache/hadoop/metrics/ContextFactory.java b/src/java/org/apache/hadoop/metrics/ContextFactory.java
index 67bd9f9500..e2cc2a4ea9 100644
--- a/src/java/org/apache/hadoop/metrics/ContextFactory.java
+++ b/src/java/org/apache/hadoop/metrics/ContextFactory.java
@@ -188,16 +188,19 @@ public static synchronized ContextFactory getFactory() throws IOException {
private void setAttributes() throws IOException {
InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
if (is != null) {
- Properties properties = new Properties();
- properties.load(is);
- //for (Object propertyNameObj : properties.keySet()) {
- Iterator it = properties.keySet().iterator();
- while (it.hasNext()) {
- String propertyName = (String) it.next();
- String propertyValue = properties.getProperty(propertyName);
- setAttribute(propertyName, propertyValue);
+ try {
+ Properties properties = new Properties();
+ properties.load(is);
+ //for (Object propertyNameObj : properties.keySet()) {
+ Iterator it = properties.keySet().iterator();
+ while (it.hasNext()) {
+ String propertyName = (String) it.next();
+ String propertyValue = properties.getProperty(propertyName);
+ setAttribute(propertyName, propertyValue);
+ }
+ } finally {
+ is.close();
}
- is.close();
}
}
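
The change above simply guarantees the properties stream is closed even when Properties.load throws. The same pattern, sketched independently of the metrics code with a placeholder resource name:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class LoadPropertiesExample {
      // Loads a properties resource, guaranteeing the stream is closed even if
      // Properties.load throws part-way through.
      static Properties load(Class<?> clazz, String resource) throws IOException {
        InputStream is = clazz.getResourceAsStream(resource);
        if (is == null) {
          return new Properties(); // resource missing: nothing to load
        }
        try {
          Properties props = new Properties();
          props.load(is);
          return props;
        } finally {
          is.close();
        }
      }

      public static void main(String[] args) throws IOException {
        Properties p = load(LoadPropertiesExample.class, "/example.properties");
        System.out.println(p.size() + " properties loaded");
      }
    }
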
diff --git a/src/java/org/apache/hadoop/net/NetUtils.java b/src/java/org/apache/hadoop/net/NetUtils.java
index ce07fab858..7cebfd60ff 100644
--- a/src/java/org/apache/hadoop/net/NetUtils.java
+++ b/src/java/org/apache/hadoop/net/NetUtils.java
@@ -132,6 +132,9 @@ public static InetSocketAddress createSocketAddr(String target) {
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort) {
+ if (target == null) {
+ throw new IllegalArgumentException("Target address cannot be null.");
+ }
int colonIndex = target.indexOf(':');
if (colonIndex < 0 && defaultPort == -1) {
throw new RuntimeException("Not a host:port pair: " + target);
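
With this guard, passing a null target now fails fast with IllegalArgumentException instead of surfacing as a NullPointerException inside the parsing code. An illustrative sketch, with a placeholder hostname:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.net.NetUtils;

    public class CreateSocketAddrExample {
      public static void main(String[] args) {
        // Placeholder hostname; the address need not resolve to be constructed.
        InetSocketAddress addr =
            NetUtils.createSocketAddr("namenode.example.com:8020", 8020);
        System.out.println(addr);

        try {
          NetUtils.createSocketAddr(null, 8020);
        } catch (IllegalArgumentException e) {
          // Expected after this change: a clear error instead of an NPE.
          System.out.println("rejected null target: " + e.getMessage());
        }
      }
    }
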
diff --git a/src/java/org/apache/hadoop/util/ProcessTree.java b/src/java/org/apache/hadoop/util/ProcessTree.java
index 62b5c058ee..9242046932 100644
--- a/src/java/org/apache/hadoop/util/ProcessTree.java
+++ b/src/java/org/apache/hadoop/util/ProcessTree.java
@@ -53,6 +53,93 @@ private static boolean isSetsidSupported() {
}
}
+ /**
+ * Destroy the process-tree.
+ * @param pid process id of the root process of the subtree of processes
+ * to be killed
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param isProcessGroup pid is a process group leader or not
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ public static void destroy(String pid, long sleeptimeBeforeSigkill,
+ boolean isProcessGroup, boolean inBackground) {
+ if(isProcessGroup) {
+ destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+ }
+ else {
+ //TODO: Destroy all the processes in the subtree in this case also.
+ // For the time being, killing only the root process.
+ destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+ }
+ }
+
+ /** Destroy the process.
+ * @param pid Process id of to-be-killed-process
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
+ boolean inBackground) {
+ terminateProcess(pid);
+ sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /** Destroy the process group.
+ * @param pgrpId Process group id of to-be-killed-processes
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process group is to be killed in the back ground with
+ * a separate thread
+ */
+ protected static void destroyProcessGroup(String pgrpId,
+ long sleeptimeBeforeSigkill, boolean inBackground) {
+ terminateProcessGroup(pgrpId);
+ sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /**
+ * Sends terminate signal to the process, allowing it to gracefully exit.
+ *
+ * @param pid pid of the process to be sent SIGTERM
+ */
+ public static void terminateProcess(String pid) {
+ ShellCommandExecutor shexec = null;
+ try {
+ String[] args = { "kill", pid };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command " + ioe);
+ } finally {
+ LOG.info("Killing process " + pid +
+ " with SIGTERM. Exit code " + shexec.getExitCode());
+ }
+ }
+
+ /**
+ * Sends terminate signal to all the processes belonging to the passed process
+ * group, allowing the group to gracefully exit.
+ *
+ * @param pgrpId process group id
+ */
+ public static void terminateProcessGroup(String pgrpId) {
+ ShellCommandExecutor shexec = null;
+ try {
+ String[] args = { "kill", "--", "-" + pgrpId };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command " + ioe);
+ } finally {
+ LOG.info("Killing all processes in the process group " + pgrpId +
+ " with SIGTERM. Exit code " + shexec.getExitCode());
+ }
+ }
+
/**
* Kills the process(OR process group) by sending the signal SIGKILL
* in the current thread
@@ -72,35 +159,14 @@ private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
} catch (InterruptedException i) {
LOG.warn("Thread sleep is interrupted.");
}
-
- ShellCommandExecutor shexec = null;
-
- try {
- String pid_pgrpid;
- if(isProcessGroup) {//kill the whole process group
- pid_pgrpid = "-" + pid;
- }
- else {//kill single process
- pid_pgrpid = pid;
- }
-
- String[] args = { "kill", "-9", pid_pgrpid };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- if(isProcessGroup) {
- LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
- else {
- LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
+ if(isProcessGroup) {
+ killProcessGroup(pid);
+ } else {
+ killProcess(pid);
}
- }
+ }
}
+
/** Kills the process(OR process group) by sending the signal SIGKILL
* @param pid Process id(OR process group id) of to-be-deleted-process
@@ -124,81 +190,63 @@ private static void sigKill(String pid, boolean isProcessGroup,
}
}
- /** Destroy the process.
- * @param pid Process id of to-be-killed-process
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
+ /**
+ * Sends kill signal to process, forcefully terminating the process.
+ *
+ * @param pid process id
*/
- protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
- boolean inBackground) {
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", pid };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing process " + pid +
- " with SIGTERM. Exit code " + shexec.getExitCode());
+ public static void killProcess(String pid) {
+
+ //If process tree is not alive then return immediately.
+ if(!ProcessTree.isAlive(pid)) {
+ return;
}
-
- sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
- }
-
- /** Destroy the process group.
- * @param pgrpId Process group id of to-be-killed-processes
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process group is to be killed in the back ground with
- * a separate thread
- */
- protected static void destroyProcessGroup(String pgrpId,
- long sleeptimeBeforeSigkill, boolean inBackground) {
- ShellCommandExecutor shexec = null;
+ String[] args = { "kill", "-9", pid };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(args);
try {
- String[] args = { "kill", "--", "-" + pgrpId };
- shexec = new ShellCommandExecutor(args);
shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
+ } catch (IOException e) {
+ LOG.warn("Error sending SIGKILL to process "+ pid + " ."+
+ StringUtils.stringifyException(e));
} finally {
- LOG.info("Killing all processes in the process group " + pgrpId +
- " with SIGTERM. Exit code " + shexec.getExitCode());
+ LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
}
-
- sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
}
/**
- * Destroy the process-tree.
- * @param pid process id of the root process of the subtree of processes
- * to be killed
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param isProcessGroup pid is a process group leader or not
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
+ * Sends kill signal to all processes belonging to the same process group,
+ * forcefully terminating the process group.
+ *
+ * @param pgrpId process group id
*/
- public static void destroy(String pid, long sleeptimeBeforeSigkill,
- boolean isProcessGroup, boolean inBackground) {
- if(isProcessGroup) {
- destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+ public static void killProcessGroup(String pgrpId) {
+
+ //If process tree is not alive then return immediately.
+ if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
+ return;
}
- else {
- //TODO: Destroy all the processes in the subtree in this case also.
- // For the time being, killing only the root process.
- destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+
+ String[] args = { "kill", "-9", "-"+pgrpId };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+ try {
+ shexec.execute();
+ } catch (IOException e) {
+ LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+
+ StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
}
}
-
-
+
/**
* Is the process with PID pid still alive?
* This method assumes that isAlive is called on a pid that was alive not
* too long ago, and hence assumes no chance of pid-wrapping-around.
+ *
+ * @param pid pid of the process to check.
+ * @return true if process is alive.
*/
public static boolean isAlive(String pid) {
ShellCommandExecutor shexec = null;
@@ -215,6 +263,32 @@ public static boolean isAlive(String pid) {
}
return (shexec.getExitCode() == 0 ? true : false);
}
+
+ /**
+ * Is the process group with the given pgrpId still alive?
+ *
+ * This method assumes that isAlive is called on a pid that was alive not
+ * too long ago, and hence assumes no chance of pid-wrapping-around.
+ *
+ * @param pgrpId process group id
+ * @return true if any of process in group is alive.
+ */
+ public static boolean isProcessGroupAlive(String pgrpId) {
+ ShellCommandExecutor shexec = null;
+ try {
+ String[] args = { "kill", "-0", "-"+pgrpId };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (ExitCodeException ee) {
+ return false;
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command "
+ + Arrays.toString(shexec.getExecString()) + ioe);
+ return false;
+ }
+ return (shexec.getExitCode() == 0 ? true : false);
+ }
+
/**
* Helper thread class that kills process-tree with SIGKILL in background
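
The refactoring splits the graceful SIGTERM step (terminateProcess/terminateProcessGroup) from the forced SIGKILL step (killProcess/killProcessGroup); destroy() still chains them through sigKill after the configured wait. A hedged usage sketch (Linux only, placeholder pid):

    import org.apache.hadoop.util.ProcessTree;

    public class ProcessTreeExample {
      public static void main(String[] args) {
        String pid = args.length > 0 ? args[0] : "12345"; // placeholder pid
        // SIGTERM first, then SIGKILL five seconds later if the process group
        // is still alive, with the SIGKILL step running in a background thread.
        ProcessTree.destroy(pid, 5000L, true, true);
      }
    }
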
diff --git a/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java b/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
index 52dd36ecc1..901bcc683c 100644
--- a/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
+++ b/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
@@ -47,6 +47,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
.compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+ // to enable testing, using this variable which can be configured
+ // to a test directory.
+ private String procfsDir;
+
private Integer pid = -1;
private boolean setsidUsed = false;
private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
@@ -59,11 +63,29 @@ public ProcfsBasedProcessTree(String pid) {
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
long sigkillInterval) {
+ this(pid, setsidUsed, sigkillInterval, PROCFS);
+ }
+
+ /**
+ * Build a new process tree rooted at the pid.
+ *
+ * This method is provided mainly for testing purposes, where
+ * the root of the proc file system can be adjusted.
+ *
+ * @param pid root of the process tree
+ * @param setsidUsed true, if setsid was used for the root pid
+ * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL
+ * when killing a process tree
+ * @param procfsDir the root of a proc file system - only used for testing.
+ */
+ public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+ long sigkillInterval, String procfsDir) {
this.pid = getValidPID(pid);
this.setsidUsed = setsidUsed;
sleeptimeBeforeSigkill = sigkillInterval;
+ this.procfsDir = procfsDir;
}
-
+
/**
* Sets SIGKILL interval
* @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
@@ -108,13 +130,17 @@ public ProcfsBasedProcessTree getProcessTree() {
List<Integer> processList = getProcessList();
Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+
+ // cache the processTree to get the age for processes
+ Map<Integer, ProcessInfo> oldProcs =
+ new HashMap<Integer, ProcessInfo>(processTree);
processTree.clear();
ProcessInfo me = null;
for (Integer proc : processList) {
// Get information for each process
ProcessInfo pInfo = new ProcessInfo(proc);
- if (constructProcessInfo(pInfo) != null) {
+ if (constructProcessInfo(pInfo, procfsDir) != null) {
allProcessInfo.put(proc, pInfo);
if (proc.equals(this.pid)) {
me = pInfo; // cache 'me'
@@ -150,6 +176,16 @@ public ProcfsBasedProcessTree getProcessTree() {
pInfoQueue.addAll(pInfo.getChildren());
}
+ // update age values.
+ for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
+ ProcessInfo oldInfo = oldProcs.get(procs.getKey());
+ if (oldInfo != null) {
+ if (procs.getValue() != null) {
+ procs.getValue().updateAge(oldInfo);
+ }
+ }
+ }
+
if (LOG.isDebugEnabled()) {
// Log.debug the ProcfsBasedProcessTree
LOG.debug(this.toString());
@@ -269,15 +305,29 @@ public void destroy(boolean inBackground) {
* @return cumulative virtual memory used by the process-tree in bytes.
*/
public long getCumulativeVmem() {
+ // include all processes.. all processes will be older than 0.
+ return getCumulativeVmem(0);
+ }
+
+ /**
+ * Get the cumulative virtual memory used by all the processes in the
+ * process-tree that are older than the passed in age.
+ *
+ * @param olderThanAge processes above this age are included in the
+ * memory addition
+ * @return cumulative virtual memory used by the process-tree in bytes,
+ * for processes older than this age.
+ */
+ public long getCumulativeVmem(int olderThanAge) {
long total = 0;
for (ProcessInfo p : processTree.values()) {
- if (p != null) {
+ if ((p != null) && (p.getAge() > olderThanAge)) {
total += p.getVmem();
}
}
return total;
}
-
+
private static Integer getValidPID(String pid) {
Integer retPid = -1;
try {
@@ -295,13 +345,13 @@ private static Integer getValidPID(String pid) {
* Get the list of all processes in the system.
*/
private List<Integer> getProcessList() {
- String[] processDirs = (new File(PROCFS)).list();
+ String[] processDirs = (new File(procfsDir)).list();
List<Integer> processList = new ArrayList<Integer>();
for (String dir : processDirs) {
try {
int pd = Integer.parseInt(dir);
- if ((new File(PROCFS + dir)).isDirectory()) {
+ if ((new File(procfsDir, dir)).isDirectory()) {
processList.add(Integer.valueOf(pd));
}
} catch (NumberFormatException n) {
@@ -319,12 +369,29 @@ private List<Integer> getProcessList() {
* same. Returns null on failing to read from procfs,
*/
private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+ return constructProcessInfo(pinfo, PROCFS);
+ }
+
+ /**
+ * Construct the ProcessInfo using the process' PID and procfs rooted at the
+ * specified directory and return the same. It is provided mainly to assist
+ * testing purposes.
+ *
+ * Returns null on failing to read from procfs,
+ *
+ * @param pinfo ProcessInfo that needs to be updated
+ * @param procfsDir root of the proc file system
+ * @return updated ProcessInfo, null on errors.
+ */
+ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo,
+ String procfsDir) {
ProcessInfo ret = null;
- // Read "/proc//stat" file
+ // Read "procfsDir//stat" file - typically /proc//stat
BufferedReader in = null;
FileReader fReader = null;
try {
- fReader = new FileReader(PROCFS + pinfo.getPid() + "/stat");
+ File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
+ fReader = new FileReader(new File(pidDir, "/stat"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// The process vanished in the interim!
@@ -338,7 +405,7 @@ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
boolean mat = m.find();
if (mat) {
// Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
- pinfo.update(m.group(2), Integer.parseInt(m.group(3)), Integer
+ pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
.parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
.parseLong(m.group(7)));
}
@@ -365,7 +432,6 @@ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
return ret;
}
-
/**
* Returns a string printing PIDs of process present in the
* ProcfsBasedProcessTree. Output format : [pid pid ..]
@@ -391,10 +457,14 @@ private static class ProcessInfo {
private Integer ppid; // parent process-id
private Integer sessionId; // session-id
private Long vmem; // virtual memory usage
+ // how many times has this process been seen alive
+ private int age;
private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
public ProcessInfo(int pid) {
this.pid = Integer.valueOf(pid);
+ // seeing this the first time.
+ this.age = 1;
}
public Integer getPid() {
@@ -421,6 +491,10 @@ public Long getVmem() {
return vmem;
}
+ public int getAge() {
+ return age;
+ }
+
public boolean isParent(ProcessInfo p) {
if (pid.equals(p.getPpid())) {
return true;
@@ -428,7 +502,7 @@ public boolean isParent(ProcessInfo p) {
return false;
}
- public void update(String name, Integer ppid, Integer pgrpId,
+ public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
Integer sessionId, Long vmem) {
this.name = name;
this.ppid = ppid;
@@ -437,6 +511,10 @@ public void update(String name, Integer ppid, Integer pgrpId,
this.vmem = vmem;
}
+ public void updateAge(ProcessInfo oldInfo) {
+ this.age = oldInfo.age + 1;
+ }
+
public boolean addChild(ProcessInfo p) {
return children.add(p);
}
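
The new age field counts how many successive getProcessTree() snapshots have seen a process, so getCumulativeVmem(1) sums memory only for processes observed in at least two refreshes, letting callers ignore very young children when enforcing memory limits. An illustrative sketch with a placeholder pid:

    import org.apache.hadoop.util.ProcfsBasedProcessTree;

    public class VmemByAgeExample {
      public static void main(String[] args) throws InterruptedException {
        String rootPid = args.length > 0 ? args[0] : "12345"; // placeholder pid
        ProcfsBasedProcessTree tree =
            new ProcfsBasedProcessTree(rootPid, false, 5000L);

        tree = tree.getProcessTree();   // first snapshot: every process has age 1
        Thread.sleep(1000L);
        tree = tree.getProcessTree();   // survivors of the refresh now have age 2

        System.out.println("all processes:          " + tree.getCumulativeVmem());
        // Only processes seen in more than one snapshot (age > 1).
        System.out.println("older than one refresh: " + tree.getCumulativeVmem(1));
      }
    }
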
diff --git a/src/java/org/apache/hadoop/util/Progress.java b/src/java/org/apache/hadoop/util/Progress.java
index 81be35c8e0..e6a2e0c5e3 100644
--- a/src/java/org/apache/hadoop/util/Progress.java
+++ b/src/java/org/apache/hadoop/util/Progress.java
@@ -20,19 +20,32 @@
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/** Utility to assist with generation of progress reports. Applications build
* a hierarchy of {@link Progress} instances, each modelling a phase of
* execution. The root is constructed with {@link #Progress()}. Nodes for
* sub-phases are created by calling {@link #addPhase()}.
*/
public class Progress {
+ private static final Log LOG = LogFactory.getLog(Progress.class);
private String status = "";
private float progress;
private int currentPhase;
private ArrayList