diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 730f0bd1ce..17ec69020a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -152,6 +152,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
     task-state (Ashwin Shankar via jlowe)
 
+    MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
+    input path dir (Devaraj K via jlowe)
+
 Release 2.2.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1187,6 +1190,9 @@ Release 0.23.10 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
+    input path dir (Devaraj K via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index a68db672bf..1c9ae3b79e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -69,6 +69,10 @@ public static enum Counter {
 
   public static final String NUM_INPUT_FILES =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
+
+  public static final String INPUT_DIR_RECURSIVE =
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
+
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
@@ -192,7 +196,7 @@ protected FileStatus[] listStatus(JobConf job) throws IOException {
     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
 
     // Whether we need to recursively look into the directory structure
-    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
+    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
 
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();
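Note: with the constant now re-exported on the old org.apache.hadoop.mapred API, a job written against that API can enable recursive listing without hard-coding the key string. A minimal sketch, not part of the patch itself; the class name and the input path /data/logs are illustrative:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class OldApiRecursiveInput {
  public static void main(String[] args) {
    JobConf conf = new JobConf();

    // Same effect as conf.setBoolean("mapred.input.dir.recursive", true),
    // but without hard-coding the key string.
    conf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
    FileInputFormat.setInputPaths(conf, new Path("/data/logs"));

    // listStatus(conf) will now descend into subdirectories of /data/logs
    // instead of stopping at the first level.
  }
}
```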
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 56c2cdcc87..457e2c4838 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -64,6 +64,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     "mapreduce.input.pathFilter.class";
   public static final String NUM_INPUT_FILES =
     "mapreduce.input.fileinputformat.numinputfiles";
+  public static final String INPUT_DIR_RECURSIVE =
+    "mapreduce.input.fileinputformat.input.dir.recursive";
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -102,6 +104,27 @@ public boolean accept(Path path) {
       return true;
     }
   }
+
+  /**
+   * @param job
+   *          the job to modify
+   * @param inputDirRecursive true if input directories should be read recursively
+   */
+  public static void setInputDirRecursive(Job job,
+      boolean inputDirRecursive) {
+    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
+        inputDirRecursive);
+  }
+
+  /**
+   * @param job
+   *          the job to look at.
+   * @return true if the files should be read recursively
+   */
+  public static boolean getInputDirRecursive(JobContext job) {
+    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
+        false);
+  }
 
   /**
    * Get the lower bound on split size imposed by the format.
@@ -210,6 +233,9 @@ protected List<FileStatus> listStatus(JobContext job
     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                         job.getConfiguration());
 
+    // Whether we need to recursively look into the directory structure
+    boolean recursive = getInputDirRecursive(job);
+
     List<IOException> errors = new ArrayList<IOException>();
 
     // creates a MultiPathFilter with the hiddenFileFilter and the
@@ -235,7 +261,11 @@ protected List<FileStatus> listStatus(JobContext job
       if (globStat.isDirectory()) {
         for(FileStatus stat: fs.listStatus(globStat.getPath(),
             inputFilter)) {
-          result.add(stat);
+          if (recursive && stat.isDirectory()) {
+            addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+          } else {
+            result.add(stat);
+          }
         }
       } else {
         result.add(globStat);
@@ -251,6 +281,31 @@ protected List<FileStatus> listStatus(JobContext job
     return result;
   }
 
+  /**
+   * Add files in the input path recursively into the results.
+   * @param result
+   *          The List to store all files.
+   * @param fs
+   *          The FileSystem.
+   * @param path
+   *          The input path.
+   * @param inputFilter
+   *          The input filter that can be used to filter files/dirs.
+   * @throws IOException
+   */
+  protected void addInputPathRecursively(List<FileStatus> result,
+      FileSystem fs, Path path, PathFilter inputFilter)
+      throws IOException {
+    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
+      if (stat.isDirectory()) {
+        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }
+  }
+
 
   /**
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
index be25a8f328..424fce3e5c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
@@ -528,6 +529,8 @@ private static void addDeprecatedKeys() {
       MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
     Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
         MRJobConfig.SPLIT_METAINFO_MAXSIZE);
+    Configuration.addDeprecation("mapred.input.dir.recursive",
+        FileInputFormat.INPUT_DIR_RECURSIVE);
   }
 
   public static void main(String[] args) {
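Note: for jobs written against the new org.apache.hadoop.mapreduce API, the typed setter added above is the preferred way to flip the switch, and the ConfigUtil deprecation keeps configurations that still set the old mapred.input.dir.recursive key working. A minimal sketch, with an illustrative class name and input path:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class NewApiRecursiveInput {
  public static void main(String[] args) throws IOException {
    Job job = Job.getInstance(new Configuration());

    // Preferred over setting the key by hand: the typed setter added
    // by this patch. It writes
    // mapreduce.input.fileinputformat.input.dir.recursive.
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path("/data/logs"));

    // Reads the same key back; prints true.
    System.out.println(FileInputFormat.getInputDirRecursive(job));
  }
}
```

The deprecated-key path is exercised by the new unit test below.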
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
new file mode 100644
index 0000000000..e964782914
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class TestFileInputFormat {
+
+  @Test
+  public void testNumInputFilesRecursively() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 3, splits.size());
+    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
+        .toString());
+
+    // Using the deprecated configuration
+    conf = getConfiguration();
+    conf.set("mapred.input.dir.recursive", "true");
+    job = Job.getInstance(conf);
+    splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 3, splits.size());
+    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
+        .toString());
+  }
+
+  @Test
+  public void testNumInputFilesWithoutRecursively() throws Exception {
+    Configuration conf = getConfiguration();
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
+        .toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
+        .toString());
+  }
+
+  private Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set("fs.test.impl.disable.cache", "true");
+    conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1");
+    return conf;
+  }
+
+  static class MockFileSystem extends RawLocalFileSystem {
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+      if (f.toString().equals("test:/a1")) {
+        return new FileStatus[] {
+            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
+      } else if (f.toString().equals("test:/a1/a2")) {
+        return new FileStatus[] {
+            new FileStatus(10, false, 1, 150, 150,
+                new Path("test:/a1/a2/file2")),
+            new FileStatus(10, false, 1, 151, 150,
+                new Path("test:/a1/a2/file3")) };
+      }
+      return new FileStatus[0];
+    }
+
+    @Override
+    public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+        throws IOException {
+      return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
+          pathPattern) };
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f, PathFilter filter)
+        throws FileNotFoundException, IOException {
+      return this.listStatus(f);
+    }
+  }
+}
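For orientation, the directory tree that MockFileSystem fakes under the test-only test: scheme is:

```
test:/a1          <- the configured input directory
├── a2            <- subdirectory
│   ├── file2
│   └── file3
└── file1
```

With recursion on, listStatus flattens this to the three files, hence the 3 expected splits in testNumInputFilesRecursively; with it off, listing stops one level down at test:/a1/a2 and test:/a1/file1, hence the 2 splits in testNumInputFilesWithoutRecursively.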
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
index 6b0406f028..0e131e1074 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -190,7 +190,7 @@ public void testMultiLevelInput() throws Exception {
         + "directory with directories inside.", exceptionThrown);
 
     // Enable multi-level/recursive inputs
-    job.setBoolean("mapred.input.dir.recursive", true);
+    job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
     InputSplit[] splits = inFormat.getSplits(job, 1);
     assertEquals(splits.length, 2);
   }
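Note: because the new key is an ordinary Configuration property, a driver that goes through ToolRunner picks it up from the command line (-Dmapreduce.input.fileinputformat.input.dir.recursive=true) with no code change. A hedged sketch of such a driver, not part of the patch; the class name and argument layout are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RecursiveInputDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already contains any -D options parsed by ToolRunner,
    // so recursion can be toggled per run from the command line.
    Job job = Job.getInstance(getConf(), "recursive-input-demo");
    job.setJarByClass(RecursiveInputDriver.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(
        ToolRunner.run(new Configuration(), new RecursiveInputDriver(), args));
  }
}
```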