MAPREDUCE-2450. Fixed a corner case with interrupted communication threads leading to a long timeout in Task. Contributed by Rajesh Balamohan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232314 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-01-17 06:10:50 +00:00
parent c53c94ec46
commit d05e6d2671
2 changed files with 23 additions and 0 deletions

View File

@ -503,6 +503,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe
via mahadev) via mahadev)
MAPREDUCE-2450. Fixed a corner case with interrupted communication threads
leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -552,6 +552,8 @@ protected class TaskReporter
private InputSplit split = null; private InputSplit split = null;
private Progress taskProgress; private Progress taskProgress;
private Thread pingThread = null; private Thread pingThread = null;
private boolean done = true;
private Object lock = new Object();
/** /**
* flag that indicates whether progress update needs to be sent to parent. * flag that indicates whether progress update needs to be sent to parent.
@ -648,6 +650,9 @@ public void run() {
// get current flag value and reset it as well // get current flag value and reset it as well
boolean sendProgress = resetProgressFlag(); boolean sendProgress = resetProgressFlag();
while (!taskDone.get()) { while (!taskDone.get()) {
synchronized (lock) {
done = false;
}
try { try {
boolean taskFound = true; // whether TT knows about this task boolean taskFound = true; // whether TT knows about this task
// sleep for a bit // sleep for a bit
@ -680,6 +685,7 @@ public void run() {
// came back up), kill ourselves // came back up), kill ourselves
if (!taskFound) { if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId); LOG.warn("Parent died. Exiting "+taskId);
resetDoneFlag();
System.exit(66); System.exit(66);
} }
@ -692,10 +698,19 @@ public void run() {
if (remainingRetries == 0) { if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
LOG.warn("Last retry, killing "+taskId); LOG.warn("Last retry, killing "+taskId);
resetDoneFlag();
System.exit(65); System.exit(65);
} }
} }
} }
//Notify that we are done with the work
resetDoneFlag();
}
void resetDoneFlag() {
synchronized (lock) {
done = true;
lock.notify();
}
} }
public void startCommunicationThread() { public void startCommunicationThread() {
if (pingThread == null) { if (pingThread == null) {
@ -706,6 +721,11 @@ public void startCommunicationThread() {
} }
public void stopCommunicationThread() throws InterruptedException { public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) { if (pingThread != null) {
synchronized (lock) {
while (!done) {
lock.wait();
}
}
pingThread.interrupt(); pingThread.interrupt();
pingThread.join(); pingThread.join();
} }