From d45922de2c5645e11339b94e4c31935ead66fefc Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 20 Jul 2012 20:18:48 +0000 Subject: [PATCH] svn merge --change -1363454 for reverting MAPREDUCE-4423 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1363935 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 - .../hadoop/mapreduce/task/reduce/Fetcher.java | 62 ++++----- .../mapreduce/task/reduce/TestFetcher.java | 121 ------------------ 3 files changed, 22 insertions(+), 164 deletions(-) delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5df9c136f7..8639a6f762 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -739,9 +739,6 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't init. (Jason Lowe via daryn) - MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans - via tgraves) - Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 7852da93a5..f3e7fd61c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -49,8 +49,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.annotations.VisibleForTesting; - +@SuppressWarnings({"deprecation"}) class Fetcher extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -176,18 +175,13 @@ public void shutDown() throws InterruptedException { } } - @VisibleForTesting - protected HttpURLConnection openConnection(URL url) throws IOException { - return (HttpURLConnection)url.openConnection(); - } - /** * The crux of the matter... * * @param host {@link MapHost} from which we need to * shuffle available map-outputs. */ - protected void copyFromHost(MapHost host) throws IOException { + private void copyFromHost(MapHost host) throws IOException { // Get completed maps on 'host' List maps = scheduler.getMapsForHost(host); @@ -197,11 +191,9 @@ protected void copyFromHost(MapHost host) throws IOException { return; } - if(LOG.isDebugEnabled()) { - LOG.debug("Fetcher " + id + " going to fetch from " + host); - for (TaskAttemptID tmp: maps) { - LOG.debug(tmp); - } + LOG.debug("Fetcher " + id + " going to fetch from " + host); + for (TaskAttemptID tmp: maps) { + LOG.debug(tmp); } // List of maps to be fetched yet @@ -213,7 +205,7 @@ protected void copyFromHost(MapHost host) throws IOException { try { URL url = getMapOutputURL(host, maps); - HttpURLConnection connection = openConnection(url); + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); @@ -274,24 +266,17 @@ protected void copyFromHost(MapHost host) throws IOException { try { // Loop through available map-outputs and fetch them - // On any error, faildTasks is not null and we exit - // after putting back the remaining maps to the - // yet_to_be_fetched list and marking the failed tasks. - TaskAttemptID[] failedTasks = null; - while (!remaining.isEmpty() && failedTasks == null) { - failedTasks = copyMapOutput(host, input, remaining); - } - - if(failedTasks != null) { - for(TaskAttemptID left: failedTasks) { - scheduler.copyFailed(left, host, true); - } + // On any error, good becomes false and we exit after putting back + // the remaining maps to the yet_to_be_fetched list + boolean good = true; + while (!remaining.isEmpty() && good) { + good = copyMapOutput(host, input, remaining); } IOUtils.cleanup(LOG, input); // Sanity check - if (failedTasks == null && !remaining.isEmpty()) { + if (good && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left."); } @@ -300,9 +285,10 @@ protected void copyFromHost(MapHost host) throws IOException { scheduler.putBackKnownMapOutput(host, left); } } - } + + } - private TaskAttemptID[] copyMapOutput(MapHost host, + private boolean copyMapOutput(MapHost host, DataInputStream input, Set remaining) { MapOutput mapOutput = null; @@ -324,15 +310,14 @@ private TaskAttemptID[] copyMapOutput(MapHost host, } catch (IllegalArgumentException e) { badIdErrs.increment(1); LOG.warn("Invalid map id ", e); - //Don't know which one was bad, so consider all of them as bad - return remaining.toArray(new TaskAttemptID[remaining.size()]); + return false; } // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { - return new TaskAttemptID[] {mapId}; + return false; } LOG.debug("header: " + mapId + ", len: " + compressedLength + @@ -344,7 +329,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host, // Check if we can shuffle *now* ... if (mapOutput.getType() == Type.WAIT) { LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - return new TaskAttemptID[] {mapId}; + return false; } // Go! @@ -366,18 +351,14 @@ private TaskAttemptID[] copyMapOutput(MapHost host, // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); - return null; + return true; } catch (IOException ioe) { ioErrs.increment(1); if (mapId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); - if(mapId == null) { - return remaining.toArray(new TaskAttemptID[remaining.size()]); - } else { - return new TaskAttemptID[] {mapId}; - } + return false; } LOG.info("Failed to shuffle output of " + mapId + @@ -385,8 +366,9 @@ private TaskAttemptID[] copyMapOutput(MapHost host, // Inform the shuffle-scheduler mapOutput.abort(); + scheduler.copyFailed(mapId, host, true); metrics.failedFetch(); - return new TaskAttemptID[] {mapId}; + return false; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java deleted file mode 100644 index 7f7c3f50b8..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce.task.reduce; - -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.ArrayList; - -import javax.crypto.SecretKey; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; -import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.junit.Test; - -/** - * Test that the Fetcher does what we expect it to. - */ -public class TestFetcher { - - public static class FakeFetcher extends Fetcher { - - private HttpURLConnection connection; - - public FakeFetcher(JobConf job, TaskAttemptID reduceId, - ShuffleScheduler scheduler, MergeManager merger, Reporter reporter, - ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, - SecretKey jobTokenSecret, HttpURLConnection connection) { - super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, - jobTokenSecret); - this.connection = connection; - } - - @Override - protected HttpURLConnection openConnection(URL url) throws IOException { - if(connection != null) { - return connection; - } - return super.openConnection(url); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testCopyFromHostBogusHeader() throws Exception { - JobConf job = new JobConf(); - TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); - ShuffleScheduler ss = mock(ShuffleScheduler.class); - MergeManager mm = mock(MergeManager.class); - Reporter r = mock(Reporter.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); - ExceptionReporter except = mock(ExceptionReporter.class); - SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); - HttpURLConnection connection = mock(HttpURLConnection.class); - - Counters.Counter allErrs = mock(Counters.Counter.class); - when(r.getCounter(anyString(), anyString())) - .thenReturn(allErrs); - - Fetcher underTest = new FakeFetcher(job, id, ss, mm, - r, metrics, except, key, connection); - - - MapHost host = new MapHost("localhost", "http://localhost:8080/"); - - ArrayList maps = new ArrayList(1); - TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); - maps.add(map1ID); - TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); - maps.add(map2ID); - when(ss.getMapsForHost(host)).thenReturn(maps); - - String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; - String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); - - when(connection.getResponseCode()).thenReturn(200); - when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) - .thenReturn(replyHash); - ByteArrayInputStream in = new ByteArrayInputStream( - "5 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes()); - when(connection.getInputStream()).thenReturn(in); - - underTest.copyFromHost(host); - - verify(connection) - .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, - encHash); - - verify(allErrs).increment(1); - verify(ss).copyFailed(map1ID, host, true); - verify(ss).copyFailed(map2ID, host, true); - } - -}