MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output directory. (gera)

This commit is contained in:
Gera Shegalov 2014-12-13 17:48:42 -08:00
parent cbfb996fb4
commit 25a0440238
4 changed files with 59 additions and 17 deletions

View File

@ -273,6 +273,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol
interface implementation. (Rohith via jlowe) interface implementation. (Rohith via jlowe)
MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output
directory. (gera)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -289,10 +289,6 @@ public int run(String[] args)
} }
setNumberOfRows(job, parseHumanLong(args[0])); setNumberOfRows(job, parseHumanLong(args[0]));
Path outputDir = new Path(args[1]); Path outputDir = new Path(args[1]);
if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
throw new IOException("Output directory " + outputDir +
" already exists.");
}
FileOutputFormat.setOutputPath(job, outputDir); FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraGen"); job.setJobName("TeraGen");
job.setJarByClass(TeraGen.class); job.setJarByClass(TeraGen.class);

View File

@ -20,10 +20,13 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
@ -87,9 +90,31 @@ public void checkOutputSpecs(JobContext job
throw new InvalidJobConfException("Output directory not set in JobConf."); throw new InvalidJobConfException("Output directory not set in JobConf.");
} }
final Configuration jobConf = job.getConfiguration();
// get delegation token for outDir's file system // get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(), TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration()); new Path[] { outDir }, jobConf);
final FileSystem fs = outDir.getFileSystem(jobConf);
if (fs.exists(outDir)) {
// existing output dir is considered empty iff its only content is the
// partition file.
//
final FileStatus[] outDirKids = fs.listStatus(outDir);
boolean empty = false;
if (outDirKids != null && outDirKids.length == 1) {
final FileStatus st = outDirKids[0];
final String fname = st.getPath().getName();
empty =
!st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
}
if (TeraSort.getUseSimplePartitioner(job) || !empty) {
throw new FileAlreadyExistsException("Output directory " + outDir
+ " already exists");
}
}
} }
public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job

View File

@ -20,17 +20,19 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.HadoopTestCase; import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.Ignore;
@Ignore
public class TestTeraSort extends HadoopTestCase { public class TestTeraSort extends HadoopTestCase {
private static Log LOG = LogFactory.getLog(TestTeraSort.class);
public TestTeraSort() public TestTeraSort()
throws IOException { throws IOException {
super(CLUSTER_MR, DFS_FS, 1, 1); super(LOCAL_MR, LOCAL_FS, 1, 1);
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
@ -45,42 +47,58 @@ protected void tearDown() throws Exception {
private static final Path SORT_INPUT_PATH = new Path(TEST_DIR, "sortin"); private static final Path SORT_INPUT_PATH = new Path(TEST_DIR, "sortin");
private static final Path SORT_OUTPUT_PATH = new Path(TEST_DIR, "sortout"); private static final Path SORT_OUTPUT_PATH = new Path(TEST_DIR, "sortout");
private static final Path TERA_OUTPUT_PATH = new Path(TEST_DIR, "validate"); private static final Path TERA_OUTPUT_PATH = new Path(TEST_DIR, "validate");
private static final String NUM_ROWS = "100"; private static final String NUM_ROWS = "100";
private void runTeraGen(Configuration conf, Path sortInput) private void runTeraGen(Configuration conf, Path sortInput)
throws Exception { throws Exception {
String[] genArgs = {NUM_ROWS, sortInput.toString()}; String[] genArgs = {NUM_ROWS, sortInput.toString()};
// Run TeraGen // Run TeraGen
assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0); assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
} }
private void runTeraSort(Configuration conf, private void runTeraSort(Configuration conf,
Path sortInput, Path sortOutput) throws Exception { Path sortInput, Path sortOutput) throws Exception {
// Setup command-line arguments to 'sort' // Setup command-line arguments to 'sort'
String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
// Run Sort // Run Sort
assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0); assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
} }
private void runTeraValidator(Configuration job, private void runTeraValidator(Configuration job,
Path sortOutput, Path valOutput) Path sortOutput, Path valOutput)
throws Exception { throws Exception {
String[] svArgs = {sortOutput.toString(), valOutput.toString()}; String[] svArgs = {sortOutput.toString(), valOutput.toString()};
// Run Tera-Validator // Run Tera-Validator
assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0); assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
} }
public void testTeraSort() throws Exception { public void testTeraSort() throws Exception {
// Run TeraGen to generate input for 'terasort' // Run TeraGen to generate input for 'terasort'
runTeraGen(createJobConf(), SORT_INPUT_PATH); runTeraGen(createJobConf(), SORT_INPUT_PATH);
// Run teragen again to check for FAE
try {
runTeraGen(createJobConf(), SORT_INPUT_PATH);
fail("Teragen output overwritten!");
} catch (FileAlreadyExistsException fae) {
LOG.info("Expected exception: ", fae);
}
// Run terasort // Run terasort
runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH); runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
// Run terasort again to check for FAE
try {
runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
fail("Terasort output overwritten!");
} catch (FileAlreadyExistsException fae) {
LOG.info("Expected exception: ", fae);
}
// Run tera-validator to check if sort worked correctly // Run tera-validator to check if sort worked correctly
runTeraValidator(createJobConf(), SORT_OUTPUT_PATH, runTeraValidator(createJobConf(), SORT_OUTPUT_PATH,
TERA_OUTPUT_PATH); TERA_OUTPUT_PATH);