MAPREDUCE-3170. Fixed job output commit for deep hierarchies. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1183185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-14 01:16:30 +00:00
parent c46dbedaf9
commit c7fb49b3c5
5 changed files with 252 additions and 68 deletions

View File

@ -1608,6 +1608,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy) MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy)
MAPREDUCE-3170. Fixed job output commit for deep hierarchies. (Hitesh Shah
via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -71,27 +71,30 @@ public void commitJob(JobContext context) throws IOException {
//delete the task temp directory from the current jobtempdir //delete the task temp directory from the current jobtempdir
JobConf conf = context.getJobConf(); JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf); Path outputPath = FileOutputFormat.getOutputPath(conf);
FileSystem outputFileSystem = outputPath.getFileSystem(conf); if (outputPath != null) {
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + FileSystem outputFileSystem = outputPath.getFileSystem(conf);
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
if (fileSys.exists(tmpDir)) { FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
fileSys.delete(tmpDir, true); if (fileSys.exists(tmpDir)) {
} else { fileSys.delete(tmpDir, true);
LOG.warn("Task temp dir could not be deleted " + tmpDir); } else {
} LOG.warn("Task temp dir could not be deleted " + tmpDir);
}
//move the job output to final place
Path jobOutputPath = //move the job output to final place
new Path(outputPath, getJobAttemptBaseDirName(context)); Path jobOutputPath =
moveJobOutputs(outputFileSystem, outputPath, jobOutputPath); new Path(outputPath, getJobAttemptBaseDirName(context));
moveJobOutputs(outputFileSystem,
// delete the _temporary folder in the output folder jobOutputPath, outputPath, jobOutputPath);
cleanupJob(context);
// check if the output-dir marking is required // delete the _temporary folder in the output folder
if (shouldMarkOutputDir(context.getJobConf())) { cleanupJob(context);
// create a _success file in the output folder // check if the output-dir marking is required
markOutputDirSuccessful(context); if (shouldMarkOutputDir(context.getJobConf())) {
// create a _success file in the output folder
markOutputDirSuccessful(context);
}
} }
} }
@ -109,10 +112,14 @@ private void markOutputDirSuccessful(JobContext context) throws IOException {
} }
} }
private void moveJobOutputs(FileSystem fs, private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
Path finalOutputDir, Path jobOutput) throws IOException { Path finalOutputDir, Path jobOutput) throws IOException {
LOG.debug("Told to move job output from " + jobOutput
+ " to " + finalOutputDir +
" and orig job output path is " + origJobOutputPath);
if (fs.isFile(jobOutput)) { if (fs.isFile(jobOutput)) {
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); Path finalOutputPath =
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
if (!fs.rename(jobOutput, finalOutputPath)) { if (!fs.rename(jobOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) { if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of job"); throw new IOException("Failed to delete earlier output of job");
@ -121,18 +128,23 @@ private void moveJobOutputs(FileSystem fs,
throw new IOException("Failed to save output of job"); throw new IOException("Failed to save output of job");
} }
} }
LOG.debug("Moved " + jobOutput + " to " + finalOutputPath); LOG.debug("Moved job output file from " + jobOutput + " to " +
finalOutputPath);
} else if (fs.getFileStatus(jobOutput).isDirectory()) { } else if (fs.getFileStatus(jobOutput).isDirectory()) {
LOG.debug("Job output file " + jobOutput + " is a dir");
FileStatus[] paths = fs.listStatus(jobOutput); FileStatus[] paths = fs.listStatus(jobOutput);
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); Path finalOutputPath =
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
fs.mkdirs(finalOutputPath); fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along job output path " + finalOutputPath);
if (paths != null) { if (paths != null) {
for (FileStatus path : paths) { for (FileStatus path : paths) {
moveJobOutputs(fs, finalOutputDir, path.getPath()); moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
} }
} }
} }
} }
@Override @Override
@Deprecated @Deprecated
public void cleanupJob(JobContext context) throws IOException { public void cleanupJob(JobContext context) throws IOException {
@ -199,8 +211,10 @@ private void moveTaskOutputs(TaskAttemptContext context,
throws IOException { throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID(); TaskAttemptID attemptId = context.getTaskAttemptID();
context.getProgressible().progress(); context.getProgressible().progress();
LOG.debug("Told to move taskoutput from " + taskOutput
+ " to " + jobOutputDir);
if (fs.isFile(taskOutput)) { if (fs.isFile(taskOutput)) {
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context)); getTempTaskOutputPath(context));
if (!fs.rename(taskOutput, finalOutputPath)) { if (!fs.rename(taskOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) { if (!fs.delete(finalOutputPath, true)) {
@ -214,10 +228,12 @@ private void moveTaskOutputs(TaskAttemptContext context,
} }
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.getFileStatus(taskOutput).isDirectory()) { } else if(fs.getFileStatus(taskOutput).isDirectory()) {
LOG.debug("Taskoutput " + taskOutput + " is a dir");
FileStatus[] paths = fs.listStatus(taskOutput); FileStatus[] paths = fs.listStatus(taskOutput);
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context)); getTempTaskOutputPath(context));
fs.mkdirs(finalOutputPath); fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along path " + finalOutputPath);
if (paths != null) { if (paths != null) {
for (FileStatus path : paths) { for (FileStatus path : paths) {
moveTaskOutputs(context, fs, jobOutputDir, path.getPath()); moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@ -235,14 +251,16 @@ public void abortTask(TaskAttemptContext context) throws IOException {
} }
} }
private Path getFinalPath(Path jobOutputDir, Path taskOutput, @SuppressWarnings("deprecation")
private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
Path taskOutputPath) throws IOException { Path taskOutputPath) throws IOException {
URI taskOutputUri = taskOutput.toUri(); URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
if (taskOutputUri == relativePath) { if (taskOutputUri == relativePath) {
//taskOutputPath is not a parent of taskOutput //taskOutputPath is not a parent of taskOutput
throw new IOException("Can not get the relative path: base = " + throw new IOException("Can not get the relative path: base = " +
taskOutputPath + " child = " + taskOutput); taskOutputPathUri + " child = " + taskOutputUri);
} }
if (relativePath.getPath().length() > 0) { if (relativePath.getPath().length() > 0) {
return new Path(jobOutputDir, relativePath.getPath()); return new Path(jobOutputDir, relativePath.getPath());
@ -325,7 +343,10 @@ public void recoverTask(TaskAttemptContext context)
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt)); new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
if (outputFileSystem.exists(pathToRecover)) { if (outputFileSystem.exists(pathToRecover)) {
// Move the task outputs to their final place // Move the task outputs to their final place
moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover); LOG.debug("Trying to recover task from " + pathToRecover
+ " into " + jobOutputPath);
moveJobOutputs(outputFileSystem,
pathToRecover, jobOutputPath, pathToRecover);
LOG.info("Saved output of job to " + jobOutputPath); LOG.info("Saved output of job to " + jobOutputPath);
} }
} }

View File

@ -111,32 +111,48 @@ private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
* @param context the job's context * @param context the job's context
*/ */
public void commitJob(JobContext context) throws IOException { public void commitJob(JobContext context) throws IOException {
//delete the task temp directory from the current jobtempdir if (outputPath != null) {
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + //delete the task temp directory from the current jobtempdir
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
if (fileSys.exists(tmpDir)) { FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
fileSys.delete(tmpDir, true); if (fileSys.exists(tmpDir)) {
} else { fileSys.delete(tmpDir, true);
LOG.warn("Task temp dir could not be deleted " + tmpDir); } else {
} LOG.warn("Task temp dir could not be deleted " + tmpDir);
}
//move the job output to final place
Path jobOutputPath = //move the job output to final place
new Path(outputPath, getJobAttemptBaseDirName(context)); Path jobOutputPath =
moveJobOutputs(outputFileSystem, outputPath, jobOutputPath); new Path(outputPath, getJobAttemptBaseDirName(context));
moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
// delete the _temporary folder and create a _done file in the o/p folder
cleanupJob(context); // delete the _temporary folder and create a _done file in the o/p folder
if (shouldMarkOutputDir(context.getConfiguration())) { cleanupJob(context);
markOutputDirSuccessful(context); if (shouldMarkOutputDir(context.getConfiguration())) {
markOutputDirSuccessful(context);
}
} }
} }
private void moveJobOutputs(FileSystem fs, /**
* Move job output to final location
* @param fs Filesystem handle
* @param origJobOutputPath The original location of the job output
* Required to generate the relative path for correct moving of data.
* @param finalOutputDir The final output directory to which the job output
* needs to be moved
* @param jobOutput The current job output directory being moved
* @throws IOException
*/
private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
Path finalOutputDir, Path jobOutput) throws IOException { Path finalOutputDir, Path jobOutput) throws IOException {
LOG.debug("Told to move job output from " + jobOutput
+ " to " + finalOutputDir +
" and orig job output path is " + origJobOutputPath);
if (fs.isFile(jobOutput)) { if (fs.isFile(jobOutput)) {
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); Path finalOutputPath =
getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
if (!fs.rename(jobOutput, finalOutputPath)) { if (!fs.rename(jobOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) { if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of job"); throw new IOException("Failed to delete earlier output of job");
@ -145,14 +161,18 @@ private void moveJobOutputs(FileSystem fs,
throw new IOException("Failed to save output of job"); throw new IOException("Failed to save output of job");
} }
} }
LOG.debug("Moved " + jobOutput + " to " + finalOutputPath); LOG.debug("Moved job output file from " + jobOutput + " to " +
finalOutputPath);
} else if (fs.getFileStatus(jobOutput).isDirectory()) { } else if (fs.getFileStatus(jobOutput).isDirectory()) {
LOG.debug("Job output file " + jobOutput + " is a dir");
FileStatus[] paths = fs.listStatus(jobOutput); FileStatus[] paths = fs.listStatus(jobOutput);
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); Path finalOutputPath =
getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
fs.mkdirs(finalOutputPath); fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along job output path " + finalOutputPath);
if (paths != null) { if (paths != null) {
for (FileStatus path : paths) { for (FileStatus path : paths) {
moveJobOutputs(fs, finalOutputDir, path.getPath()); moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
} }
} }
} }
@ -233,6 +253,8 @@ private void moveTaskOutputs(TaskAttemptContext context,
throws IOException { throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID(); TaskAttemptID attemptId = context.getTaskAttemptID();
context.progress(); context.progress();
LOG.debug("Told to move taskoutput from " + taskOutput
+ " to " + jobOutputDir);
if (fs.isFile(taskOutput)) { if (fs.isFile(taskOutput)) {
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
workPath); workPath);
@ -248,9 +270,11 @@ private void moveTaskOutputs(TaskAttemptContext context,
} }
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.getFileStatus(taskOutput).isDirectory()) { } else if(fs.getFileStatus(taskOutput).isDirectory()) {
LOG.debug("Taskoutput " + taskOutput + " is a dir");
FileStatus[] paths = fs.listStatus(taskOutput); FileStatus[] paths = fs.listStatus(taskOutput);
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath); Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
fs.mkdirs(finalOutputPath); fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along path " + finalOutputPath);
if (paths != null) { if (paths != null) {
for (FileStatus path : paths) { for (FileStatus path : paths) {
moveTaskOutputs(context, fs, jobOutputDir, path.getPath()); moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@ -281,12 +305,17 @@ public void abortTask(TaskAttemptContext context) throws IOException {
* @throws IOException * @throws IOException
*/ */
private Path getFinalPath(Path jobOutputDir, Path taskOutput, private Path getFinalPath(Path jobOutputDir, Path taskOutput,
Path taskOutputPath) throws IOException { Path taskOutputPath) throws IOException {
URI taskOutputUri = taskOutput.toUri(); URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); outputFileSystem.getWorkingDirectory()).toUri();
URI taskOutputPathUri =
taskOutputPath.makeQualified(
outputFileSystem.getUri(),
outputFileSystem.getWorkingDirectory()).toUri();
URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
if (taskOutputUri == relativePath) { if (taskOutputUri == relativePath) {
throw new IOException("Can not get the relative path: base = " + throw new IOException("Can not get the relative path: base = " +
taskOutputPath + " child = " + taskOutput); taskOutputPathUri + " child = " + taskOutputUri);
} }
if (relativePath.getPath().length() > 0) { if (relativePath.getPath().length() > 0) {
return new Path(jobOutputDir, relativePath.getPath()); return new Path(jobOutputDir, relativePath.getPath());
@ -334,9 +363,12 @@ public void recoverTask(TaskAttemptContext context)
Path pathToRecover = Path pathToRecover =
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt)); new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
LOG.debug("Trying to recover task from " + pathToRecover
+ " into " + jobOutputPath);
if (outputFileSystem.exists(pathToRecover)) { if (outputFileSystem.exists(pathToRecover)) {
// Move the task outputs to their final place // Move the task outputs to their final place
moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover); moveJobOutputs(outputFileSystem,
pathToRecover, jobOutputPath, pathToRecover);
LOG.info("Saved output of job to " + jobOutputPath); LOG.info("Saved output of job to " + jobOutputPath);
} }
} }

View File

@ -25,13 +25,17 @@
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestFileOutputCommitter extends TestCase { public class TestFileOutputCommitter extends TestCase {
private static Path outDir = new Path(System.getProperty("test.build.data", private static Path outDir = new Path(System.getProperty("test.build.data",
@ -65,6 +69,20 @@ private void writeOutput(RecordWriter theRecordWriter,
} }
} }
private void writeMapFileOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
try {
int key = 0;
for (int i = 0 ; i < 10; ++i) {
key = i;
Text val = (i%2 == 1) ? val1 : val2;
theRecordWriter.write(new LongWritable(key),
val);
}
} finally {
theRecordWriter.close(null);
}
}
public void testRecovery() throws Exception { public void testRecovery() throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
@ -91,9 +109,7 @@ public void testRecovery() throws Exception {
FileOutputCommitter.getJobAttemptBaseDirName( FileOutputCommitter.getJobAttemptBaseDirName(
conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0))); conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
assertTrue((new File(jobTempDir1.toString()).exists())); assertTrue((new File(jobTempDir1.toString()).exists()));
validateContent(jobTempDir1); validateContent(jobTempDir1);
//now while running the second app attempt, //now while running the second app attempt,
//recover the task output from first attempt //recover the task output from first attempt
@ -131,6 +147,29 @@ private void validateContent(Path dir) throws IOException {
assertEquals(output, expectedOutput.toString()); assertEquals(output, expectedOutput.toString());
} }
private void validateMapFileOutputContent(
FileSystem fs, Path dir) throws IOException {
// map output is a directory with index and data files
Path expectedMapDir = new Path(dir, partFile);
assert(fs.getFileStatus(expectedMapDir).isDirectory());
FileStatus[] files = fs.listStatus(expectedMapDir);
int fileCount = 0;
boolean dataFileFound = false;
boolean indexFileFound = false;
for (FileStatus f : files) {
if (f.isFile()) {
++fileCount;
if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
indexFileFound = true;
}
else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
dataFileFound = true;
}
}
}
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
public void testCommitter() throws Exception { public void testCommitter() throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
@ -159,6 +198,31 @@ public void testCommitter() throws Exception {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testMapFileOutputCommitter() throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeMapFileOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
validateMapFileOutputContent(FileSystem.get(conf), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testAbort() throws IOException, InterruptedException { public void testAbort() throws IOException, InterruptedException {
JobConf conf = new JobConf(); JobConf conf = new JobConf();

View File

@ -26,10 +26,13 @@
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
@ -75,6 +78,20 @@ private void writeOutput(RecordWriter theRecordWriter,
} }
} }
private void writeMapFileOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
try {
int key = 0;
for (int i = 0 ; i < 10; ++i) {
key = i;
Text val = (i%2 == 1) ? val1 : val2;
theRecordWriter.write(new LongWritable(key),
val);
}
} finally {
theRecordWriter.close(context);
}
}
public void testRecovery() throws Exception { public void testRecovery() throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
@ -101,9 +118,7 @@ public void testRecovery() throws Exception {
FileOutputCommitter.getJobAttemptBaseDirName( FileOutputCommitter.getJobAttemptBaseDirName(
conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0))); conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
assertTrue((new File(jobTempDir1.toString()).exists())); assertTrue((new File(jobTempDir1.toString()).exists()));
validateContent(jobTempDir1); validateContent(jobTempDir1);
//now while running the second app attempt, //now while running the second app attempt,
//recover the task output from first attempt //recover the task output from first attempt
@ -141,6 +156,29 @@ private void validateContent(Path dir) throws IOException {
assertEquals(output, expectedOutput.toString()); assertEquals(output, expectedOutput.toString());
} }
private void validateMapFileOutputContent(
FileSystem fs, Path dir) throws IOException {
// map output is a directory with index and data files
Path expectedMapDir = new Path(dir, partFile);
assert(fs.getFileStatus(expectedMapDir).isDirectory());
FileStatus[] files = fs.listStatus(expectedMapDir);
int fileCount = 0;
boolean dataFileFound = false;
boolean indexFileFound = false;
for (FileStatus f : files) {
if (f.isFile()) {
++fileCount;
if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
indexFileFound = true;
}
else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
dataFileFound = true;
}
}
}
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
public void testCommitter() throws Exception { public void testCommitter() throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
@ -169,6 +207,32 @@ public void testCommitter() throws Exception {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testMapFileOutputCommitter() throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeMapFileOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testAbort() throws IOException, InterruptedException { public void testAbort() throws IOException, InterruptedException {
Job job = Job.getInstance(); Job job = Job.getInstance();