MAPREDUCE-6633. AM should retry map attempts if the reduce task encounters commpression related errors. Contributed by Rushabh Shah

This commit is contained in:
Eric Payne 2016-04-09 16:51:57 +00:00
parent b9e3eff62a
commit 1fec06e037
2 changed files with 38 additions and 1 deletions

View File

@ -537,7 +537,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
+ " len: " + compressedLength + " to " + mapOutput.getDescription()); + " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, is, compressedLength, decompressedLength, mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter); metrics, reporter);
} catch (java.lang.InternalError e) { } catch (java.lang.InternalError | Exception e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e); LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e); throw new IOException(e);
} }

View File

@ -344,6 +344,43 @@ public void testCopyFromHostCompressFailure() throws Exception {
verify(ss, times(1)).copyFailed(map1ID, host, true, false); verify(ss, times(1)).copyFailed(map1ID, host, true, false);
} }
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostOnAnyException() throws Exception {
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(
SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo);
doThrow(new ArrayIndexOutOfBoundsException()).when(immo)
.shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
underTest.copyFromHost(host);
verify(connection)
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(ss, times(1)).copyFailed(map1ID, host, true, false);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test(timeout=10000) @Test(timeout=10000)
public void testCopyFromHostWithRetry() throws Exception { public void testCopyFromHostWithRetry() throws Exception {