diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f5d2d1a3d0..160c49c20c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -513,6 +513,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6285. ClientServiceDelegate should not retry upon
     AuthenticationException. (Jonathan Eagles via ozawa)
 
+    MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal
+    to a reducer. (Jason Lowe via junping_du)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 3f408531b2..d867e4b406 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -258,6 +258,39 @@ private void abortConnect(MapHost host, Set<TaskAttemptID> remaining) {
     closeConnection();
   }
 
+  private DataInputStream openShuffleUrl(MapHost host,
+      Set<TaskAttemptID> remaining, URL url) {
+    DataInputStream input = null;
+
+    try {
+      setupConnectionsWithRetry(host, remaining, url);
+      if (stopped) {
+        abortConnect(host, remaining);
+      } else {
+        input = new DataInputStream(connection.getInputStream());
+      }
+    } catch (IOException ie) {
+      boolean connectExcpt = ie instanceof ConnectException;
+      ioErrs.increment(1);
+      LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
+          " map outputs", ie);
+
+      // If connect did not succeed, just mark all the maps as failed,
+      // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
+      for(TaskAttemptID left: remaining) {
+        scheduler.copyFailed(left, host, false, connectExcpt);
+      }
+
+      // Add back all the remaining maps, WITHOUT marking them as failed
+      for(TaskAttemptID left: remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+    }
+
+    return input;
+  }
+
   /**
    * The crux of the matter...
    *
@@ -286,38 +319,12 @@ protected void copyFromHost(MapHost host) throws IOException {
     Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
 
     // Construct the url and connect
-    DataInputStream input = null;
     URL url = getMapOutputURL(host, maps);
-    try {
-      setupConnectionsWithRetry(host, remaining, url);
-
-      if (stopped) {
-        abortConnect(host, remaining);
-        return;
-      }
-    } catch (IOException ie) {
-      boolean connectExcpt = ie instanceof ConnectException;
-      ioErrs.increment(1);
-      LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
-          " map outputs", ie);
-
-      // If connect did not succeed, just mark all the maps as failed,
-      // indirectly penalizing the host
-      scheduler.hostFailed(host.getHostName());
-      for(TaskAttemptID left: remaining) {
-        scheduler.copyFailed(left, host, false, connectExcpt);
-      }
-
-      // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TaskAttemptID left: remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
-
+    DataInputStream input = openShuffleUrl(host, remaining, url);
+    if (input == null) {
       return;
     }
 
-    input = new DataInputStream(connection.getInputStream());
-
     try {
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
@@ -333,14 +340,10 @@ protected void copyFromHost(MapHost host) throws IOException {
         connection.disconnect();
         // Get map output from remaining tasks only.
         url = getMapOutputURL(host, remaining);
-
-        // Connect with retry as expecting host's recovery take sometime.
-        setupConnectionsWithRetry(host, remaining, url);
-        if (stopped) {
-          abortConnect(host, remaining);
+        input = openShuffleUrl(host, remaining, url);
+        if (input == null) {
           return;
         }
-        input = new DataInputStream(connection.getInputStream());
       }
     }
 
@@ -591,7 +594,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
     // Retry is not timeout, let's do retry with throwing an exception.
     if (currentTime - retryStartTime < this.fetchRetryTimeout) {
       LOG.warn("Shuffle output from " + host.getHostName() +
-          " failed, retry it.");
+          " failed, retry it.", ioe);
       throw ioe;
     } else {
       // timeout, prepare to be failed.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 929c0ae335..723df17535 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -388,6 +388,39 @@ public Void answer(InvocationOnMock ignore) throws IOException {
         anyBoolean(), anyBoolean());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryThenTimeout() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200)
+      .thenThrow(new SocketTimeoutException("forced timeout"));
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+      .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, false, false);
+  }
+
   @Test
   public void testCopyFromHostExtraBytes() throws Exception {
     Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
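The behavioral change the patch makes is narrow: an IOException raised while re-connecting for a retry is now routed through the same failure handling as the initial connect (via openShuffleUrl returning null), instead of propagating out of copyFromHost() and potentially killing the reducer. The following is a minimal, self-contained sketch of that pattern outside Hadoop; the names SimulatedShuffle, simulateConnect, and openOrRequeue, and the Queue standing in for the shuffle scheduler, are hypothetical illustrations, not Hadoop APIs.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.SocketTimeoutException;
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical stand-in for the Fetcher/scheduler interplay.
    public class SimulatedShuffle {
      private int attempts = 0;

      // Stands in for setupConnectionsWithRetry(): the first attempt times out.
      private DataInputStream simulateConnect() throws IOException {
        if (attempts++ == 0) {
          throw new SocketTimeoutException("forced timeout");
        }
        return new DataInputStream(new ByteArrayInputStream(new byte[] { 42 }));
      }

      // Mirrors the shape of openShuffleUrl(): catch the IOException,
      // record the failed copy, re-queue the map output, and return null.
      private DataInputStream openOrRequeue(String mapId, Queue<String> pending) {
        try {
          return simulateConnect();
        } catch (IOException ie) {
          System.out.println("copy of " + mapId + " failed: " + ie.getMessage());
          pending.add(mapId); // re-queue instead of letting the exception escape
          return null;
        }
      }

      public static void main(String[] args) throws IOException {
        SimulatedShuffle shuffle = new SimulatedShuffle();
        Queue<String> pending = new ArrayDeque<>();
        pending.add("attempt_0_m_000000_0");
        while (!pending.isEmpty()) {
          String mapId = pending.remove();
          DataInputStream in = shuffle.openOrRequeue(mapId, pending);
          if (in == null) {
            continue; // like copyFromHost(): give up for now, retry later
          }
          System.out.println("fetched " + in.read() + " for " + mapId);
          in.close();
        }
      }
    }

The null return is the whole contract: the caller abandons the current host, and the re-queued map outputs are fetched on a later attempt. That is the scenario the new testCopyFromHostWithRetryThenTimeout exercises, asserting that the timeout during the retry ends in copyFailed(map1ID, host, false, false) rather than an exception out of copyFromHost().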