diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2bffc8a165..b344c3d5aa 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1750,6 +1750,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3249. Ensure shuffle-port is correctly used duringMR AM recovery.
     (vinodkv via acmurthy)
 
+    MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
+    map output fits in spill buffer. (todd)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 01c0b1bba4..7c47aa91d5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -36,8 +37,10 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
@@ -1727,10 +1730,10 @@ private void mergeParts() throws IOException, InterruptedException,
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
-        rfs.rename(filename[0],
+        sameVolRename(filename[0],
             mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(0),
+          sameVolRename(mapOutputFile.getSpillIndexFile(0),
             mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
         } else {
           indexCacheList.get(0).writeToFile(
@@ -1847,7 +1850,36 @@ private void mergeParts() throws IOException, InterruptedException,
         }
       }
     }
-
+
+    /**
+     * Rename srcPath to dstPath on the same volume. This is the same
+     * as RawLocalFileSystem's rename method, except that it will not
+     * fall back to a copy, and it will create the target directory
+     * if it doesn't exist.
+     *
+     * @param srcPath file to move
+     * @param dstPath path the file is moved to
+     * @throws IOException if the target directory cannot be created or
+     *         the rename fails
+     */
+    private void sameVolRename(Path srcPath,
+        Path dstPath) throws IOException {
+      RawLocalFileSystem rfs = (RawLocalFileSystem)this.rfs;
+      File src = rfs.pathToFile(srcPath);
+      File dst = rfs.pathToFile(dstPath);
+      File dstParent = dst.getParentFile();
+      // mkdirs() also returns false when the directory already exists
+      // (e.g. created concurrently), so only fail if it is still missing.
+      if (dstParent != null && !dstParent.mkdirs()
+          && !dstParent.isDirectory()) {
+        throw new IOException("Unable to rename " + src + " to "
+            + dst + ": couldn't create parent directory");
+      }
+
+      if (!src.renameTo(dst)) {
+        throw new IOException("Unable to rename " + src + " to " + dst);
+      }
+    }
   } // MapOutputBuffer
 
   /**