diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2705eb196d..1d35a6ced2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -160,6 +160,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5713. InputFormat and JobConf JavaDoc Fixes (Chen He via jeagles) + MAPREDUCE-5456. TestFetcher.testCopyFromHostExtraBytes is missing (Jason + Lowe via jeagles) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 07b6f62593..3db382e4f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -53,6 +53,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.IFileOutputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -301,6 +302,56 @@ public void testCopyFromHostCompressFailure() throws Exception { verify(ss, times(1)).copyFailed(map1ID, host, true, false); } + @Test + public void testCopyFromHostExtraBytes() throws Exception { + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(connection.getHeaderField( + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bout); + IFileOutputStream ios = new IFileOutputStream(dos); + header.write(dos); + ios.write("MAPDATA123".getBytes()); + ios.finish(); + + ShuffleHeader header2 = new ShuffleHeader(map2ID.toString(), 14, 10, 1); + IFileOutputStream ios2 = new IFileOutputStream(dos); + header2.write(dos); + ios2.write("MAPDATA456".getBytes()); + ios2.finish(); + + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream + InMemoryMapOutput mapOut = new InMemoryMapOutput( + job, map1ID, mm, 8, null, true ); + InMemoryMapOutput mapOut2 = new InMemoryMapOutput( + job, map2ID, mm, 10, null, true ); + + when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut); + when(mm.reserve(eq(map2ID), anyLong(), anyInt())).thenReturn(mapOut2); + + underTest.copyFromHost(host); + + verify(allErrs).increment(1); + verify(ss).copyFailed(map1ID, host, true, false); + verify(ss, never()).copyFailed(map2ID, host, true, false); + + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } + @Test(timeout=10000) public void testInterruptInMemory() throws Exception { final int FETCHER = 2;