diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 6112fb53f9..ce1551b321 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -278,9 +278,6 @@ private DataInputStream openShuffleUrl(MapHost host, LOG.warn("Connection rejected by the host " + te.host + ". Will retry later."); scheduler.penalize(host, te.backoff); - for (TaskAttemptID left : remaining) { - scheduler.putBackKnownMapOutput(host, left); - } } catch (IOException ie) { boolean connectExcpt = ie instanceof ConnectException; ioErrs.increment(1); @@ -293,11 +290,6 @@ private DataInputStream openShuffleUrl(MapHost host, for(TaskAttemptID left: remaining) { scheduler.copyFailed(left, host, false, connectExcpt); } - - // Add back all the remaining maps, WITHOUT marking them as failed - for(TaskAttemptID left: remaining) { - scheduler.putBackKnownMapOutput(host, left); - } } return input; @@ -332,12 +324,14 @@ protected void copyFromHost(MapHost host) throws IOException { // Construct the url and connect URL url = getMapOutputURL(host, maps); - DataInputStream input = openShuffleUrl(host, remaining, url); - if (input == null) { - return; - } + DataInputStream input = null; try { + input = openShuffleUrl(host, remaining, url); + if (input == null) { + return; + } + // Loop through available map-outputs and fetch them // On any error, faildTasks is not null and we exit // after putting back the remaining maps to the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index 2b6dc57c34..d9ce32cefa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -217,6 +217,9 @@ public synchronized void copySucceeded(TaskAttemptID mapId, reduceShuffleBytes.increment(bytes); lastProgressTime = Time.monotonicNow(); LOG.debug("map " + mapId + " done " + status.getStateString()); + } else { + LOG.warn("Aborting already-finished MapOutput for " + mapId); + output.abort(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 8f9434d3d5..934afd747e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -471,7 +471,10 @@ public void testCopyFromHostWithRetryThenTimeout() throws Exception { underTest.copyFromHost(host); verify(allErrs).increment(1); - verify(ss).copyFailed(map1ID, host, false, false); + verify(ss, times(1)).copyFailed(map1ID, host, false, false); + verify(ss, times(1)).copyFailed(map2ID, host, false, false); + verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java index 654b7488b9..5d0a0270e0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.task.reduce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.compress.CompressionCodec; @@ -283,6 +285,84 @@ public void addFetchFailedMap(TaskAttemptID mapTaskId) { scheduler.copyFailed(failedAttemptID, host1, true, false); } + @Test + public void testDuplicateCopySucceeded() throws Exception { + JobConf job = new JobConf(); + job.setNumMapTasks(2); + //mock creation + TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); + Reporter mockReporter = mock(Reporter.class); + FileSystem mockFileSystem = mock(FileSystem.class); + Class combinerClass = + job.getCombinerClass(); + @SuppressWarnings("unchecked") // needed for mock with generic + CombineOutputCollector mockCombineOutputCollector = + (CombineOutputCollector) mock(CombineOutputCollector.class); + org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = + mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); + Counter mockCounter = mock(Counter.class); + TaskStatus mockTaskStatus = mock(TaskStatus.class); + Progress mockProgress = mock(Progress.class); + MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); + Task mockTask = mock(Task.class); + @SuppressWarnings("unchecked") + MapOutput output1 = mock(MapOutput.class); + @SuppressWarnings("unchecked") + MapOutput output2 = mock(MapOutput.class); + @SuppressWarnings("unchecked") + MapOutput output3 = mock(MapOutput.class); + + ShuffleConsumerPlugin.Context context = + new ShuffleConsumerPlugin.Context( + mockTaskAttemptID, job, mockFileSystem, + mockUmbilical, mockLocalDirAllocator, + mockReporter, mockCompressionCodec, + combinerClass, mockCombineOutputCollector, + mockCounter, mockCounter, mockCounter, + mockCounter, mockCounter, mockCounter, + mockTaskStatus, mockProgress, mockProgress, + mockTask, mockMapOutputFile, null); + TaskStatus status = new TaskStatus() { + @Override + public boolean getIsMap() { + return false; + } + @Override + public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } + }; + Progress progress = new Progress(); + ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, + status, null, null, progress, context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + + MapHost host1 = new MapHost("host1", null); + TaskAttemptID succeedAttempt1ID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test", 0), TaskType.MAP, 0), 0); + TaskAttemptID succeedAttempt2ID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test", 0), TaskType.MAP, 0), 1); + TaskAttemptID succeedAttempt3ID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test", 0), TaskType.MAP, 1), 0); + + long bytes = (long)500 * 1024 * 1024; + //First successful copy for map 0 should commit output + scheduler.copySucceeded(succeedAttempt1ID, host1, bytes, 0, 1, output1); + verify(output1).commit(); + + //Second successful copy for map 0 should abort output + scheduler.copySucceeded(succeedAttempt2ID, host1, bytes, 0, 1, output2); + verify(output2).abort(); + + //First successful copy for map 1 should commit output + scheduler.copySucceeded(succeedAttempt3ID, host1, bytes, 0, 1, output3); + verify(output3).commit(); + } + private static String copyMessage(int attemptNo, double rate1, double rate2) { int attemptZero = attemptNo - 1; return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"