diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index 303b4c1721..6a716c7332 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -23,10 +23,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; @@ -67,7 +69,7 @@ public synchronized long getLastProgress() { //received from a task. private Thread lostTaskCheckerThread; private volatile boolean stopped; - private int taskTimeOut = 5 * 60 * 1000;// 5 mins + private long taskTimeOut; private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private final EventHandler eventHandler; @@ -87,7 +89,19 @@ public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock, @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000); + taskTimeOut = conf.getLong( + MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); + + // enforce task timeout is at least twice as long as task report interval + long taskProgressReportIntervalMillis = MRJobConfUtil. + getTaskProgressReportInterval(conf); + long minimumTaskTimeoutAllowed = taskProgressReportIntervalMillis * 2; + if(taskTimeOut < minimumTaskTimeoutAllowed) { + taskTimeOut = minimumTaskTimeoutAllowed; + LOG.info("Task timeout must be as least twice as long as the task " + + "status report interval. Setting task timeout to " + taskTimeOut); + } + taskTimeOutCheckInterval = conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000); } @@ -140,7 +154,7 @@ public void run() { while (iterator.hasNext()) { Map.Entry entry = iterator.next(); - boolean taskTimedOut = (taskTimeOut > 0) && + boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); if(taskTimedOut) { @@ -163,4 +177,8 @@ public void run() { } } + @VisibleForTesting + public long getTaskTimeOut() { + return taskTimeOut; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index e9a09895ec..2623849406 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Assert; import org.junit.Test; @@ -50,6 +51,9 @@ public void testTimeout() throws InterruptedException { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms + // set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT + // so that TASK_TIMEOUT is not overridden + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5); conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms hb.init(conf); @@ -68,4 +72,67 @@ public void testTimeout() throws InterruptedException { } } + /** + * Test if the final heartbeat timeout is set correctly when task progress + * report interval is set bigger than the task timeout in the configuration. + */ + @Test + public void testTaskTimeoutConfigSmallerThanTaskProgressReportInterval() { + testTaskTimeoutWrtProgressReportInterval(1000L, 5000L); + } + + /** + * Test if the final heartbeat timeout is set correctly when task progress + * report interval is set smaller than the task timeout in the configuration. + */ + @Test + public void testTaskTimeoutConfigBiggerThanTaskProgressReportInterval() { + testTaskTimeoutWrtProgressReportInterval(5000L, 1000L); + } + + /** + * Test if the final heartbeat timeout is set correctly when task progress + * report interval is not set in the configuration. + */ + @Test + public void testTaskTimeoutConfigWithoutTaskProgressReportInterval() { + final long taskTimeoutConfiged = 2000L; + + final Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_TIMEOUT, taskTimeoutConfiged); + + final long expectedTimeout = taskTimeoutConfiged; + verifyTaskTimeoutConfig(conf, expectedTimeout); + } + + /** + * Test if task timeout is set properly in response to the configuration of + * the task progress report interval. + */ + private static void testTaskTimeoutWrtProgressReportInterval( + long timeoutConfig, long taskreportInterval) { + final Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_TIMEOUT, timeoutConfig); + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, taskreportInterval); + + // expected task timeout is at least twice as long as task report interval + final long expectedTimeout = Math.max(timeoutConfig, taskreportInterval*2); + verifyTaskTimeoutConfig(conf, expectedTimeout); + } + + /** + * Verify task timeout is set as expected in TaskHeartBeatHandler with given + * configuration. + * @param conf the configuration + * @param expectedTimeout expected timeout value + */ + private static void verifyTaskTimeoutConfig(final Configuration conf, + final long expectedTimeout) { + final TaskHeartbeatHandler hb = + new TaskHeartbeatHandler(null, SystemClock.getInstance(), 1); + hb.init(conf); + + Assert.assertTrue("The value of the task timeout is incorrect.", + hb.getTaskTimeOut() == expectedTimeout); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index d20e9bdb7a..119d6a7393 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ExitUtil; @@ -781,9 +782,10 @@ public void run() { int remainingRetries = MAX_RETRIES; // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); - long taskProgressInterval = - conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, - MRJobConfig.DEFAULT_TASK_PROGRESS_REPORT_INTERVAL); + + long taskProgressInterval = MRJobConfUtil. + getTaskProgressReportInterval(conf); + while (!taskDone.get()) { synchronized (lock) { done = false; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index d740747e91..5716404557 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -57,11 +57,6 @@ public interface MRJobConfig { // negative values disable the limit public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1; - public static final String TASK_PROGRESS_REPORT_INTERVAL = - "mapreduce.task.progress-report.interval"; - /** The number of milliseconds between progress reports. */ - public static final int DEFAULT_TASK_PROGRESS_REPORT_INTERVAL = 3000; - public static final String JAR = "mapreduce.job.jar"; public static final String ID = "mapreduce.job.id"; @@ -258,6 +253,10 @@ public interface MRJobConfig { public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params"; public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; + long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L; + + String TASK_PROGRESS_REPORT_INTERVAL = + "mapreduce.task.progress-report.interval"; public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java index 11d49a45fa..afedef3785 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java @@ -42,4 +42,20 @@ public static void redact(final Configuration conf) { */ private MRJobConfUtil() { } + + /** + * Get the progress heartbeat interval configuration for mapreduce tasks. + * By default, the value of progress heartbeat interval is a proportion of + * that of task timeout. + * @param conf the job configuration to read from + * @return the value of task progress report interval + */ + public static long getTaskProgressReportInterval(final Configuration conf) { + long taskHeartbeatTimeOut = conf.getLong( + MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); + return conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, + (long) (TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO * taskHeartbeatTimeOut)); + } + + public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f; }