diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 467d00ee34..203a3259c6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -189,6 +189,9 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO.
     (Brandon Li via suresh)

+    MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
+    UNASSIGNED states. (Mayank Bansal via sseth)
+
 Release 2.0.2-alpha - 2012-09-07

   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 687edc379c..a39a1bc6b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -200,6 +200,10 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_KILL, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+     .addTransition(TaskAttemptStateInternal.NEW,
+         TaskAttemptStateInternal.NEW,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)

      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -211,6 +215,10 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
              TaskAttemptStateInternal.FAILED, true))
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)

      // Transitions from the ASSIGNED state.
     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 88e32b337c..0a381facaf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -546,6 +546,105 @@ public class TestTaskAttempt{
         eventHandler.internalError);
   }

+  @Test
+  public void testAppDiagnosticEventOnUnassignedTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on an unassigned task",
+        eventHandler.internalError);
+  }
+
+  @Test
+  public void testAppDiagnosticEventOnNewTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on a new task",
+        eventHandler.internalError);
+  }
+
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;