HADOOP-6870. Add a new API getFiles to FileSystem and FileContext. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@980271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hairong Kuang 2010-07-29 00:14:36 +00:00
parent c539588672
commit 19eea554e3
8 changed files with 545 additions and 1 deletions

View File

@ -21,6 +21,11 @@ Trunk (unreleased changes)
HADOOP-6859 - Introduce additional statistics to FileSystem to track
file system operations (suresh)
HADOOP-6870. Add a new API getFiles to FileSystem and FileContext that
lists all files under the input path or the subtree rooted at the
input path if recursive is true. Block locations are returned together
with each file's status. (hairong)
IMPROVEMENTS
HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name

View File

@ -784,6 +784,48 @@ public void remove() {
};
}
/**
* The specification of this method matches that of
* {@link FileContext#listLocatedStatus(Path)} except that Path f must be for this
* file system.
*/
protected Iterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
return new Iterator<LocatedFileStatus>() {
private Iterator<FileStatus> itor = listStatusIterator(f);
@Override
public boolean hasNext() {
return itor.hasNext();
}
@Override
public LocatedFileStatus next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
FileStatus result = itor.next();
try {
BlockLocation[] locs = null;
if (result.isFile()) {
locs = getFileBlockLocations(
result.getPath(), 0, result.getLen());
}
return new LocatedFileStatus(result, locs);
} catch (IOException ioe) {
throw (RuntimeException)new RuntimeException().initCause(ioe);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
/**
* The specification of this method matches that of
* {@link FileContext.Util#listStatus(Path)} except that Path f must be

View File

@ -27,12 +27,14 @@
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1285,6 +1287,128 @@ public Iterator<FileStatus> next(final AbstractFileSystem fs, final Path p)
}.resolve(this, absF);
}
/**
* List the statuses and block locations of the files in the given path
* if the path is a directory.
* If the given path is a file, return the file's status and block locations.
* if recursive is true, list all file statuses and block locations in
* the subtree rooted at the given path.
* Files across symbolic links are also returned.
*
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files
*
* @throws AccessControlException If access is denied
* @throws FileNotFoundException If <code>f</code> does not exist
* @throws UnsupportedFileSystemException If file system for <code>f</code> is
* not supported
* @throws IOException If an I/O error occurred
*
* Exceptions applicable to file systems accessed over RPC:
* @throws RpcClientException If an exception occurred in the RPC client
* @throws RpcServerException If an exception occurred in the RPC server
* @throws UnexpectedServerException If server implementation throws
* undeclared exception to RPC server
*/
public Iterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive) throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException,
IOException {
return new Iterator<LocatedFileStatus>() {
private Stack<Path> dirs = new Stack<Path>();
private Stack<Path> symLinks = new Stack<Path>();
Iterator<LocatedFileStatus> itor = listLocatedStatus(f);
LocatedFileStatus curFile;
@Override
public boolean hasNext() {
try {
while (curFile == null) {
if (itor.hasNext()) {
handleFileStat(itor.next());
} else if (!dirs.isEmpty()) {
Path dirPath = dirs.pop();
itor = listLocatedStatus(dirPath);
} else if (!symLinks.isEmpty()) {
Path symLink = symLinks.pop();
FileStatus stat = getFileStatus(symLink);
if (stat.isFile() || (recursive && stat.isDirectory())) {
itor = listLocatedStatus(stat.getPath());
}
} else {
return false;
}
}
return true;
} catch (IOException ioe) {
throw (RuntimeException)new RuntimeException().initCause(ioe);
}
}
private void handleFileStat(LocatedFileStatus stat) throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else if (stat.isSymlink()) { // symbolic link
symLinks.push(stat.getSymlink());
} else if (recursive) { // directory
dirs.push(stat.getPath());
}
}
@Override
public LocatedFileStatus next() {
if (hasNext()) {
LocatedFileStatus result = curFile;
curFile = null;
return result;
}
throw new java.util.NoSuchElementException("No more entry in " + f);
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory. Each returned status contains a file's block locations.
*
* @param f is the path
*
* @return an iterator that traverses statuses of the files/directories
* in the given path
*
* @throws AccessControlException If access is denied
* @throws FileNotFoundException If <code>f</code> does not exist
* @throws UnsupportedFileSystemException If file system for <code>f</code> is
* not supported
* @throws IOException If an I/O error occurred
*
* Exceptions applicable to file systems accessed over RPC:
* @throws RpcClientException If an exception occurred in the RPC client
* @throws RpcServerException If an exception occurred in the RPC server
* @throws UnexpectedServerException If server implementation throws
* undeclared exception to RPC server
*/
public Iterator<LocatedFileStatus> listLocatedStatus(final Path f) throws
AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
final Path absF = fixRelativePart(f);
return new FSLinkResolver<Iterator<LocatedFileStatus>>() {
public Iterator<LocatedFileStatus> next(
final AbstractFileSystem fs, final Path p)
throws IOException, UnresolvedLinkException {
return fs.listLocatedStatus(p);
}
}.resolve(this, absF);
}
/**
* Mark a path to be deleted on JVM shutdown.
*

View File

@ -29,9 +29,12 @@
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -1278,6 +1281,91 @@ private Path[] globPathsLevel(Path[] parents, String[] filePattern,
return globPathsLevel(parents, filePattern, level + 1, hasGlob);
}
/**
* List the statuses and block locations of the files in the given path
* if the path is a directory.
* If the given path is a file, return the file's status and block locations.
* if recursive is true, list all file statuses and block locations in
* the subtree rooted at the given path.
*
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
public Iterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return new Iterator<LocatedFileStatus>() {
private LinkedList<FileStatus> fileStats = new LinkedList<FileStatus>();
private Stack<FileStatus> dirStats = new Stack<FileStatus>();
{ // initializer
list(f);
}
@Override
public boolean hasNext() {
if (fileStats.isEmpty()) {
listDir();
}
return !fileStats.isEmpty();
}
/**
* list at least one directory until file list is not empty
*/
private void listDir() {
while (fileStats.isEmpty() && !dirStats.isEmpty()) {
FileStatus dir = dirStats.pop();
list(dir.getPath());
}
}
/**
* List the given path
*
* @param dirPath a path
*/
private void list(Path dirPath) {
try {
FileStatus[] stats = listStatus(dirPath);
for (FileStatus stat : stats) {
if (stat.isFile()) {
fileStats.add(stat);
} else if (recursive) { // directory & recursive
dirStats.push(stat);
}
}
} catch (IOException ioe) {
throw (RuntimeException) new RuntimeException().initCause(ioe);
}
}
@Override
public LocatedFileStatus next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
FileStatus status = fileStats.remove();
try {
BlockLocation[] locs = getFileBlockLocations(
status, 0, status.getLen());
return new LocatedFileStatus(status, locs);
} catch (IOException ioe) {
throw (RuntimeException) new RuntimeException().initCause(ioe);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
/** Return the current user's home directory in this filesystem.
* The default implementation returns "/user/$USER/".
*/

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* This class defines a FileStatus that includes a file's block locations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class LocatedFileStatus extends FileStatus {
private BlockLocation[] locations;
/**
* Constructor
* @param stat a file status
* @param locations a file's block locations
*/
public LocatedFileStatus(FileStatus stat, BlockLocation[] locations)
throws IOException {
this(stat.getLen(), stat.isDirectory(), stat.getReplication(),
stat.getBlockSize(), stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), null, stat.getPath(), locations);
if (isSymlink()) {
setSymlink(stat.getSymlink());
}
}
/**
* Constructor
*
* @param length a file's length
* @param isdir if the path is a directory
* @param block_replication the file's replication factor
* @param blocksize a file's block size
* @param modification_time a file's modification time
* @param access_time a file's access time
* @param permission a file's permission
* @param owner a file's owner
* @param group a file's group
* @param symlink symlink if the path is a symbolic link
* @param path the path's qualified name
* @param locations a file's block locations
*/
public LocatedFileStatus(long length, boolean isdir,
int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group,
Path symlink,
Path path,
BlockLocation[] locations) {
super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path);
this.locations = locations;
}
/**
* Get the file's block locations
* @return the file's block locations
*/
public BlockLocation[] getBlockLocations() {
return locations;
}
/**
* Compare this object to another object
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
*
* @throws ClassCastException if the specified object's is not of
* type FileStatus
*/
public int compareTo(Object o) {
return super.compareTo(o);
}
/** Compare if this object is equal to another object
* @param o the object to be compared.
* @return true if two file status has the same path name; false if not.
*/
public boolean equals(Object o) {
return super.equals(o);
}
/**
* Returns a hash code value for the object, which is defined as
* the hash code of the path name.
*
* @return a hash code value for the path name.
*/
public int hashCode() {
return super.hashCode();
}
}

View File

@ -21,6 +21,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.EnumSet;
import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@ -107,6 +108,10 @@ public void processDeleteOnExit() { }
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) {
return null;
}
public Iterator<LocatedFileStatus> listFiles(
final Path path, final boolean isRecursive) {
return null;
}
public void copyFromLocalFile(Path src, Path dst) { }
public void moveFromLocalFile(Path[] srcs, Path dst) { }
public void moveFromLocalFile(Path src, Path dst) { }

View File

@ -35,6 +35,9 @@ public void checkScheme(URI uri, String supportedScheme) { }
public Iterator<FileStatus> listStatusIterator(Path f) {
return null;
}
public Iterator<LocatedFileStatus> listLocatedStatus(final Path f) {
return null;
}
}
public void testFilterFileSystem() throws Exception {

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.BeforeClass;
/**
* This class tests the FileStatus API.
*/
public class TestListFiles {
{
((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL);
}
static final long seed = 0xDEADBEEFL;
final protected static Configuration conf = new Configuration();
protected static FileSystem fs;
final protected static Path TEST_DIR = getTestDir();
final private static int FILE_LEN = 10;
final private static Path FILE1 = new Path(TEST_DIR, "file1");
final private static Path DIR1 = new Path(TEST_DIR, "dir1");
final private static Path FILE2 = new Path(DIR1, "file2");
final private static Path FILE3 = new Path(DIR1, "file3");
protected static Path getTestDir() {
return new Path(
System.getProperty("test.build.data","build/test/data/work-dir/localfs"),
"main_");
}
@BeforeClass
public static void testSetUp() throws Exception {
fs = FileSystem.getLocal(conf);
fs.delete(TEST_DIR, true);
}
private static void writeFile(FileSystem fileSys, Path name, int fileSize)
throws IOException {
// Create and write a file that contains three blocks of data
FSDataOutputStream stm = fileSys.create(name);
byte[] buffer = new byte[fileSize];
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
stm.close();
}
/** Test when input path is a file */
@Test
public void testFile() throws IOException {
fs.mkdirs(TEST_DIR);
writeFile(fs, FILE1, FILE_LEN);
Iterator<LocatedFileStatus> itor = fs.listFiles(
FILE1, true);
LocatedFileStatus stat = itor.next();
assertFalse(itor.hasNext());
assertTrue(stat.isFile());
assertEquals(FILE_LEN, stat.getLen());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
assertEquals(1, stat.getBlockLocations().length);
itor = fs.listFiles(FILE1, false);
stat = itor.next();
assertFalse(itor.hasNext());
assertTrue(stat.isFile());
assertEquals(FILE_LEN, stat.getLen());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
assertEquals(1, stat.getBlockLocations().length);
fs.delete(FILE1, true);
}
/** Test when input path is a directory */
@Test
public void testDirectory() throws IOException {
fs.mkdirs(DIR1);
Iterator<LocatedFileStatus> itor = fs.listFiles(
DIR1, true);
assertFalse(itor.hasNext());
itor = fs.listFiles(DIR1, false);
assertFalse(itor.hasNext());
writeFile(fs, FILE2, FILE_LEN);
// test empty directory
itor = fs.listFiles(DIR1, true);
LocatedFileStatus stat = itor.next();
assertFalse(itor.hasNext());
assertTrue(stat.isFile());
assertEquals(FILE_LEN, stat.getLen());
assertEquals(fs.makeQualified(FILE2), stat.getPath());
assertEquals(1, stat.getBlockLocations().length);
// testing directory with 1 file
itor = fs.listFiles(DIR1, false);
stat = itor.next();
assertFalse(itor.hasNext());
assertTrue(stat.isFile());
assertEquals(FILE_LEN, stat.getLen());
assertEquals(fs.makeQualified(FILE2), stat.getPath());
assertEquals(1, stat.getBlockLocations().length);
// test more complicated directory
writeFile(fs, FILE1, FILE_LEN);
writeFile(fs, FILE3, FILE_LEN);
itor = fs.listFiles(TEST_DIR, true);
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE2), stat.getPath());
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE3), stat.getPath());
assertFalse(itor.hasNext());
itor = fs.listFiles(TEST_DIR, false);
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
assertFalse(itor.hasNext());
fs.delete(TEST_DIR, true);
}
}