MAPREDUCE-6190. If a task stucks before its first heartbeat, it never timeouts and the MR job becomes stuck. Contributed by Zhaohui Xin.

This commit is contained in:
Akira Ajisaka 2018-11-28 17:57:42 +09:00
parent b3a052d199
commit 13a21f6607
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
4 changed files with 96 additions and 8 deletions

View File

@ -22,6 +22,7 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -47,11 +48,13 @@
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class TaskHeartbeatHandler extends AbstractService { public class TaskHeartbeatHandler extends AbstractService {
private static class ReportTime { static class ReportTime {
private long lastProgress; private long lastProgress;
private final AtomicBoolean reported;
public ReportTime(long time) { public ReportTime(long time) {
setLastProgress(time); setLastProgress(time);
reported = new AtomicBoolean(false);
} }
public synchronized void setLastProgress(long time) { public synchronized void setLastProgress(long time) {
@ -61,6 +64,10 @@ public synchronized void setLastProgress(long time) {
public synchronized long getLastProgress() { public synchronized long getLastProgress() {
return lastProgress; return lastProgress;
} }
public boolean isReported(){
return reported.get();
}
} }
private static final Logger LOG = private static final Logger LOG =
@ -72,6 +79,7 @@ public synchronized long getLastProgress() {
private volatile boolean stopped; private volatile boolean stopped;
private long taskTimeOut; private long taskTimeOut;
private long unregisterTimeOut; private long unregisterTimeOut;
private long taskStuckTimeOut;
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
private final EventHandler eventHandler; private final EventHandler eventHandler;
@ -98,6 +106,8 @@ protected void serviceInit(Configuration conf) throws Exception {
MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
taskStuckTimeOut = conf.getLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS,
MRJobConfig.DEFAULT_TASK_STUCK_TIMEOUT_MS);
// enforce task timeout is at least twice as long as task report interval // enforce task timeout is at least twice as long as task report interval
long taskProgressReportIntervalMillis = MRJobConfUtil. long taskProgressReportIntervalMillis = MRJobConfUtil.
@ -135,6 +145,7 @@ public void progressing(TaskAttemptId attemptID) {
//TODO throw an exception if the task isn't registered. //TODO throw an exception if the task isn't registered.
ReportTime time = runningAttempts.get(attemptID); ReportTime time = runningAttempts.get(attemptID);
if(time != null) { if(time != null) {
time.reported.compareAndSet(false, true);
time.setLastProgress(clock.getTime()); time.setLastProgress(clock.getTime());
} }
} }
@ -179,13 +190,21 @@ private void checkRunning(long currentTime) {
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next(); Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
boolean taskTimedOut = (taskTimeOut > 0) && boolean taskTimedOut = (taskTimeOut > 0) &&
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
// when container in NM not started in a long time,
// we think the taskAttempt is stuck
boolean taskStuck = (!entry.getValue().isReported()) &&
(currentTime >
(entry.getValue().getLastProgress() + taskStuckTimeOut));
if(taskTimedOut) { if(taskTimedOut || taskStuck) {
// task is lost, remove from the list and raise lost event // task is lost, remove from the list and raise lost event
iterator.remove(); iterator.remove();
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
.getKey(), "AttemptID:" + entry.getKey().toString() .getKey(), "AttemptID:" + entry.getKey().toString()
+ " Timed out after " + taskTimeOut / 1000 + " secs")); + " task timeout set: " + taskTimeOut / 1000 + "s,"
+ " taskTimedOut: " + taskTimedOut + ";"
+ " task stuck timeout set: " + taskStuckTimeOut / 1000 + "s,"
+ " taskStuck: " + taskStuck));
eventHandler.handle(new TaskAttemptEvent(entry.getKey(), eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
TaskAttemptEventType.TA_TIMED_OUT)); TaskAttemptEventType.TA_TIMED_OUT));
} }
@ -205,6 +224,11 @@ private void checkRecentlyUnregistered(long currentTime) {
} }
} }
@VisibleForTesting
ConcurrentMap getRunningAttempts(){
return runningAttempts;
}
@VisibleForTesting @VisibleForTesting
public long getTaskTimeOut() { public long getTaskTimeOut() {
return taskTimeOut; return taskTimeOut;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -41,12 +42,15 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
public class TestTaskHeartbeatHandler { public class TestTaskHeartbeatHandler {
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
@Test @Test
public void testTimeout() throws InterruptedException { public void testTaskTimeout() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class); EventHandler mockHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance(); Clock clock = SystemClock.getInstance();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);
@ -62,11 +66,13 @@ public void testTimeout() throws InterruptedException {
hb.init(conf); hb.init(conf);
hb.start(); hb.start();
try { try {
ApplicationId appId = ApplicationId.newInstance(0l, 5); ApplicationId appId = ApplicationId.newInstance(0L, 5);
JobId jobId = MRBuilderUtils.newJobId(appId, 4); JobId jobId = MRBuilderUtils.newJobId(appId, 4);
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
hb.register(taid); hb.register(taid);
// Task heartbeat once to avoid stuck
hb.progressing(taid);
Thread.sleep(100); Thread.sleep(100);
//Events only happen when the task is canceled //Events only happen when the task is canceled
verify(mockHandler, times(2)).handle(any(Event.class)); verify(mockHandler, times(2)).handle(any(Event.class));
@ -75,6 +81,47 @@ public void testTimeout() throws InterruptedException {
} }
} }
@SuppressWarnings("unchecked")
@Test
public void testTaskStuck() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 10); // 10ms
conf.setInt(MRJobConfig.TASK_TIMEOUT, 1000); //1000 ms
// set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT
// so that TASK_TIMEOUT is not overridden
conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
hb.init(conf);
hb.start();
try {
ApplicationId appId = ApplicationId.newInstance(0L, 5);
JobId jobId = MRBuilderUtils.newJobId(appId, 4);
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
hb.register(taid);
ConcurrentMap<TaskAttemptId, TaskHeartbeatHandler.ReportTime>
runningAttempts = hb.getRunningAttempts();
for (Map.Entry<TaskAttemptId, TaskHeartbeatHandler.ReportTime> entry
: runningAttempts.entrySet()) {
assertFalse(entry.getValue().isReported());
}
Thread.sleep(100);
//Events only happen when the task is canceled
verify(mockHandler, times(2)).handle(any(Event.class));
} finally {
hb.stop();
}
}
/** /**
* Test if the final heartbeat timeout is set correctly when task progress * Test if the final heartbeat timeout is set correctly when task progress
* report interval is set bigger than the task timeout in the configuration. * report interval is set bigger than the task timeout in the configuration.
@ -120,7 +167,7 @@ public void testTaskUnregistered() throws Exception {
hb.init(conf); hb.init(conf);
hb.start(); hb.start();
try { try {
ApplicationId appId = ApplicationId.newInstance(0l, 5); ApplicationId appId = ApplicationId.newInstance(0L, 5);
JobId jobId = MRBuilderUtils.newJobId(appId, 4); JobId jobId = MRBuilderUtils.newJobId(appId, 4);
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);

View File

@ -353,6 +353,14 @@ public interface MRJobConfig {
public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L; long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L;
/**
* The max timeout before receiving remote task's first heartbeat.
* This parameter is in order to avoid waiting for the container
* to start indefinitely, which made task stuck in the NEW state.
*/
String TASK_STUCK_TIMEOUT_MS = "mapreduce.task.stuck.timeout-ms";
long DEFAULT_TASK_STUCK_TIMEOUT_MS = 10 * 60 * 1000L;
String TASK_PROGRESS_REPORT_INTERVAL = String TASK_PROGRESS_REPORT_INTERVAL =
"mapreduce.task.progress-report.interval"; "mapreduce.task.progress-report.interval";

View File

@ -276,6 +276,15 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.task.stuck.timeout-ms</name>
<value>600000</value>
<description>The max timeout before receiving remote task's first heartbeat.
This parameter is in order to avoid waiting for the container
to start indefinitely, which made task stuck in the NEW state.
</description>
</property>
<property> <property>
<name>mapreduce.map.memory.mb</name> <name>mapreduce.map.memory.mb</name>
<value>-1</value> <value>-1</value>