MAPREDUCE-5717. Task pings are interpreted as task progress. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1559256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-01-17 21:38:37 +00:00
parent cdc54c8759
commit 0fd646b967
3 changed files with 48 additions and 1 deletions

View File

@ -146,6 +146,8 @@ Trunk (Unreleased)
MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan
Mitic via hitesh)
MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe)
Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -361,7 +361,6 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
if (taskStatus == null) {
//We are using statusUpdate only as a simple ping
LOG.info("Ping from " + taskAttemptID.toString());
taskHeartbeatHandler.progressing(yarnAttemptID);
return feedback;
}

View File

@ -381,4 +381,50 @@ protected void registerHeartbeatHandler(Configuration conf) {
}
@SuppressWarnings("rawtypes")
@Test
public void testStatusUpdateProgress()
throws IOException, InterruptedException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
Dispatcher dispatcher = mock(Dispatcher.class);
EventHandler ea = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(ea);
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret,
rmHeartbeatHandler, hbHandler, policy);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
JVMId id = new JVMId("foo",1, true, 1);
WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
Task task = mock(Task.class);
listener.registerPendingTask(task, wid);
listener.registerLaunchedTask(attemptId, wid);
verify(hbHandler).register(attemptId);
// make sure a ping doesn't report progress
AMFeedback feedback = listener.statusUpdate(attemptID, null);
assertTrue(feedback.getTaskFound());
verify(hbHandler, never()).progressing(eq(attemptId));
// make sure a status update does report progress
MapTaskStatus mockStatus = new MapTaskStatus(attemptID, 0.0f, 1,
TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.MAP,
new Counters());
feedback = listener.statusUpdate(attemptID, mockStatus);
assertTrue(feedback.getTaskFound());
verify(hbHandler).progressing(eq(attemptId));
listener.close();
}
}