From 8a2a74159540f2e8d8f767639200e5e92bb6efbc Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Fri, 14 Mar 2014 20:33:03 +0000 Subject: [PATCH] MAPREDUCE-5570. Map task attempt with fetch failure has incorrect attempt finish time. Contributed by Rushabh S Shah git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577692 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 2 - .../v2/app/job/impl/TestTaskAttempt.java | 71 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f194425cfe..8e60ecc9c1 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -240,6 +240,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed before registering with RM (Rohith via jlowe) + MAPREDUCE-5570. Map task attempt with fetch failure has incorrect attempt + finish time (Rushabh S Shah via jlowe) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 78d70241fb..c36511186a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1774,8 +1774,6 @@ public abstract class TaskAttemptImpl implements .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP); //add to diagnostic taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt"); - //set the finish time - taskAttempt.setFinishTime(); if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 28babaa2d6..9dc738d858 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -91,7 +92,7 @@ import org.mockito.ArgumentCaptor; @SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ - + static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { @@ -725,6 +726,74 @@ public class TestTaskAttempt{ eventHandler.internalError); } + @Test + public void testFetchFailureAttemptFinishTime() throws Exception{ + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener,mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_DONE)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + + assertTrue("Task Attempt finish time is not greater than 0", + taImpl.getFinishTime() > 0); + + Long finishTime = taImpl.getFinishTime(); + Thread.sleep(5); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + + assertEquals("Task attempt is not in Too Many Fetch Failure state", + taImpl.getState(), TaskAttemptState.FAILED); + + assertEquals("After TA_TOO_MANY_FETCH_FAILURE," + + " Task attempt finish time is not the same ", + finishTime, Long.valueOf(taImpl.getFinishTime())); + } public static class MockEventHandler implements EventHandler { public boolean internalError;