MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1403614 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ba7a2e3898
commit
16e21dfe92
@ -192,6 +192,8 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
|
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
|
||||||
UNASSIGNED states. (Mayank Bansal via sseth)
|
UNASSIGNED states. (Mayank Bansal via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -215,7 +215,8 @@ public List<InputSplit> getSplits(JobContext job)
|
|||||||
// times, one time each for each pool in the next loop.
|
// times, one time each for each pool in the next loop.
|
||||||
List<Path> newpaths = new LinkedList<Path>();
|
List<Path> newpaths = new LinkedList<Path>();
|
||||||
for (int i = 0; i < paths.length; i++) {
|
for (int i = 0; i < paths.length; i++) {
|
||||||
Path p = new Path(paths[i].toUri().getPath());
|
FileSystem fs = paths[i].getFileSystem(conf);
|
||||||
|
Path p = fs.makeQualified(paths[i]);
|
||||||
newpaths.add(p);
|
newpaths.add(p);
|
||||||
}
|
}
|
||||||
paths = null;
|
paths = null;
|
||||||
|
@ -76,6 +76,8 @@ public class TestCombineFileInputFormat extends TestCase {
|
|||||||
static final int BLOCKSIZE = 1024;
|
static final int BLOCKSIZE = 1024;
|
||||||
static final byte[] databuf = new byte[BLOCKSIZE];
|
static final byte[] databuf = new byte[BLOCKSIZE];
|
||||||
|
|
||||||
|
private static final String DUMMY_FS_URI = "dummyfs:///";
|
||||||
|
|
||||||
/** Dummy class to extend CombineFileInputFormat*/
|
/** Dummy class to extend CombineFileInputFormat*/
|
||||||
private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
|
private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
|
||||||
@Override
|
@Override
|
||||||
@ -1145,6 +1147,38 @@ public void testForEmptyFile() throws Exception {
|
|||||||
fileSys.delete(file.getParent(), true);
|
fileSys.delete(file.getParent(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test when input files are from non-default file systems
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForNonDefaultFileSystem() throws Throwable {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
// use a fake file system scheme as default
|
||||||
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
|
||||||
|
|
||||||
|
// default fs path
|
||||||
|
assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
|
||||||
|
// add a local file
|
||||||
|
Path localPath = new Path("testFile1");
|
||||||
|
FileSystem lfs = FileSystem.getLocal(conf);
|
||||||
|
FSDataOutputStream dos = lfs.create(localPath);
|
||||||
|
dos.writeChars("Local file for CFIF");
|
||||||
|
dos.close();
|
||||||
|
|
||||||
|
Job job = Job.getInstance(conf);
|
||||||
|
FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
|
||||||
|
DummyInputFormat inFormat = new DummyInputFormat();
|
||||||
|
List<InputSplit> splits = inFormat.getSplits(job);
|
||||||
|
assertTrue(splits.size() > 0);
|
||||||
|
for (InputSplit s : splits) {
|
||||||
|
CombineFileSplit cfs = (CombineFileSplit)s;
|
||||||
|
for (Path p : cfs.getPaths()) {
|
||||||
|
assertEquals(p.toUri().getScheme(), "file");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class TestFilter implements PathFilter {
|
static class TestFilter implements PathFilter {
|
||||||
private Path p;
|
private Path p;
|
||||||
|
|
||||||
@ -1156,7 +1190,7 @@ public TestFilter(Path p) {
|
|||||||
// returns true if the specified path matches the prefix stored
|
// returns true if the specified path matches the prefix stored
|
||||||
// in this TestFilter.
|
// in this TestFilter.
|
||||||
public boolean accept(Path path) {
|
public boolean accept(Path path) {
|
||||||
if (path.toString().indexOf(p.toString()) == 0) {
|
if (path.toUri().getPath().indexOf(p.toString()) == 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
Loading…
Reference in New Issue
Block a user