MAPREDUCE-6957. shuffle hangs after a node manager connection timeout. Contributed by Jooseong Kim
This commit is contained in:
parent
f153e60576
commit
4d98936eec
@ -278,9 +278,6 @@ private DataInputStream openShuffleUrl(MapHost host,
|
|||||||
LOG.warn("Connection rejected by the host " + te.host +
|
LOG.warn("Connection rejected by the host " + te.host +
|
||||||
". Will retry later.");
|
". Will retry later.");
|
||||||
scheduler.penalize(host, te.backoff);
|
scheduler.penalize(host, te.backoff);
|
||||||
for (TaskAttemptID left : remaining) {
|
|
||||||
scheduler.putBackKnownMapOutput(host, left);
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
boolean connectExcpt = ie instanceof ConnectException;
|
boolean connectExcpt = ie instanceof ConnectException;
|
||||||
ioErrs.increment(1);
|
ioErrs.increment(1);
|
||||||
@ -293,11 +290,6 @@ private DataInputStream openShuffleUrl(MapHost host,
|
|||||||
for(TaskAttemptID left: remaining) {
|
for(TaskAttemptID left: remaining) {
|
||||||
scheduler.copyFailed(left, host, false, connectExcpt);
|
scheduler.copyFailed(left, host, false, connectExcpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add back all the remaining maps, WITHOUT marking them as failed
|
|
||||||
for(TaskAttemptID left: remaining) {
|
|
||||||
scheduler.putBackKnownMapOutput(host, left);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return input;
|
return input;
|
||||||
@ -332,12 +324,14 @@ protected void copyFromHost(MapHost host) throws IOException {
|
|||||||
|
|
||||||
// Construct the url and connect
|
// Construct the url and connect
|
||||||
URL url = getMapOutputURL(host, maps);
|
URL url = getMapOutputURL(host, maps);
|
||||||
DataInputStream input = openShuffleUrl(host, remaining, url);
|
DataInputStream input = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
input = openShuffleUrl(host, remaining, url);
|
||||||
if (input == null) {
|
if (input == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
// Loop through available map-outputs and fetch them
|
// Loop through available map-outputs and fetch them
|
||||||
// On any error, faildTasks is not null and we exit
|
// On any error, faildTasks is not null and we exit
|
||||||
// after putting back the remaining maps to the
|
// after putting back the remaining maps to the
|
||||||
|
@ -217,6 +217,9 @@ public synchronized void copySucceeded(TaskAttemptID mapId,
|
|||||||
reduceShuffleBytes.increment(bytes);
|
reduceShuffleBytes.increment(bytes);
|
||||||
lastProgressTime = Time.monotonicNow();
|
lastProgressTime = Time.monotonicNow();
|
||||||
LOG.debug("map " + mapId + " done " + status.getStateString());
|
LOG.debug("map " + mapId + " done " + status.getStateString());
|
||||||
|
} else {
|
||||||
|
LOG.warn("Aborting already-finished MapOutput for " + mapId);
|
||||||
|
output.abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -471,7 +471,10 @@ public void testCopyFromHostWithRetryThenTimeout() throws Exception {
|
|||||||
|
|
||||||
underTest.copyFromHost(host);
|
underTest.copyFromHost(host);
|
||||||
verify(allErrs).increment(1);
|
verify(allErrs).increment(1);
|
||||||
verify(ss).copyFailed(map1ID, host, false, false);
|
verify(ss, times(1)).copyFailed(map1ID, host, false, false);
|
||||||
|
verify(ss, times(1)).copyFailed(map2ID, host, false, false);
|
||||||
|
verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
|
||||||
|
verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
package org.apache.hadoop.mapreduce.task.reduce;
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
@ -283,6 +285,84 @@ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
|
|||||||
scheduler.copyFailed(failedAttemptID, host1, true, false);
|
scheduler.copyFailed(failedAttemptID, host1, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public <K, V> void testDuplicateCopySucceeded() throws Exception {
|
||||||
|
JobConf job = new JobConf();
|
||||||
|
job.setNumMapTasks(2);
|
||||||
|
//mock creation
|
||||||
|
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
|
||||||
|
Reporter mockReporter = mock(Reporter.class);
|
||||||
|
FileSystem mockFileSystem = mock(FileSystem.class);
|
||||||
|
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass =
|
||||||
|
job.getCombinerClass();
|
||||||
|
@SuppressWarnings("unchecked") // needed for mock with generic
|
||||||
|
CombineOutputCollector<K, V> mockCombineOutputCollector =
|
||||||
|
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
|
||||||
|
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
|
||||||
|
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
|
||||||
|
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
|
||||||
|
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
|
||||||
|
Counter mockCounter = mock(Counter.class);
|
||||||
|
TaskStatus mockTaskStatus = mock(TaskStatus.class);
|
||||||
|
Progress mockProgress = mock(Progress.class);
|
||||||
|
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
|
||||||
|
Task mockTask = mock(Task.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
MapOutput<K, V> output1 = mock(MapOutput.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
MapOutput<K, V> output2 = mock(MapOutput.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
MapOutput<K, V> output3 = mock(MapOutput.class);
|
||||||
|
|
||||||
|
ShuffleConsumerPlugin.Context<K, V> context =
|
||||||
|
new ShuffleConsumerPlugin.Context<K, V>(
|
||||||
|
mockTaskAttemptID, job, mockFileSystem,
|
||||||
|
mockUmbilical, mockLocalDirAllocator,
|
||||||
|
mockReporter, mockCompressionCodec,
|
||||||
|
combinerClass, mockCombineOutputCollector,
|
||||||
|
mockCounter, mockCounter, mockCounter,
|
||||||
|
mockCounter, mockCounter, mockCounter,
|
||||||
|
mockTaskStatus, mockProgress, mockProgress,
|
||||||
|
mockTask, mockMapOutputFile, null);
|
||||||
|
TaskStatus status = new TaskStatus() {
|
||||||
|
@Override
|
||||||
|
public boolean getIsMap() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Progress progress = new Progress();
|
||||||
|
ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
|
||||||
|
status, null, null, progress, context.getShuffledMapsCounter(),
|
||||||
|
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
|
||||||
|
|
||||||
|
MapHost host1 = new MapHost("host1", null);
|
||||||
|
TaskAttemptID succeedAttempt1ID = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test", 0), TaskType.MAP, 0), 0);
|
||||||
|
TaskAttemptID succeedAttempt2ID = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test", 0), TaskType.MAP, 0), 1);
|
||||||
|
TaskAttemptID succeedAttempt3ID = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test", 0), TaskType.MAP, 1), 0);
|
||||||
|
|
||||||
|
long bytes = (long)500 * 1024 * 1024;
|
||||||
|
//First successful copy for map 0 should commit output
|
||||||
|
scheduler.copySucceeded(succeedAttempt1ID, host1, bytes, 0, 1, output1);
|
||||||
|
verify(output1).commit();
|
||||||
|
|
||||||
|
//Second successful copy for map 0 should abort output
|
||||||
|
scheduler.copySucceeded(succeedAttempt2ID, host1, bytes, 0, 1, output2);
|
||||||
|
verify(output2).abort();
|
||||||
|
|
||||||
|
//First successful copy for map 1 should commit output
|
||||||
|
scheduler.copySucceeded(succeedAttempt3ID, host1, bytes, 0, 1, output3);
|
||||||
|
verify(output3).commit();
|
||||||
|
}
|
||||||
|
|
||||||
private static String copyMessage(int attemptNo, double rate1, double rate2) {
|
private static String copyMessage(int attemptNo, double rate1, double rate2) {
|
||||||
int attemptZero = attemptNo - 1;
|
int attemptZero = attemptNo - 1;
|
||||||
return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"
|
return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"
|
||||||
|
Loading…
Reference in New Issue
Block a user