From 5565f2c532f5a2bee6999155672dce8bf3179519 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Thu, 23 May 2019 10:21:11 +0900 Subject: [PATCH] MAPREDUCE-7198. mapreduce.task.timeout=0 configuration used to disable timeout doesn't work. --- .../v2/app/TaskHeartbeatHandler.java | 5 ++- .../v2/app/TestTaskHeartbeatHandler.java | 43 ++++++++++++++++++- .../src/main/resources/mapred-default.xml | 1 + 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index 456f2a66c8..9439a7be8d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -192,7 +192,8 @@ private void checkRunning(long currentTime) { (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); // when container in NM not started in a long time, // we think the taskAttempt is stuck - boolean taskStuck = (!entry.getValue().isReported()) && + boolean taskStuck = (taskStuckTimeOut > 0) && + (!entry.getValue().isReported()) && (currentTime > (entry.getValue().getLastProgress() + taskStuckTimeOut)); @@ -225,7 +226,7 @@ private void checkRecentlyUnregistered(long currentTime) { } @VisibleForTesting - ConcurrentMap getRunningAttempts(){ + ConcurrentMap getRunningAttempts(){ return runningAttempts; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index f0368ebe87..418f09e8d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,7 +49,7 @@ public class TestTaskHeartbeatHandler { - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("unchecked") @Test public void testTaskTimeout() throws InterruptedException { EventHandler mockHandler = mock(EventHandler.class); @@ -81,6 +82,46 @@ public void testTaskTimeout() throws InterruptedException { } } + @Test + @SuppressWarnings("unchecked") + public void testTaskTimeoutDisable() throws InterruptedException { + EventHandler mockHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); + + Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 0); // no timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT, 0); // no timeout + // set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT + // so that TASK_TIMEOUT is not overridden + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms + + hb.init(conf); + hb.start(); + try { + ApplicationId appId = ApplicationId.newInstance(0L, 5); + JobId jobId = MRBuilderUtils.newJobId(appId, 4); + TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); + TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); + hb.register(taid); + + ConcurrentMap + runningAttempts = hb.getRunningAttempts(); + for (Map.Entry entry + : runningAttempts.entrySet()) { + assertFalse(entry.getValue().isReported()); + } + + Thread.sleep(100); + + // Timeout is disabled, so the task should not be canceled + verify(mockHandler, never()).handle(any(Event.class)); + } finally { + hb.stop(); + } + } + @SuppressWarnings("unchecked") @Test public void testTaskStuck() throws InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index fa26e4d738..1ba82d2b25 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -282,6 +282,7 @@ The max timeout before receiving remote task's first heartbeat. This parameter is in order to avoid waiting for the container to start indefinitely, which made task stuck in the NEW state. + A value of 0 disables the timeout.