diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index b0ec979648..fe43991a0e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -78,10 +78,13 @@ public enum Counter {
   public static final String NUM_INPUT_FILES =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
-  
+
   public static final String INPUT_DIR_RECURSIVE =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
 
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS;
+
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
@@ -319,16 +322,24 @@ protected FileSplit makeSplit(Path file, long start, long length,
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     StopWatch sw = new StopWatch().start();
-    FileStatus[] files = listStatus(job);
-    
+    FileStatus[] stats = listStatus(job);
+
     // Save the number of input files for metrics/loadgen
-    job.setLong(NUM_INPUT_FILES, files.length);
+    job.setLong(NUM_INPUT_FILES, stats.length);
     long totalSize = 0;                           // compute total size
-    for (FileStatus file: files) {                // check we have valid files
+    boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
+        && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
+
+    List<FileStatus> files = new ArrayList<>(stats.length);
+    for (FileStatus file: stats) {                // check we have valid files
       if (file.isDirectory()) {
-        throw new IOException("Not a file: "+ file.getPath());
+        if (!ignoreDirs) {
+          throw new IOException("Not a file: "+ file.getPath());
+        }
+      } else {
+        files.add(file);
+        totalSize += file.getLen();
       }
-      totalSize += file.getLen();
     }
 
     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 9868e8ecde..e2d8e6fa7c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -76,6 +76,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
     "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+      "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
   public static final String LIST_STATUS_NUM_THREADS =
     "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
@@ -392,7 +394,13 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
     List<FileStatus> files = listStatus(job);
+
+    boolean ignoreDirs = !getInputDirRecursive(job)
+        && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
     for (FileStatus file: files) {
+      if (ignoreDirs && file.isDirectory()) {
+        continue;
+      }
       Path path = file.getPath();
       long length = file.getLen();
       if (length != 0) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
index d322011068..879cd3df32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -102,7 +102,22 @@ public void testListLocatedStatus() throws Exception {
         1, mockFs.numListLocatedStatusCalls);
     FileSystem.closeAll();
   }
-  
+
+  @Test
+  public void testIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1");
+    MockFileSystem mockFs = (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+    Assert.assertEquals("Input splits are not correct", 1, splits.length);
+    FileSystem.closeAll();
+  }
+
   @Test
   public void testSplitLocationInfo() throws Exception {
     Configuration conf = getConfiguration();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index 4c847fa293..3897a9b2b3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -123,6 +123,18 @@ public void testNumInputFilesWithoutRecursively() throws Exception {
     verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
   }
 
+  @Test
+  public void testNumInputFilesIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 1, splits.size());
+    verifySplits(Lists.newArrayList("test:/a1/file1"), splits);
+  }
+
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
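Usage note (not part of the patch): a minimal, hypothetical driver sketch of how the new setting could be enabled against the new-API FileInputFormat. Only the INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS constant and its semantics (skip subdirectories under a non-recursive input dir instead of failing getSplits() with "Not a file: ...") come from the change above; the example class name, job wiring, and paths are assumptions for illustration.

// Hypothetical driver; only the config constant and its behavior are from the patch above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IgnoreSubDirsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Recursion stays off (the default); with this flag, subdirectories found under the
    // input directory are silently skipped rather than aborting split calculation.
    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);

    Job job = Job.getInstance(conf, "ignore-subdirs-example");
    job.setJarByClass(IgnoreSubDirsExample.class);
    job.setMapperClass(Mapper.class);          // identity mapper, map-only job
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. a dir that contains subdirs
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The same behavior can be toggled per job from the command line (when the driver goes through GenericOptionsParser/ToolRunner) with -Dmapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs=true.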