MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly so that reducers don't hang in corner cases. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235545 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-24 23:18:05 +00:00
parent dc615c312b
commit 078ae89a47
4 changed files with 39 additions and 18 deletions

View File

@ -546,6 +546,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable.
(ahmed via tucu)
MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly
so that reducers don't hang in corner cases. (vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
@ -41,6 +42,8 @@ class EventFetcher<K,V> extends Thread {
private ExceptionReporter exceptionReporter = null;
private int maxMapRuntime = 0;
private volatile boolean stopped = false;
public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical,
@ -60,7 +63,7 @@ public void run() {
LOG.info(reduce + " Thread started: " + getName());
try {
while (true && !Thread.currentThread().isInterrupted()) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
int numNewMaps = getMapCompletionEvents();
failures = 0;
@ -71,6 +74,9 @@ public void run() {
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
@ -90,6 +96,16 @@ public void run() {
return;
}
}
public void shutDown() {
this.stopped = true;
interrupt();
try {
join(5000);
} catch(InterruptedException ie) {
LOG.warn("Got interrupted while joining " + getName(), ie);
}
}
/**
* Queries the {@link TaskTracker} for a set of map-completion events

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings({"deprecation"})
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@ -88,6 +89,8 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private final Decompressor decompressor;
private final SecretKey jobTokenSecret;
private volatile boolean stopped = false;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@ -135,7 +138,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
public void run() {
try {
while (true && !Thread.currentThread().isInterrupted()) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
@ -160,7 +163,17 @@ public void run() {
exceptionReporter.reportException(t);
}
}
public void shutDown() throws InterruptedException {
this.stopped = true;
interrupt();
try {
join(5000);
} catch (InterruptedException ie) {
LOG.warn("Got interrupt while joining " + getName(), ie);
}
}
/**
* The crux of the matter...
*

View File

@ -19,8 +19,6 @@
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
@ -33,17 +31,17 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
private final TaskAttemptID reduceId;
@ -100,7 +98,6 @@ public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
this, mergePhase, mapOutputFile);
}
@SuppressWarnings("unchecked")
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
@ -130,19 +127,11 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
}
// Stop the event-fetcher thread
eventFetcher.interrupt();
try {
eventFetcher.join();
} catch(Throwable t) {
LOG.info("Failed to stop " + eventFetcher.getName(), t);
}
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.interrupt();
}
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.join();
fetcher.shutDown();
}
fetchers = null;