diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ee2485766d..a6475b18da 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -273,6 +273,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol interface implementation. (Rohith via jlowe) + MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output + directory. (gera) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java index 7e679343b4..e8b65038e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java @@ -289,10 +289,6 @@ public int run(String[] args) } setNumberOfRows(job, parseHumanLong(args[0])); Path outputDir = new Path(args[1]); - if (outputDir.getFileSystem(getConf()).exists(outputDir)) { - throw new IOException("Output directory " + outputDir + - " already exists."); - } FileOutputFormat.setOutputPath(job, outputDir); job.setJobName("TeraGen"); job.setJarByClass(TeraGen.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java index 872e71917b..867f33ef7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java @@ -20,10 +20,13 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -87,9 +90,31 @@ public void checkOutputSpecs(JobContext job throw new InvalidJobConfException("Output directory not set in JobConf."); } + final Configuration jobConf = job.getConfiguration(); + // get delegation token for outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(), - new Path[] { outDir }, job.getConfiguration()); + new Path[] { outDir }, jobConf); + + final FileSystem fs = outDir.getFileSystem(jobConf); + + if (fs.exists(outDir)) { + // existing output dir is considered empty iff its only content is the + // partition file. + // + final FileStatus[] outDirKids = fs.listStatus(outDir); + boolean empty = false; + if (outDirKids != null && outDirKids.length == 1) { + final FileStatus st = outDirKids[0]; + final String fname = st.getPath().getName(); + empty = + !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname); + } + if (TeraSort.getUseSimplePartitioner(job) || !empty) { + throw new FileAlreadyExistsException("Output directory " + outDir + + " already exists"); + } + } } public RecordWriter getRecordWriter(TaskAttemptContext job diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java index 4a11c9a331..1956872b7e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java @@ -20,17 +20,19 @@ import java.io.File; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.HadoopTestCase; import org.apache.hadoop.util.ToolRunner; -import org.junit.Ignore; -@Ignore public class TestTeraSort extends HadoopTestCase { + private static Log LOG = LogFactory.getLog(TestTeraSort.class); public TestTeraSort() throws IOException { - super(CLUSTER_MR, DFS_FS, 1, 1); + super(LOCAL_MR, LOCAL_FS, 1, 1); } protected void tearDown() throws Exception { @@ -45,42 +47,58 @@ protected void tearDown() throws Exception { private static final Path SORT_INPUT_PATH = new Path(TEST_DIR, "sortin"); private static final Path SORT_OUTPUT_PATH = new Path(TEST_DIR, "sortout"); private static final Path TERA_OUTPUT_PATH = new Path(TEST_DIR, "validate"); - private static final String NUM_ROWS = "100"; + private static final String NUM_ROWS = "100"; - private void runTeraGen(Configuration conf, Path sortInput) + private void runTeraGen(Configuration conf, Path sortInput) throws Exception { String[] genArgs = {NUM_ROWS, sortInput.toString()}; - + // Run TeraGen assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0); } - + private void runTeraSort(Configuration conf, Path sortInput, Path sortOutput) throws Exception { // Setup command-line arguments to 'sort' String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; - + // Run Sort assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0); } - - private void runTeraValidator(Configuration job, - Path sortOutput, Path valOutput) + + private void runTeraValidator(Configuration job, + Path sortOutput, Path valOutput) throws Exception { String[] svArgs = {sortOutput.toString(), valOutput.toString()}; // Run Tera-Validator assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0); } - + public void testTeraSort() throws Exception { // Run TeraGen to generate input for 'terasort' runTeraGen(createJobConf(), SORT_INPUT_PATH); + // Run teragen again to check for FAE + try { + runTeraGen(createJobConf(), SORT_INPUT_PATH); + fail("Teragen output overwritten!"); + } catch (FileAlreadyExistsException fae) { + LOG.info("Expected exception: ", fae); + } + // Run terasort runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH); + // Run terasort again to check for FAE + try { + runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH); + fail("Terasort output overwritten!"); + } catch (FileAlreadyExistsException fae) { + LOG.info("Expected exception: ", fae); + } + // Run tera-validator to check if sort worked correctly runTeraValidator(createJobConf(), SORT_OUTPUT_PATH, TERA_OUTPUT_PATH);