diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index f8f5015493..456f2a66c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -46,12 +47,14 @@ */ @SuppressWarnings({"unchecked", "rawtypes"}) public class TaskHeartbeatHandler extends AbstractService { - - private static class ReportTime { + + static class ReportTime { private long lastProgress; - + private final AtomicBoolean reported; + public ReportTime(long time) { setLastProgress(time); + reported = new AtomicBoolean(false); } public synchronized void setLastProgress(long time) { @@ -61,6 +64,10 @@ public synchronized void setLastProgress(long time) { public synchronized long getLastProgress() { return lastProgress; } + + public boolean isReported(){ + return reported.get(); + } } private static final Logger LOG = @@ -72,6 +79,7 @@ public synchronized long getLastProgress() { private volatile boolean stopped; private long taskTimeOut; private long unregisterTimeOut; + private long taskStuckTimeOut; private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private final EventHandler eventHandler; @@ -98,6 +106,8 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + taskStuckTimeOut = conf.getLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, + MRJobConfig.DEFAULT_TASK_STUCK_TIMEOUT_MS); // enforce task timeout is at least twice as long as task report interval long taskProgressReportIntervalMillis = MRJobConfUtil. @@ -135,6 +145,7 @@ public void progressing(TaskAttemptId attemptID) { //TODO throw an exception if the task isn't registered. ReportTime time = runningAttempts.get(attemptID); if(time != null) { + time.reported.compareAndSet(false, true); time.setLastProgress(clock.getTime()); } } @@ -179,13 +190,21 @@ private void checkRunning(long currentTime) { Map.Entry entry = iterator.next(); boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); + // when container in NM not started in a long time, + // we think the taskAttempt is stuck + boolean taskStuck = (!entry.getValue().isReported()) && + (currentTime > + (entry.getValue().getLastProgress() + taskStuckTimeOut)); - if(taskTimedOut) { + if(taskTimedOut || taskStuck) { // task is lost, remove from the list and raise lost event iterator.remove(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry .getKey(), "AttemptID:" + entry.getKey().toString() - + " Timed out after " + taskTimeOut / 1000 + " secs")); + + " task timeout set: " + taskTimeOut / 1000 + "s," + + " taskTimedOut: " + taskTimedOut + ";" + + " task stuck timeout set: " + taskStuckTimeOut / 1000 + "s," + + " taskStuck: " + taskStuck)); eventHandler.handle(new TaskAttemptEvent(entry.getKey(), TaskAttemptEventType.TA_TIMED_OUT)); } @@ -205,6 +224,11 @@ private void checkRecentlyUnregistered(long currentTime) { } } + @VisibleForTesting + ConcurrentMap getRunningAttempts(){ + return runningAttempts; + } + @VisibleForTesting public long getTaskTimeOut() { return taskTimeOut; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index 5d86479ef8..0fbde2c776 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -41,12 +42,15 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + public class TestTaskHeartbeatHandler { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testTimeout() throws InterruptedException { + public void testTaskTimeout() throws InterruptedException { EventHandler mockHandler = mock(EventHandler.class); Clock clock = SystemClock.getInstance(); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); @@ -62,11 +66,13 @@ public void testTimeout() throws InterruptedException { hb.init(conf); hb.start(); try { - ApplicationId appId = ApplicationId.newInstance(0l, 5); + ApplicationId appId = ApplicationId.newInstance(0L, 5); JobId jobId = MRBuilderUtils.newJobId(appId, 4); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); hb.register(taid); + // Task heartbeat once to avoid stuck + hb.progressing(taid); Thread.sleep(100); //Events only happen when the task is canceled verify(mockHandler, times(2)).handle(any(Event.class)); @@ -75,6 +81,47 @@ public void testTimeout() throws InterruptedException { } } + @SuppressWarnings("unchecked") + @Test + public void testTaskStuck() throws InterruptedException { + EventHandler mockHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); + + + Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 10); // 10ms + conf.setInt(MRJobConfig.TASK_TIMEOUT, 1000); //1000 ms + // set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT + // so that TASK_TIMEOUT is not overridden + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5); + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms + + hb.init(conf); + hb.start(); + try { + ApplicationId appId = ApplicationId.newInstance(0L, 5); + JobId jobId = MRBuilderUtils.newJobId(appId, 4); + TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); + TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); + hb.register(taid); + + ConcurrentMap + runningAttempts = hb.getRunningAttempts(); + for (Map.Entry entry + : runningAttempts.entrySet()) { + assertFalse(entry.getValue().isReported()); + } + + Thread.sleep(100); + + //Events only happen when the task is canceled + verify(mockHandler, times(2)).handle(any(Event.class)); + } finally { + hb.stop(); + } + } + /** * Test if the final heartbeat timeout is set correctly when task progress * report interval is set bigger than the task timeout in the configuration. @@ -120,7 +167,7 @@ public void testTaskUnregistered() throws Exception { hb.init(conf); hb.start(); try { - ApplicationId appId = ApplicationId.newInstance(0l, 5); + ApplicationId appId = ApplicationId.newInstance(0L, 5); JobId jobId = MRBuilderUtils.newJobId(appId, 4); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 565c05200d..b36b5ce5c6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -353,6 +353,14 @@ public interface MRJobConfig { public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L; + /** + * The max timeout before receiving remote task's first heartbeat. + * This parameter is in order to avoid waiting for the container + * to start indefinitely, which made task stuck in the NEW state. + */ + String TASK_STUCK_TIMEOUT_MS = "mapreduce.task.stuck.timeout-ms"; + long DEFAULT_TASK_STUCK_TIMEOUT_MS = 10 * 60 * 1000L; + String TASK_PROGRESS_REPORT_INTERVAL = "mapreduce.task.progress-report.interval"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index c993537ad7..fa26e4d738 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -276,6 +276,15 @@ + + mapreduce.task.stuck.timeout-ms + 600000 + The max timeout before receiving remote task's first heartbeat. + This parameter is in order to avoid waiting for the container + to start indefinitely, which made task stuck in the NEW state. + + + mapreduce.map.memory.mb -1