diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8d4e41ceb1..efdd1071d6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -275,6 +275,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan via sseth) + MAPREDUCE-2264. Job status exceeds 100% in some cases. + (devaraj.k and sandyr via tucu) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java index 484bd89cd4..d007470765 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java @@ -218,6 +218,7 @@ public class Merger { CompressionCodec codec = null; long segmentOffset = 0; long segmentLength = -1; + long rawDataLength = -1; Counters.Counter mapOutputsCounter = null; @@ -234,6 +235,15 @@ public class Merger { this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, mergedMapOutputsCounter); } + + public Segment(Configuration conf, FileSystem fs, Path file, + CompressionCodec codec, boolean preserve, + Counters.Counter mergedMapOutputsCounter, long rawDataLength) + throws IOException { + this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, + mergedMapOutputsCounter); + this.rawDataLength = rawDataLength; + } public Segment(Configuration conf, FileSystem fs, Path file, long segmentOffset, long segmentLength, @@ -261,6 +271,11 @@ public class Merger { public Segment(Reader reader, boolean preserve) { this(reader, preserve, null); } + + public Segment(Reader reader, boolean preserve, long rawDataLength) { + this(reader, preserve, null); + this.rawDataLength = rawDataLength; + } public Segment(Reader reader, boolean preserve, Counters.Counter mapOutputsCounter) { @@ -300,6 +315,10 @@ public class Merger { segmentLength : reader.getLength(); } + public long getRawDataLength() { + return (rawDataLength > 0) ? rawDataLength : getLength(); + } + boolean nextRawKey() throws IOException { return reader.nextRawKey(key); } @@ -633,7 +652,7 @@ public class Merger { totalBytesProcessed = 0; totalBytes = 0; for (int i = 0; i < segmentsToMerge.size(); i++) { - totalBytes += segmentsToMerge.get(i).getLength(); + totalBytes += segmentsToMerge.get(i).getRawDataLength(); } } if (totalBytes != 0) //being paranoid @@ -702,7 +721,7 @@ public class Merger { // size will match(almost) if combiner is not called in merge. long inputBytesOfThisMerge = totalBytesProcessed - bytesProcessedInPrevMerges; - totalBytes -= inputBytesOfThisMerge - tempSegment.getLength(); + totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength(); if (totalBytes != 0) { progPerByte = 1.0f / (float)totalBytes; } @@ -768,7 +787,7 @@ public class Merger { for (int i = 0; i < numSegments; i++) { // Not handling empty segments here assuming that it would not affect // much in calculation of mergeProgress. - segmentSizes.add(segments.get(i).getLength()); + segmentSizes.add(segments.get(i).getRawDataLength()); } // If includeFinalMerge is true, allow the following while loop iterate diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index 007897f17f..3a82555e18 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -89,7 +89,7 @@ public class MergeManagerImpl implements MergeManager { new TreeSet>(new MapOutputComparator()); private final MergeThread, K,V> inMemoryMerger; - Set onDiskMapOutputs = new TreeSet(); + Set onDiskMapOutputs = new TreeSet(); private final OnDiskMerger onDiskMerger; private final long memoryLimit; @@ -336,7 +336,7 @@ public class MergeManagerImpl implements MergeManager { inMemoryMergedMapOutputs.size()); } - public synchronized void closeOnDiskFile(Path file) { + public synchronized void closeOnDiskFile(CompressAwarePath file) { onDiskMapOutputs.add(file); if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { @@ -356,7 +356,7 @@ public class MergeManagerImpl implements MergeManager { List> memory = new ArrayList>(inMemoryMergedMapOutputs); memory.addAll(inMemoryMapOutputs); - List disk = new ArrayList(onDiskMapOutputs); + List disk = new ArrayList(onDiskMapOutputs); return finalMerge(jobConf, rfs, memory, disk); } @@ -456,6 +456,7 @@ public class MergeManagerImpl implements MergeManager { codec, null); RawKeyValueIterator rIter = null; + CompressAwarePath compressAwarePath; try { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); @@ -474,6 +475,8 @@ public class MergeManagerImpl implements MergeManager { combineCollector.setWriter(writer); combineAndSpill(rIter, reduceCombineInputCounter); } + compressAwarePath = new CompressAwarePath(outputPath, + writer.getRawLength()); writer.close(); LOG.info(reduceId + @@ -489,12 +492,12 @@ public class MergeManagerImpl implements MergeManager { } // Note the output of the merge - closeOnDiskFile(outputPath); + closeOnDiskFile(compressAwarePath); } } - private class OnDiskMerger extends MergeThread { + private class OnDiskMerger extends MergeThread { public OnDiskMerger(MergeManagerImpl manager) { super(manager, Integer.MAX_VALUE, exceptionReporter); @@ -503,7 +506,7 @@ public class MergeManagerImpl implements MergeManager { } @Override - public void merge(List inputs) throws IOException { + public void merge(List inputs) throws IOException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); @@ -518,7 +521,7 @@ public class MergeManagerImpl implements MergeManager { " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. - for (Path file : inputs) { + for (CompressAwarePath file : inputs) { approxOutputSize += localFS.getFileStatus(file).getLen(); } @@ -536,6 +539,7 @@ public class MergeManagerImpl implements MergeManager { (Class) jobConf.getMapOutputValueClass(), codec, null); RawKeyValueIterator iter = null; + CompressAwarePath compressAwarePath; Path tmpDir = new Path(reduceId.toString()); try { iter = Merger.merge(jobConf, rfs, @@ -548,13 +552,15 @@ public class MergeManagerImpl implements MergeManager { mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); + compressAwarePath = new CompressAwarePath(outputPath, + writer.getRawLength()); writer.close(); } catch (IOException e) { localFS.delete(outputPath, true); throw e; } - closeOnDiskFile(outputPath); + closeOnDiskFile(compressAwarePath); LOG.info(reduceId + " Finished merging " + inputs.size() + @@ -653,7 +659,7 @@ public class MergeManagerImpl implements MergeManager { private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List> inMemoryMapOutputs, - List onDiskMapOutputs + List onDiskMapOutputs ) throws IOException { LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + @@ -712,7 +718,8 @@ public class MergeManagerImpl implements MergeManager { try { Merger.writeFile(rIter, writer, reporter, job); // add to list of final disk outputs. - onDiskMapOutputs.add(outputPath); + onDiskMapOutputs.add(new CompressAwarePath(outputPath, + writer.getRawLength())); } catch (IOException e) { if (null != outputPath) { try { @@ -742,15 +749,19 @@ public class MergeManagerImpl implements MergeManager { // segments on disk List> diskSegments = new ArrayList>(); long onDiskBytes = inMemToDiskBytes; - Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); - for (Path file : onDisk) { - onDiskBytes += fs.getFileStatus(file).getLen(); - LOG.debug("Disk file: " + file + " Length is " + - fs.getFileStatus(file).getLen()); + long rawBytes = inMemToDiskBytes; + CompressAwarePath[] onDisk = onDiskMapOutputs.toArray( + new CompressAwarePath[onDiskMapOutputs.size()]); + for (CompressAwarePath file : onDisk) { + long fileLength = fs.getFileStatus(file).getLen(); + onDiskBytes += fileLength; + rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength; + + LOG.debug("Disk file: " + file + " Length is " + fileLength); diskSegments.add(new Segment(job, fs, file, codec, keepInputs, (file.toString().endsWith( Task.MERGED_OUTPUT_PREFIX) ? - null : mergedMapOutputsCounter) + null : mergedMapOutputsCounter), file.getRawDataLength() )); } LOG.info("Merging " + onDisk.length + " files, " + @@ -786,7 +797,7 @@ public class MergeManagerImpl implements MergeManager { return diskMerge; } finalSegments.add(new Segment( - new RawKVIteratorReader(diskMerge, onDiskBytes), true)); + new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes)); } return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, @@ -794,4 +805,27 @@ public class MergeManagerImpl implements MergeManager { null); } + + static class CompressAwarePath extends Path { + private long rawDataLength; + + public CompressAwarePath(Path path, long rawDataLength) { + super(path.toUri()); + this.rawDataLength = rawDataLength; + } + + public long getRawDataLength() { + return rawDataLength; + } + + @Override + public boolean equals(Object other) { + return super.equals(other); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java index 2cb86449e5..bf69798c12 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -112,7 +113,9 @@ class OnDiskMapOutput extends MapOutput { @Override public void commit() throws IOException { localFS.rename(tmpOutputPath, outputPath); - merger.closeOnDiskFile(outputPath); + CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath, + getSize()); + merger.closeOnDiskFile(compressAwarePath); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java new file mode 100644 index 0000000000..41a1848c35 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.IFile.Reader; +import org.apache.hadoop.mapred.IFile; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MROutputFiles; +import org.apache.hadoop.mapred.Merger; +import org.apache.hadoop.mapred.Merger.Segment; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Progressable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestMerger { + + private Configuration conf; + private JobConf jobConf; + private FileSystem fs; + + @Before + public void setup() throws IOException { + conf = new Configuration(); + jobConf = new JobConf(); + fs = FileSystem.getLocal(conf); + } + + @After + public void cleanup() throws IOException { + fs.delete(new Path(jobConf.getLocalDirs()[0]), true); + } + + @Test + public void testInMemoryMerger() throws IOException { + JobID jobId = new JobID("a", 0); + TaskAttemptID reduceId = new TaskAttemptID( + new TaskID(jobId, TaskType.REDUCE, 0), 0); + TaskAttemptID mapId1 = new TaskAttemptID( + new TaskID(jobId, TaskType.MAP, 1), 0); + TaskAttemptID mapId2 = new TaskAttemptID( + new TaskID(jobId, TaskType.MAP, 2), 0); + + LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); + + MergeManagerImpl mergeManager = new MergeManagerImpl( + reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, + null, null, new Progress(), new MROutputFiles()); + + // write map outputs + Map map1 = new TreeMap(); + map1.put("apple", "disgusting"); + map1.put("carrot", "delicious"); + Map map2 = new TreeMap(); + map1.put("banana", "pretty good"); + byte[] mapOutputBytes1 = writeMapOutput(conf, map1); + byte[] mapOutputBytes2 = writeMapOutput(conf, map2); + InMemoryMapOutput mapOutput1 = new InMemoryMapOutput( + conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); + InMemoryMapOutput mapOutput2 = new InMemoryMapOutput( + conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); + System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, + mapOutputBytes1.length); + System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, + mapOutputBytes2.length); + + // create merger and run merge + MergeThread, Text, Text> inMemoryMerger = + mergeManager.createInMemoryMerger(); + List> mapOutputs = + new ArrayList>(); + mapOutputs.add(mapOutput1); + mapOutputs.add(mapOutput2); + + inMemoryMerger.merge(mapOutputs); + + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); + + List keys = new ArrayList(); + List values = new ArrayList(); + readOnDiskMapOutput(conf, fs, outPath, keys, values); + Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); + Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); + } + + private byte[] writeMapOutput(Configuration conf, Map keysToValues) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FSDataOutputStream fsdos = new FSDataOutputStream(baos, null); + IFile.Writer writer = new IFile.Writer(conf, fsdos, + Text.class, Text.class, null, null); + for (String key : keysToValues.keySet()) { + String value = keysToValues.get(key); + writer.append(new Text(key), new Text(value)); + } + writer.close(); + return baos.toByteArray(); + } + + private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, + List keys, List values) throws IOException { + IFile.Reader reader = new IFile.Reader(conf, fs, + path, null, null); + DataInputBuffer keyBuff = new DataInputBuffer(); + DataInputBuffer valueBuff = new DataInputBuffer(); + Text key = new Text(); + Text value = new Text(); + while (reader.nextRawKey(keyBuff)) { + key.readFields(keyBuff); + keys.add(key.toString()); + reader.nextRawValue(valueBuff); + value.readFields(valueBuff); + values.add(value.toString()); + } + } + + @Test + public void testCompressed() throws IOException { + testMergeShouldReturnProperProgress(getCompressedSegments()); + } + + @Test + public void testUncompressed() throws IOException { + testMergeShouldReturnProperProgress(getUncompressedSegments()); + } + + @SuppressWarnings( { "deprecation", "unchecked" }) + public void testMergeShouldReturnProperProgress( + List> segments) throws IOException { + Path tmpDir = new Path("localpath"); + Class keyClass = (Class) jobConf.getMapOutputKeyClass(); + Class valueClass = (Class) jobConf.getMapOutputValueClass(); + RawComparator comparator = jobConf.getOutputKeyComparator(); + Counter readsCounter = new Counter(); + Counter writesCounter = new Counter(); + Progress mergePhase = new Progress(); + RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass, + valueClass, segments, 2, tmpDir, comparator, getReporter(), + readsCounter, writesCounter, mergePhase); + Assert.assertEquals(1.0f, mergeQueue.getProgress().get()); + } + + private Progressable getReporter() { + Progressable reporter = new Progressable() { + @Override + public void progress() { + } + }; + return reporter; + } + + private List> getUncompressedSegments() throws IOException { + List> segments = new ArrayList>(); + for (int i = 1; i < 1; i++) { + segments.add(getUncompressedSegment(i)); + } + return segments; + } + + private List> getCompressedSegments() throws IOException { + List> segments = new ArrayList>(); + for (int i = 1; i < 1; i++) { + segments.add(getCompressedSegment(i)); + } + return segments; + } + + private Segment getUncompressedSegment(int i) throws IOException { + return new Segment(getReader(i), false); + } + + private Segment getCompressedSegment(int i) throws IOException { + return new Segment(getReader(i), false, 3000l); + } + + @SuppressWarnings("unchecked") + private Reader getReader(int i) throws IOException { + Reader readerMock = mock(Reader.class); + when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn( + 20l); + when( + readerMock.nextRawKey(any(DataInputBuffer.class))) + .thenAnswer(getKeyAnswer("Segment" + i)); + doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue( + any(DataInputBuffer.class)); + + return readerMock; + } + + private Answer getKeyAnswer(final String segmentName) { + return new Answer() { + int i = 0; + + public Boolean answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + DataInputBuffer key = (DataInputBuffer) args[0]; + if (i++ == 2) { + return false; + } + key.reset(("Segment Key " + segmentName + i).getBytes(), 20); + return true; + } + }; + } + + private Answer getValueAnswer(final String segmentName) { + return new Answer() { + int i = 0; + + public Void answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + DataInputBuffer key = (DataInputBuffer) args[0]; + if (i++ == 2) { + return null; + } + key.reset(("Segment Value " + segmentName + i).getBytes(), 20); + return null; + } + }; + } +} \ No newline at end of file