HADOOP-10003. HarFileSystem.listLocatedStatus() fails. Contributed by Jason Dere and suresh.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-10-01 22:57:59 +00:00
parent 82f4348f27
commit 4e9c652c52
9 changed files with 115 additions and 74 deletions

View File

@ -420,6 +420,9 @@ Release 2.1.2 - UNRELEASED
HADOOP-9761. ViewFileSystem#rename fails when using DistributedFileSystem. HADOOP-9761. ViewFileSystem#rename fails when using DistributedFileSystem.
(Andrew Wang via Colin Patrick McCabe) (Andrew Wang via Colin Patrick McCabe)
HADOOP-10003. HarFileSystem.listLocatedStatus() fails.
(Jason Dere and suresh via suresh)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -464,6 +464,10 @@
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude> <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude> <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude>
<exclude>src/test/java/org/apache/hadoop/fs/test-untar.tgz</exclude> <exclude>src/test/java/org/apache/hadoop/fs/test-untar.tgz</exclude>
<exclude>src/test/resources/test.har/_SUCCESS</exclude>
<exclude>src/test/resources/test.har/_index</exclude>
<exclude>src/test/resources/test.har/_masterindex</exclude>
<exclude>src/test/resources/test.har/part-0</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -17,20 +17,6 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.HashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -40,6 +26,14 @@
import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.*;
/** /**
* This is an implementation of the Hadoop Archive * This is an implementation of the Hadoop Archive
* Filesystem. This archive Filesystem has index files * Filesystem. This archive Filesystem has index files
@ -53,7 +47,7 @@
* index for ranges of hashcodes. * index for ranges of hashcodes.
*/ */
public class HarFileSystem extends FilterFileSystem { public class HarFileSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(HarFileSystem.class); private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
@ -75,11 +69,13 @@ public class HarFileSystem extends FilterFileSystem {
// pointer into the static metadata cache // pointer into the static metadata cache
private HarMetaData metadata; private HarMetaData metadata;
private FileSystem fs;
/** /**
* public construction of harfilesystem * public construction of harfilesystem
*
*/ */
public HarFileSystem() { public HarFileSystem() {
// Must call #initialize() method to set the underlying file system
} }
/** /**
@ -96,10 +92,11 @@ public String getScheme() {
/** /**
* Constructor to create a HarFileSystem with an * Constructor to create a HarFileSystem with an
* underlying filesystem. * underlying filesystem.
* @param fs * @param fs underlying file system
*/ */
public HarFileSystem(FileSystem fs) { public HarFileSystem(FileSystem fs) {
super(fs); this.fs = fs;
this.statistics = fs.statistics;
} }
private synchronized void initializeMetadataCache(Configuration conf) { private synchronized void initializeMetadataCache(Configuration conf) {
@ -171,6 +168,11 @@ public void initialize(URI name, Configuration conf) throws IOException {
} }
} }
@Override
public Configuration getConf() {
return fs.getConf();
}
// get the version of the filesystem from the masterindex file // get the version of the filesystem from the masterindex file
// the version is currently not useful since its the first version // the version is currently not useful since its the first version
// of archives // of archives
@ -236,8 +238,7 @@ private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
throw new IOException("query component in Path not supported " + rawURI); throw new IOException("query component in Path not supported " + rawURI);
} }
URI tmp = null; URI tmp;
try { try {
// convert <scheme>-<host> to <scheme>://<host> // convert <scheme>-<host> to <scheme>://<host>
URI baseUri = new URI(authority.replaceFirst("-", "://")); URI baseUri = new URI(authority.replaceFirst("-", "://"));
@ -256,7 +257,7 @@ private static String decodeString(String str)
return URLDecoder.decode(str, "UTF-8"); return URLDecoder.decode(str, "UTF-8");
} }
private String decodeFileName(String fname) private String decodeFileName(String fname)
throws UnsupportedEncodingException { throws UnsupportedEncodingException {
int version = metadata.getVersion(); int version = metadata.getVersion();
if (version == 2 || version == 3){ if (version == 2 || version == 3){
@ -276,7 +277,7 @@ public Path getWorkingDirectory() {
/** /**
* Create a har specific auth * Create a har specific auth
* har-underlyingfs:port * har-underlyingfs:port
* @param underLyingURI the uri of underlying * @param underLyingUri the uri of underlying
* filesystem * filesystem
* @return har specific auth * @return har specific auth
*/ */
@ -294,7 +295,12 @@ private String getHarAuth(URI underLyingUri) {
} }
return auth; return auth;
} }
@Override
protected URI getCanonicalUri() {
return fs.canonicalizeUri(getUri());
}
/** /**
* Returns the uri of this filesystem. * Returns the uri of this filesystem.
* The uri is of the form * The uri is of the form
@ -419,7 +425,7 @@ static BlockLocation[] fixBlockLocations(BlockLocation[] locations,
/** /**
* Get block locations from the underlying fs and fix their * Get block locations from the underlying fs and fix their
* offsets and lengths. * offsets and lengths.
* @param file the input filestatus to get block locations * @param file the input file status to get block locations
* @param start the start of the desired range in the contained file * @param start the start of the desired range in the contained file
* @param len the length of the desired range * @param len the length of the desired range
* @return block locations for this segment of file * @return block locations for this segment of file
@ -441,8 +447,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
} }
/** /**
* the hash of the path p inside iniside * the hash of the path p inside the filesystem
* the filesystem
* @param p the path in the harfilesystem * @param p the path in the harfilesystem
* @return the hash code of the path. * @return the hash code of the path.
*/ */
@ -475,13 +480,9 @@ public Store(long begin, long end, int startHash, int endHash) {
* the parent path directory * the parent path directory
* @param statuses * @param statuses
* the list to add the children filestatuses to * the list to add the children filestatuses to
* @param children
* the string list of children for this parent
* @param archiveIndexStat
* the archive index filestatus
*/ */
private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses, private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
List<String> children) throws IOException { throws IOException {
String parentString = parent.getName(); String parentString = parent.getName();
if (!parentString.endsWith(Path.SEPARATOR)){ if (!parentString.endsWith(Path.SEPARATOR)){
parentString += Path.SEPARATOR; parentString += Path.SEPARATOR;
@ -547,7 +548,7 @@ private FileStatus toFileStatus(HarStatus h,
// stored in a single line in the index files // stored in a single line in the index files
// the format is of the form // the format is of the form
// filename "dir"/"file" partFileName startIndex length // filename "dir"/"file" partFileName startIndex length
// <space seperated children> // <space separated children>
private class HarStatus { private class HarStatus {
boolean isDir; boolean isDir;
String name; String name;
@ -666,7 +667,6 @@ public FileChecksum getFileChecksum(Path f) {
public FSDataInputStream open(Path f, int bufferSize) throws IOException { public FSDataInputStream open(Path f, int bufferSize) throws IOException {
// get the fs DataInputStream for the underlying file // get the fs DataInputStream for the underlying file
HarStatus hstatus = getFileHarStatus(f); HarStatus hstatus = getFileHarStatus(f);
// we got it.. woo hooo!!!
if (hstatus.isDir()) { if (hstatus.isDir()) {
throw new FileNotFoundException(f + " : not a file in " + throw new FileNotFoundException(f + " : not a file in " +
archivePath); archivePath);
@ -686,7 +686,12 @@ public FSDataOutputStream create(Path f,
Progressable progress) throws IOException { Progressable progress) throws IOException {
throw new IOException("Har: create not allowed."); throw new IOException("Har: create not allowed.");
} }
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
throw new IOException("Har: append not allowed.");
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (fs != null) { if (fs != null) {
@ -704,9 +709,19 @@ public void close() throws IOException {
*/ */
@Override @Override
public boolean setReplication(Path src, short replication) throws IOException{ public boolean setReplication(Path src, short replication) throws IOException{
throw new IOException("Har: setreplication not allowed"); throw new IOException("Har: setReplication not allowed");
} }
@Override
public boolean rename(Path src, Path dst) throws IOException {
throw new IOException("Har: rename not allowed");
}
@Override
public FSDataOutputStream append(Path f) throws IOException {
throw new IOException("Har: append not allowed");
}
/** /**
* Not implemented. * Not implemented.
*/ */
@ -714,7 +729,7 @@ public boolean setReplication(Path src, short replication) throws IOException{
public boolean delete(Path f, boolean recursive) throws IOException { public boolean delete(Path f, boolean recursive) throws IOException {
throw new IOException("Har: delete not allowed"); throw new IOException("Har: delete not allowed");
} }
/** /**
* liststatus returns the children of a directory * liststatus returns the children of a directory
* after looking up the index files. * after looking up the index files.
@ -733,7 +748,7 @@ public FileStatus[] listStatus(Path f) throws IOException {
throw new FileNotFoundException("File " + f + " not found in " + archivePath); throw new FileNotFoundException("File " + f + " not found in " + archivePath);
} }
if (hstatus.isDir()) { if (hstatus.isDir()) {
fileStatusesInIndex(hstatus, statuses, hstatus.children); fileStatusesInIndex(hstatus, statuses);
} else { } else {
statuses.add(toFileStatus(hstatus, null)); statuses.add(toFileStatus(hstatus, null));
} }
@ -748,7 +763,7 @@ public FileStatus[] listStatus(Path f) throws IOException {
public Path getHomeDirectory() { public Path getHomeDirectory() {
return new Path(uri.toString()); return new Path(uri.toString());
} }
@Override @Override
public void setWorkingDirectory(Path newDir) { public void setWorkingDirectory(Path newDir) {
//does nothing. //does nothing.
@ -811,7 +826,7 @@ public void setOwner(Path p, String username, String groupname)
* Not implemented. * Not implemented.
*/ */
@Override @Override
public void setPermission(Path p, FsPermission permisssion) public void setPermission(Path p, FsPermission permission)
throws IOException { throws IOException {
throw new IOException("Har: setPermission not allowed"); throw new IOException("Har: setPermission not allowed");
} }
@ -900,7 +915,7 @@ public synchronized int read(byte[] b, int offset, int len)
newlen = (int) (end - position); newlen = (int) (end - position);
} }
// end case // end case
if (newlen == 0) if (newlen == 0)
return ret; return ret;
ret = underLyingStream.read(b, offset, newlen); ret = underLyingStream.read(b, offset, newlen);
position += ret; position += ret;
@ -937,8 +952,8 @@ public synchronized void seek(long pos) throws IOException {
@Override @Override
public boolean seekToNewSource(long targetPos) throws IOException { public boolean seekToNewSource(long targetPos) throws IOException {
//do not need to implement this // do not need to implement this
// hdfs in itself does seektonewsource // hdfs in itself does seektonewsource
// while reading. // while reading.
return false; return false;
} }
@ -974,14 +989,12 @@ public void readFully(long pos, byte[] b) throws IOException {
} }
@Override @Override
public void setReadahead(Long readahead) public void setReadahead(Long readahead) throws IOException {
throws IOException, UnsupportedEncodingException {
underLyingStream.setReadahead(readahead); underLyingStream.setReadahead(readahead);
} }
@Override @Override
public void setDropBehind(Boolean dropBehind) public void setDropBehind(Boolean dropBehind) throws IOException {
throws IOException, UnsupportedEncodingException {
underLyingStream.setDropBehind(dropBehind); underLyingStream.setDropBehind(dropBehind);
} }
} }
@ -999,19 +1012,6 @@ public HarFSDataInputStream(FileSystem fs, Path p, long start,
long length, int bufsize) throws IOException { long length, int bufsize) throws IOException {
super(new HarFsInputStream(fs, p, start, length, bufsize)); super(new HarFsInputStream(fs, p, start, length, bufsize));
} }
/**
* constructor for har input stream.
* @param fs the underlying filesystem
* @param p the path in the underlying file system
* @param start the start position in the part file
* @param length the length of valid data in the part file.
* @throws IOException
*/
public HarFSDataInputStream(FileSystem fs, Path p, long start, long length)
throws IOException {
super(new HarFsInputStream(fs, p, start, length, 0));
}
} }
private class HarMetaData { private class HarMetaData {
@ -1058,7 +1058,7 @@ private int getVersion() {
} }
private void parseMetaData() throws IOException { private void parseMetaData() throws IOException {
Text line; Text line = new Text();
long read; long read;
FSDataInputStream in = null; FSDataInputStream in = null;
LineReader lin = null; LineReader lin = null;
@ -1068,7 +1068,6 @@ private void parseMetaData() throws IOException {
FileStatus masterStat = fs.getFileStatus(masterIndexPath); FileStatus masterStat = fs.getFileStatus(masterIndexPath);
masterIndexTimestamp = masterStat.getModificationTime(); masterIndexTimestamp = masterStat.getModificationTime();
lin = new LineReader(in, getConf()); lin = new LineReader(in, getConf());
line = new Text();
read = lin.readLine(line); read = lin.readLine(line);
// the first line contains the version of the index file // the first line contains the version of the index file
@ -1082,7 +1081,7 @@ private void parseMetaData() throws IOException {
} }
// each line contains a hashcode range and the index file name // each line contains a hashcode range and the index file name
String[] readStr = null; String[] readStr;
while(read < masterStat.getLen()) { while(read < masterStat.getLen()) {
int b = lin.readLine(line); int b = lin.readLine(line);
read += b; read += b;
@ -1094,6 +1093,9 @@ private void parseMetaData() throws IOException {
endHash)); endHash));
line.clear(); line.clear();
} }
} catch (IOException ioe) {
LOG.warn("Encountered exception ", ioe);
throw ioe;
} finally { } finally {
IOUtils.cleanup(LOG, lin, in); IOUtils.cleanup(LOG, lin, in);
} }

View File

@ -18,14 +18,6 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
@ -34,6 +26,14 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.*;
/** /**
* This test class checks basic operations with {@link HarFileSystem} including * This test class checks basic operations with {@link HarFileSystem} including
* various initialization cases, getters, and modification methods. * various initialization cases, getters, and modification methods.
@ -69,7 +69,7 @@ public class TestHarFileSystemBasics {
/* /*
* creates and returns fully initialized HarFileSystem * creates and returns fully initialized HarFileSystem
*/ */
private HarFileSystem createHarFileSysten(final Configuration conf) private HarFileSystem createHarFileSystem(final Configuration conf)
throws Exception { throws Exception {
localFileSystem = FileSystem.getLocal(conf); localFileSystem = FileSystem.getLocal(conf);
localFileSystem.initialize(new URI("file:///"), conf); localFileSystem.initialize(new URI("file:///"), conf);
@ -130,7 +130,7 @@ public void before() throws Exception {
} }
// create Har to test: // create Har to test:
conf = new Configuration(); conf = new Configuration();
harFileSystem = createHarFileSysten(conf); harFileSystem = createHarFileSystem(conf);
} }
@After @After
@ -232,6 +232,32 @@ public void testPositiveListFilesNotEndInColon() throws Exception {
assertTrue(p2.toUri().toString().startsWith("har://file-localhost/")); assertTrue(p2.toUri().toString().startsWith("har://file-localhost/"));
} }
@Test
public void testListLocatedStatus() throws Exception {
String testHarPath = this.getClass().getResource("/test.har").getPath();
URI uri = new URI("har://" + testHarPath);
HarFileSystem hfs = new HarFileSystem(localFileSystem);
hfs.initialize(uri, new Configuration());
// test.har has the following contents:
// dir1/1.txt
// dir1/2.txt
Set<String> expectedFileNames = new HashSet<String>();
expectedFileNames.add("1.txt");
expectedFileNames.add("2.txt");
// List contents of dir, and ensure we find all expected files
Path path = new Path("dir1");
RemoteIterator<LocatedFileStatus> fileList = hfs.listLocatedStatus(path);
while (fileList.hasNext()) {
String fileName = fileList.next().getPath().getName();
assertTrue(fileName + " not in expected files list", expectedFileNames.contains(fileName));
expectedFileNames.remove(fileName);
}
assertEquals("Didn't find all of the expected file names: " + expectedFileNames,
0, expectedFileNames.size());
}
// ========== Negative: // ========== Negative:
@Test @Test

View File

@ -0,0 +1,4 @@
%2F dir 1380270822000+511+root+wheel 0 0 dir1
%2Fdir1 dir 1380270441000+493+jdere+wheel 0 0 1.txt 2.txt
%2Fdir1%2F1.txt file part-0 0 0 1380270439000+420+jdere+wheel
%2Fdir1%2F2.txt file part-0 0 0 1380270441000+420+jdere+wheel

View File

@ -0,0 +1,2 @@
3
0 1210114968 0 232