MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe.
This commit is contained in:
parent
96649c38f9
commit
eccb7d46ef
@ -513,6 +513,9 @@ Release 2.7.0 - UNRELEASED
|
||||
MAPREDUCE-6285. ClientServiceDelegate should not retry upon
|
||||
AuthenticationException. (Jonathan Eagles via ozawa)
|
||||
|
||||
MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal
|
||||
to a reducer. (Jason Lowe via junping_du)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -258,6 +258,39 @@ private void abortConnect(MapHost host, Set<TaskAttemptID> remaining) {
|
||||
closeConnection();
|
||||
}
|
||||
|
||||
private DataInputStream openShuffleUrl(MapHost host,
|
||||
Set<TaskAttemptID> remaining, URL url) {
|
||||
DataInputStream input = null;
|
||||
|
||||
try {
|
||||
setupConnectionsWithRetry(host, remaining, url);
|
||||
if (stopped) {
|
||||
abortConnect(host, remaining);
|
||||
} else {
|
||||
input = new DataInputStream(connection.getInputStream());
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
boolean connectExcpt = ie instanceof ConnectException;
|
||||
ioErrs.increment(1);
|
||||
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
|
||||
" map outputs", ie);
|
||||
|
||||
// If connect did not succeed, just mark all the maps as failed,
|
||||
// indirectly penalizing the host
|
||||
scheduler.hostFailed(host.getHostName());
|
||||
for(TaskAttemptID left: remaining) {
|
||||
scheduler.copyFailed(left, host, false, connectExcpt);
|
||||
}
|
||||
|
||||
// Add back all the remaining maps, WITHOUT marking them as failed
|
||||
for(TaskAttemptID left: remaining) {
|
||||
scheduler.putBackKnownMapOutput(host, left);
|
||||
}
|
||||
}
|
||||
|
||||
return input;
|
||||
}
|
||||
|
||||
/**
|
||||
* The crux of the matter...
|
||||
*
|
||||
@ -286,37 +319,11 @@ protected void copyFromHost(MapHost host) throws IOException {
|
||||
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
|
||||
|
||||
// Construct the url and connect
|
||||
DataInputStream input = null;
|
||||
URL url = getMapOutputURL(host, maps);
|
||||
try {
|
||||
setupConnectionsWithRetry(host, remaining, url);
|
||||
|
||||
if (stopped) {
|
||||
abortConnect(host, remaining);
|
||||
DataInputStream input = openShuffleUrl(host, remaining, url);
|
||||
if (input == null) {
|
||||
return;
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
boolean connectExcpt = ie instanceof ConnectException;
|
||||
ioErrs.increment(1);
|
||||
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
|
||||
" map outputs", ie);
|
||||
|
||||
// If connect did not succeed, just mark all the maps as failed,
|
||||
// indirectly penalizing the host
|
||||
scheduler.hostFailed(host.getHostName());
|
||||
for(TaskAttemptID left: remaining) {
|
||||
scheduler.copyFailed(left, host, false, connectExcpt);
|
||||
}
|
||||
|
||||
// Add back all the remaining maps, WITHOUT marking them as failed
|
||||
for(TaskAttemptID left: remaining) {
|
||||
scheduler.putBackKnownMapOutput(host, left);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
input = new DataInputStream(connection.getInputStream());
|
||||
|
||||
try {
|
||||
// Loop through available map-outputs and fetch them
|
||||
@ -333,14 +340,10 @@ protected void copyFromHost(MapHost host) throws IOException {
|
||||
connection.disconnect();
|
||||
// Get map output from remaining tasks only.
|
||||
url = getMapOutputURL(host, remaining);
|
||||
|
||||
// Connect with retry as expecting host's recovery take sometime.
|
||||
setupConnectionsWithRetry(host, remaining, url);
|
||||
if (stopped) {
|
||||
abortConnect(host, remaining);
|
||||
input = openShuffleUrl(host, remaining, url);
|
||||
if (input == null) {
|
||||
return;
|
||||
}
|
||||
input = new DataInputStream(connection.getInputStream());
|
||||
}
|
||||
}
|
||||
|
||||
@ -591,7 +594,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
|
||||
// Retry is not timeout, let's do retry with throwing an exception.
|
||||
if (currentTime - retryStartTime < this.fetchRetryTimeout) {
|
||||
LOG.warn("Shuffle output from " + host.getHostName() +
|
||||
" failed, retry it.");
|
||||
" failed, retry it.", ioe);
|
||||
throw ioe;
|
||||
} else {
|
||||
// timeout, prepare to be failed.
|
||||
|
@ -388,6 +388,39 @@ public Void answer(InvocationOnMock ignore) throws IOException {
|
||||
anyBoolean(), anyBoolean());
|
||||
}
|
||||
|
||||
// Regression test for MAPREDUCE-6303: a read timeout hit while
// re-connecting during a fetch retry must surface as an ordinary,
// non-fatal copy failure reported to the scheduler — not as an
// exception that is fatal to the reducer.
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostWithRetryThenTimeout() throws Exception {
  InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
  // jobWithRetry enables the fetch-retry configuration under test.
  Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
      id, ss, mm, r, metrics, except, key, connection);

  String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);

  // First connection succeeds (HTTP 200); the retry connection then
  // times out when the response code is read.
  when(connection.getResponseCode()).thenReturn(200)
      .thenThrow(new SocketTimeoutException("forced timeout"));
  when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(replyHash);
  // Serve a well-formed shuffle header so the first attempt gets as far
  // as the in-memory shuffle before failing.
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  header.write(new DataOutputStream(bout));
  ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
  when(connection.getInputStream()).thenReturn(in);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenReturn(immo);
  // Force the first shuffle attempt to fail so the fetcher retries and
  // hits the timeout stubbed above.
  doThrow(new IOException("forced error")).when(immo).shuffle(
      any(MapHost.class), any(InputStream.class), anyLong(),
      anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));

  // Must complete without throwing — the timed-out retry is absorbed.
  underTest.copyFromHost(host);
  // The failure is counted and reported as a non-fatal copy failure
  // (readError=false, connectExcpt=false).
  verify(allErrs).increment(1);
  verify(ss).copyFailed(map1ID, host, false, false);
}
|
||||
|
||||
@Test
|
||||
public void testCopyFromHostExtraBytes() throws Exception {
|
||||
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
||||
|
Loading…
Reference in New Issue
Block a user