From 8f701ae07a0b1dc70b8e1eb8d4a5c35c0a1e76da Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Thu, 6 Nov 2014 15:53:40 -0600 Subject: [PATCH] MAPREDUCE-5958. Wrong reduce task progress if map output is compressed. Contributed by Emilio Coppa and Jason Lowe. --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../java/org/apache/hadoop/mapred/Merger.java | 12 +-- .../mapreduce/task/reduce/TestMerger.java | 80 +++++++++++++------ 3 files changed, 64 insertions(+), 31 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fd42f82c9a..573408e8f3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -462,6 +462,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect with no authority in job jar path. (Gera Shegalov via jlowe) + MAPREDUCE-5958. Wrong reduce task progress if map output is compressed + (Emilio Coppa and jlowe via kihwal) + Release 2.5.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java index 92855169c8..b44e742305 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java @@ -515,9 +515,9 @@ public DataInputBuffer getValue() throws IOException { } private void adjustPriorityQueue(Segment reader) throws IOException{ - long startPos = reader.getPosition(); + long startPos = reader.getReader().bytesRead; boolean hasNext = reader.nextRawKey(); - long endPos = reader.getPosition(); + long endPos = reader.getReader().bytesRead; totalBytesProcessed += endPos - startPos; mergeProgress.set(totalBytesProcessed * progPerByte); if (hasNext) { @@ -543,7 +543,7 @@ public boolean next() throws IOException { } } minSegment = top(); - long startPos = minSegment.getPosition(); + long startPos = minSegment.getReader().bytesRead; key = minSegment.getKey(); if (!minSegment.inMemory()) { //When we load the value from an inmemory segment, we reset @@ -560,7 +560,7 @@ public boolean next() throws IOException { } else { minSegment.getValue(value); } - long endPos = minSegment.getPosition(); + long endPos = minSegment.getReader().bytesRead; totalBytesProcessed += endPos - startPos; mergeProgress.set(totalBytesProcessed * progPerByte); return true; @@ -638,9 +638,9 @@ RawKeyValueIterator merge(Class keyClass, Class valueClass, // Initialize the segment at the last possible moment; // this helps in ensuring we don't use buffers until we need them segment.init(readsCounter); - long startPos = segment.getPosition(); + long startPos = segment.getReader().bytesRead; boolean hasNext = segment.nextRawKey(); - long endPos = segment.getPosition(); + long endPos = segment.getReader().bytesRead; if (hasNext) { startBytes += endPos - startPos; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java index c5ab420b81..651dd38755 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java @@ -18,13 +18,12 @@ package org.apache.hadoop.mapreduce.task.reduce; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doAnswer; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -32,9 +31,8 @@ import java.util.Map; import java.util.TreeMap; -import org.apache.hadoop.fs.FSDataInputStream; -import org.junit.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -43,14 +41,15 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.IFile.Reader; import org.apache.hadoop.mapred.IFile; +import org.apache.hadoop.mapred.IFile.Reader; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MROutputFiles; import org.apache.hadoop.mapred.Merger; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -58,21 +57,17 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.mapreduce.CryptoUtils; -import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; -import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.Lists; - public class TestMerger { private Configuration conf; @@ -254,7 +249,7 @@ public void testUncompressed() throws IOException { testMergeShouldReturnProperProgress(getUncompressedSegments()); } - @SuppressWarnings( { "deprecation", "unchecked" }) + @SuppressWarnings( { "unchecked" }) public void testMergeShouldReturnProperProgress( List> segments) throws IOException { Path tmpDir = new Path("localpath"); @@ -267,7 +262,38 @@ public void testMergeShouldReturnProperProgress( RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass, valueClass, segments, 2, tmpDir, comparator, getReporter(), readsCounter, writesCounter, mergePhase); - Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f); + final float epsilon = 0.00001f; + + // Reading 6 keys total, 3 each in 2 segments, so each key read moves the + // progress forward 1/6th of the way. Initially the first keys from each + // segment have been read as part of the merge setup, so progress = 2/6. + Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); + + // The first next() returns one of the keys already read during merge setup + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); + + // Subsequent next() calls should read one key and move progress + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon); + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); + + // At this point we've exhausted all of the keys in one segment + // so getting the next key will return the already cached key from the + // other segment + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); + + // Subsequent next() calls should read one key and move progress + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon); + Assert.assertTrue(mergeQueue.next()); + Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); + + // Now there should be no more input + Assert.assertFalse(mergeQueue.next()); + Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); } private Progressable getReporter() { @@ -281,7 +307,7 @@ public void progress() { private List> getUncompressedSegments() throws IOException { List> segments = new ArrayList>(); - for (int i = 1; i < 1; i++) { + for (int i = 0; i < 2; i++) { segments.add(getUncompressedSegment(i)); } return segments; @@ -289,44 +315,51 @@ private List> getUncompressedSegments() throws IOException { private List> getCompressedSegments() throws IOException { List> segments = new ArrayList>(); - for (int i = 1; i < 1; i++) { + for (int i = 0; i < 2; i++) { segments.add(getCompressedSegment(i)); } return segments; } private Segment getUncompressedSegment(int i) throws IOException { - return new Segment(getReader(i), false); + return new Segment(getReader(i, false), false); } private Segment getCompressedSegment(int i) throws IOException { - return new Segment(getReader(i), false, 3000l); + return new Segment(getReader(i, true), false, 3000l); } @SuppressWarnings("unchecked") - private Reader getReader(int i) throws IOException { + private Reader getReader(int i, boolean isCompressedInput) + throws IOException { Reader readerMock = mock(Reader.class); + when(readerMock.getLength()).thenReturn(30l); when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn( 20l); when( readerMock.nextRawKey(any(DataInputBuffer.class))) - .thenAnswer(getKeyAnswer("Segment" + i)); + .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput)); doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue( any(DataInputBuffer.class)); return readerMock; } - private Answer getKeyAnswer(final String segmentName) { + private Answer getKeyAnswer(final String segmentName, + final boolean isCompressedInput) { return new Answer() { int i = 0; + @SuppressWarnings("unchecked") public Boolean answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - DataInputBuffer key = (DataInputBuffer) args[0]; - if (i++ == 2) { + if (i++ == 3) { return false; } + Reader mock = (Reader) invocation.getMock(); + int multiplier = isCompressedInput ? 100 : 1; + mock.bytesRead += 10 * multiplier; + Object[] args = invocation.getArguments(); + DataInputBuffer key = (DataInputBuffer) args[0]; key.reset(("Segment Key " + segmentName + i).getBytes(), 20); return true; } @@ -340,9 +373,6 @@ private Answer getValueAnswer(final String segmentName) { public Void answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); DataInputBuffer key = (DataInputBuffer) args[0]; - if (i++ == 2) { - return null; - } key.reset(("Segment Value " + segmentName + i).getBytes(), 20); return null; }