diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2134d43e67..73168486e6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -189,6 +189,9 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4951. Container preemption interpreted as task failure.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 3a82555e18..138ea43fdd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -475,9 +475,9 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
 
         LOG.info(reduceId +
             " Merge of the " + noInMemorySegments +
@@ -552,9 +552,9 @@ public void merge(List<CompressAwarePath> inputs) throws IOException {
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -713,13 +713,15 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null,
             mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
            keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
+          writer.close();
           onDiskMapOutputs.add(new CompressAwarePath(outputPath,
               writer.getRawLength()));
+          writer = null;
+          // add to list of final disk outputs.
         } catch (IOException e) {
           if (null != outputPath) {
             try {
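
The change is the same in all three hunks: writer.close() now runs before writer.getRawLength() is read, since the writer only accounts for its trailing EOF_MARKER bytes once it is closed, so capturing the raw length first undercounts the segment and skews merge progress. The following self-contained sketch (a hypothetical SegmentWriter class, not the actual Hadoop IFile.Writer API) illustrates why the ordering matters; it is a minimal illustration under that assumption, not the patched code itself.

// CloseBeforeLength.java -- sketch of a writer whose raw length only includes
// the EOF marker bytes after close(), mirroring why the patch moves
// writer.close() ahead of writer.getRawLength().
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SegmentWriter implements AutoCloseable {

  private static final int EOF_MARKER = -1;   // sentinel written at close

  private final DataOutputStream out;
  private long rawLength = 0;
  private boolean closed = false;

  SegmentWriter(ByteArrayOutputStream sink) {
    this.out = new DataOutputStream(sink);
  }

  void append(byte[] key, byte[] value) throws IOException {
    out.write(key);
    out.write(value);
    rawLength += key.length + value.length;   // marker bytes are NOT counted yet
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      out.writeInt(EOF_MARKER);               // end-of-stream sentinels
      out.writeInt(EOF_MARKER);
      rawLength += 2L * Integer.BYTES;        // marker bytes counted only here
      out.close();
      closed = true;
    }
  }

  long getRawLength() {
    return rawLength;
  }
}

public class CloseBeforeLength {
  public static void main(String[] args) throws IOException {
    SegmentWriter writer = new SegmentWriter(new ByteArrayOutputStream());
    writer.append("key".getBytes(), "value".getBytes());

    long beforeClose = writer.getRawLength(); // misses the marker bytes (pre-patch ordering)
    writer.close();
    long afterClose = writer.getRawLength();  // matches what was written (patched ordering)

    System.out.println("before close: " + beforeClose + ", after close: " + afterClose);
  }
}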