From 57803245ecc806249fcd0cd5fb3ca593098ac877 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Tue, 12 Mar 2013 22:51:28 +0000 Subject: [PATCH] MAPREDUCE-5060. Fetch failures that time out only count against the first map task. Contributed by Robert Joseph Evans git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1455740 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 16 ++---- .../mapreduce/task/reduce/TestFetcher.java | 49 +++++++++++++++++++ 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 54d4ac7292..76fd2b0a8e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -807,6 +807,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi Prakash via tgraves) + MAPREDUCE-5060. Fetch failures that time out only count against the first + map task (Robert Joseph Evans via jlowe) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index e35cb6cdcf..c3a05304cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -221,7 +221,6 @@ protected void copyFromHost(MapHost host) throws IOException { // Construct the url and connect DataInputStream input; - boolean connectSucceeded = false; try { URL url = getMapOutputURL(host, maps); @@ -237,7 +236,6 @@ protected void copyFromHost(MapHost host) throws IOException { // set the read timeout connection.setReadTimeout(readTimeout); connect(connection, connectionTimeout); - connectSucceeded = true; input = new DataInputStream(connection.getInputStream()); // Validate response code @@ -265,18 +263,10 @@ protected void copyFromHost(MapHost host) throws IOException { // If connect did not succeed, just mark all the maps as failed, // indirectly penalizing the host - if (!connectSucceeded) { - for(TaskAttemptID left: remaining) { - scheduler.copyFailed(left, host, connectSucceeded, connectExcpt); - } - } else { - // If we got a read error at this stage, it implies there was a problem - // with the first map, typically lost map. So, penalize only that map - // and add the rest - TaskAttemptID firstMap = maps.get(0); - scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt); + for(TaskAttemptID left: remaining) { + scheduler.copyFailed(left, host, false, connectExcpt); } - + // Add back all the remaining maps, WITHOUT marking them as failed for(TaskAttemptID left: remaining) { scheduler.putBackKnownMapOutput(host, left); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index db4308fc99..ee9a18a6b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -26,6 +26,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.net.URL; import java.util.ArrayList; @@ -70,6 +71,54 @@ protected HttpURLConnection openConnection(URL url) throws IOException { } } + @SuppressWarnings("unchecked") + @Test(timeout=30000) + public void testCopyFromHostConnectionTimeout() throws Exception { + LOG.info("testCopyFromHostConnectionTimeout"); + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler ss = mock(ShuffleScheduler.class); + MergeManagerImpl mm = mock(MergeManagerImpl.class); + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); + HttpURLConnection connection = mock(HttpURLConnection.class); + when(connection.getInputStream()).thenThrow( + new SocketTimeoutException("This is a fake timeout :)")); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())) + .thenReturn(allErrs); + + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + + ArrayList maps = new ArrayList(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + when(ss.getMapsForHost(host)).thenReturn(maps); + + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + + verify(allErrs).increment(1); + verify(ss).copyFailed(map1ID, host, false, false); + verify(ss).copyFailed(map2ID, host, false, false); + + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } + @SuppressWarnings("unchecked") @Test public void testCopyFromHostBogusHeader() throws Exception {