From 16e21dfe92d61deb539c78bc2e43e04d7b474f88 Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Tue, 30 Oct 2012 05:41:59 +0000
Subject: [PATCH] MAPREDUCE-1806. CombineFileInputFormat does not work with
 paths not on default FS. (Gera Shegalov via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1403614 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  2 ++
 .../lib/input/CombineFileInputFormat.java     |  3 +-
 .../lib/input/TestCombineFileInputFormat.java | 36 ++++++++++++++++++-
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f0c4d25398..d3d3b4398d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -192,6 +192,8 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
     UNASSIGNED states. (Mayank Bansal via sseth)
 
+    MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index b62c2fb0aa..984c9cc2dd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -215,7 +215,8 @@ public List<InputSplit> getSplits(JobContext job)
     // times, one time each for each pool in the next loop.
     List<Path> newpaths = new LinkedList<Path>();
     for (int i = 0; i < paths.length; i++) {
-      Path p = new Path(paths[i].toUri().getPath());
+      FileSystem fs = paths[i].getFileSystem(conf);
+      Path p = fs.makeQualified(paths[i]);
       newpaths.add(p);
     }
     paths = null;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index df08c539da..889443a84c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -76,6 +76,8 @@ public class TestCombineFileInputFormat extends TestCase {
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
 
+  private static final String DUMMY_FS_URI = "dummyfs:///";
+
   /** Dummy class to extend CombineFileInputFormat*/
   private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
     @Override
@@ -1145,6 +1147,38 @@ public void testForEmptyFile() throws Exception {
     fileSys.delete(file.getParent(), true);
   }
 
+  /**
+   * Test when input files are from non-default file systems
+   */
+  @Test
+  public void testForNonDefaultFileSystem() throws Throwable {
+    Configuration conf = new Configuration();
+
+    // use a fake file system scheme as default
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
+
+    // default fs path
+    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
+    // add a local file
+    Path localPath = new Path("testFile1");
+    FileSystem lfs = FileSystem.getLocal(conf);
+    FSDataOutputStream dos = lfs.create(localPath);
+    dos.writeChars("Local file for CFIF");
+    dos.close();
+
+    Job job = Job.getInstance(conf);
+    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
+    DummyInputFormat inFormat = new DummyInputFormat();
+    List<InputSplit> splits = inFormat.getSplits(job);
+    assertTrue(splits.size() > 0);
+    for (InputSplit s : splits) {
+      CombineFileSplit cfs = (CombineFileSplit)s;
+      for (Path p : cfs.getPaths()) {
+        assertEquals(p.toUri().getScheme(), "file");
+      }
+    }
+  }
+
   static class TestFilter implements PathFilter {
     private Path p;
 
@@ -1156,7 +1190,7 @@ public TestFilter(Path p) {
     // returns true if the specified path matches the prefix stored
     // in this TestFilter.
     public boolean accept(Path path) {
-      if (path.toString().indexOf(p.toString()) == 0) {
+      if (path.toUri().getPath().indexOf(p.toString()) == 0) {
        return true;
      }
      return false;
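Note (not part of the patch): the core change is the one-line fix in CombineFileInputFormat.getSplits. Rebuilding each input path from toUri().getPath() keeps only the path component and drops the scheme and authority, so an input on a non-default file system is later resolved against fs.defaultFS; qualifying the path against its own FileSystem preserves the scheme. The sketch below is a hypothetical standalone illustration (class name and example path are made up); only the Path/FileSystem calls mirror the patch.

// PathQualifyDemo.java -- hypothetical, illustration only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathQualifyDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // An input path that lives on an explicitly named (non-default) file system.
    Path input = new Path("file:/tmp/data/part-00000");

    // Old behavior: toUri().getPath() keeps only the path component and
    // drops the "file" scheme, so the path would later be resolved
    // against fs.defaultFS (e.g. an HDFS namenode).
    Path stripped = new Path(input.toUri().getPath());

    // New behavior: qualify the path against its own file system,
    // which preserves the scheme and authority.
    FileSystem fs = input.getFileSystem(conf);
    Path qualified = fs.makeQualified(input);

    System.out.println("stripped  = " + stripped);   // /tmp/data/part-00000
    System.out.println("qualified = " + qualified);  // file:/tmp/data/part-00000
  }
}

This is also what the new testForNonDefaultFileSystem test asserts: even with fs.defaultFS pointed at a dummy scheme, every path in the resulting CombineFileSplits keeps its original "file" scheme.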