diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 338a160b16..d9a02feca6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -146,6 +146,8 @@ Trunk (Unreleased) MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan Mitic via hitesh) + MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe) + Release 2.4.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9f4b9c7ec6..03b6e52db8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -361,7 +361,6 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID, if (taskStatus == null) { //We are using statusUpdate only as a simple ping LOG.info("Ping from " + taskAttemptID.toString()); - taskHeartbeatHandler.progressing(yarnAttemptID); return feedback; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 6563cda9ae..256f0b7bb7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -381,4 +381,50 @@ protected void registerHeartbeatHandler(Configuration conf) { } + @SuppressWarnings("rawtypes") + @Test + public void testStatusUpdateProgress() + throws IOException, InterruptedException { + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + MockTaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler, policy); + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + JVMId id = new JVMId("foo",1, true, 1); + WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + + TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); + TaskAttemptId attemptId = TypeConverter.toYarn(attemptID); + Task task = mock(Task.class); + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptId, wid); + verify(hbHandler).register(attemptId); + + // make sure a ping doesn't report progress + AMFeedback feedback = listener.statusUpdate(attemptID, null); + assertTrue(feedback.getTaskFound()); + verify(hbHandler, never()).progressing(eq(attemptId)); + + // make sure a status update does report progress + MapTaskStatus mockStatus = new MapTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.MAP, + new Counters()); + feedback = listener.statusUpdate(attemptID, mockStatus); + assertTrue(feedback.getTaskFound()); + verify(hbHandler).progressing(eq(attemptId)); + listener.close(); + } }