From a5037559905db131efaa590a605001a7361098bf Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Mon, 24 Oct 2011 21:19:31 +0000
Subject: [PATCH] MAPREDUCE-3249. Ensure shuffle-port is correctly used during
 MR AM recovery. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188388 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +++
 .../mapreduce/v2/app/job/impl/TaskImpl.java   |  3 ++-
 .../v2/app/recover/RecoveryService.java       | 24 +++++++++----------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java | 18 +++++++-------
 .../hadoop/mapreduce/v2/app/TestRecovery.java | 18 +++++++++++++-
 5 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d69f04f5f9..2bffc8a165 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1747,6 +1747,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
     acmurthy)
 
+    MAPREDUCE-3249. Ensure shuffle-port is correctly used during MR AM recovery.
+    (vinodkv via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index e192079463..a7c6491512 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -568,7 +568,8 @@ private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
     if (attempt.getNodeHttpAddress() != null) {
-      TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+      TaskAttemptCompletionEvent tce = recordFactory
+          .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
       tce.setMapOutputServerAddress("http://"
           + attempt.getNodeHttpAddress().split(":")[0] + ":"
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
index 7940b6ae00..7d3fac6f43 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /*
  * Recovers the completed tasks from the previous life of Application Master.
@@ -313,8 +314,8 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
       TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
           .getTaskAttemptID();
       TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-      //TODO need to get the real port number MAPREDUCE-2666
-      actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
+      actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+          attInfo.getShufflePort()));
       // send the status update event
       sendStatusUpdateEvent(aId, attInfo);
 
@@ -392,16 +393,15 @@ private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
       TaskAttemptInfo attemptInfo) {
     LOG.info("Sending assigned event to " + yarnAttemptID);
     ContainerId cId = attemptInfo.getContainerId();
-    Container container = recordFactory
-        .newRecordInstance(Container.class);
-    container.setId(cId);
-    container.setNodeId(recordFactory
-        .newRecordInstance(NodeId.class));
-    // NodeId can be obtained from TaskAttemptInfo.hostname - but this will
-    // eventually contain rack info.
-    container.setContainerToken(null);
-    container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
-        attemptInfo.getHttpPort());
+    String[] splits = attemptInfo.getHostname().split(":");
+    NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
+        .parseInt(splits[1]));
+    // Resource/Priority/ApplicationACLs are only needed while launching the
+    // container on an NM, these are already completed tasks, so setting them
+    // to null
+    Container container = BuilderUtils.newContainer(cId, nodeId,
+        attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+        null, null, null);
     actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
         container, null));
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 6f9dc71c07..0d5097ae28 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -315,15 +315,17 @@ protected ContainerLauncher createContainerLauncher(AppContext context) {
   }
 
   class MockContainerLauncher implements ContainerLauncher {
+
+    //We are running locally so set the shuffle port to -1
+    int shufflePort = -1;
+
     @Override
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        //We are running locally so set the shuffle port to -1
         getContext().getEventHandler().handle(
             new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                -1)
-            );
+                shufflePort));
         attemptLaunched(event.getTaskAttemptID());
         break;
 
@@ -355,13 +357,9 @@ public void handle(ContainerAllocatorEvent event) {
       ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
       cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
       cId.setId(containerCount++);
-      Container container = recordFactory.newRecordInstance(Container.class);
-      container.setId(cId);
-      container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-      container.getNodeId().setHost("dummy");
-      container.getNodeId().setPort(1234);
-      container.setContainerToken(null);
-      container.setNodeHttpAddress("localhost:9999");
+      NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          "localhost:9999", null, null, null);
       getContext().getEventHandler().handle(
           new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
               container, null));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 425bc5b413..89e60b547d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
@@ -269,6 +270,9 @@ public void testOutputRecovery() throws Exception {
 
     //wait for map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
 
     app.waitForState(reduceTask1, TaskState.RUNNING);
     TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@@ -290,7 +294,8 @@ public void testOutputRecovery() throws Exception {
 
     //rerun
     //in rerun the map will be recovered from previous run
-    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
@@ -308,6 +313,10 @@ public void testOutputRecovery() throws Exception {
 
     // map will be recovered, no need to send done
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
 
     // first reduce will be recovered, no need to send done
     app.waitForState(reduceTask1, TaskState.SUCCEEDED);
@@ -397,6 +406,13 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
     }
 
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      MockContainerLauncher launcher = new MockContainerLauncher();
+      launcher.shufflePort = 5467;
+      return launcher;
+    }
+
     @Override
     protected EventHandler createJobHistoryHandler(
         AppContext context) {
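The gist of the patch: when the MR ApplicationMaster recovers completed task attempts, the replayed TaskAttemptContainerLaunchedEvent now carries the shuffle port read back from the job history (attInfo.getShufflePort()) instead of a hard-coded -1, so the map-output server address handed to reducers points at the real ShuffleHandler port; the test pins this behaviour with the mock launcher's port 5467. The sketch below is only a minimal, self-contained illustration of that idea, not part of the patch; the names in it (ShufflePortRecoveryExample, RecoveredAttemptInfo, mapOutputServerAddress) are simplified stand-ins rather than actual Hadoop APIs.

// Illustrative sketch only; RecoveredAttemptInfo stands in for the
// TaskAttemptInfo data that RecoveryService reads from the job history.
public class ShufflePortRecoveryExample {

  /** Minimal stand-in for the per-attempt data recovered from history. */
  static class RecoveredAttemptInfo {
    final String nodeHttpAddress; // e.g. "host1:9999" (NodeManager HTTP address)
    final int shufflePort;        // port the ShuffleHandler reported for this attempt

    RecoveredAttemptInfo(String nodeHttpAddress, int shufflePort) {
      this.nodeHttpAddress = nodeHttpAddress;
      this.shufflePort = shufflePort;
    }
  }

  /**
   * Builds the map-output fetch URL a reducer would use. Before the fix the
   * recovery path effectively supplied -1 as the port; after the fix it
   * supplies the port recorded in the history file.
   */
  static String mapOutputServerAddress(RecoveredAttemptInfo info) {
    String host = info.nodeHttpAddress.split(":")[0];
    return "http://" + host + ":" + info.shufflePort;
  }

  public static void main(String[] args) {
    RecoveredAttemptInfo recovered = new RecoveredAttemptInfo("host1:9999", 5467);
    // With the recovered shuffle port (5467, as asserted in the test above):
    System.out.println(mapOutputServerAddress(recovered)); // prints http://host1:5467
  }
}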