From 277e520579a3452b95a5ffe2616d4f252d3c53fb Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 12 Oct 2011 23:12:01 +0000 Subject: [PATCH] MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM restart. Contributed by Jonathan Eagles. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1182613 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/job/impl/TaskAttemptImpl.java | 5 ++-- .../src/main/avro/Events.avpr | 3 ++- .../jobhistory/JobHistoryParser.java | 6 +++++ .../jobhistory/TaskAttemptStartedEvent.java | 6 ++++- .../v2/hs/TestJobHistoryParsing.java | 24 ++++++++++++++++--- .../apache/hadoop/mapred/JobInProgress.java | 4 ++-- .../jobhistory/TestJobHistoryEvents.java | 2 +- .../rumen/TaskAttempt20LineEventEmitter.java | 2 +- 9 files changed, 44 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 041ad65a6c..89afcdfe57 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1597,6 +1597,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv) + MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM + restart. (Jonathan Eagles via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3da2ebfab4..933347fe63 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1095,6 +1095,8 @@ public void transition(TaskAttemptImpl taskAttempt, //set the launch time taskAttempt.launchTime = taskAttempt.clock.getTime(); + taskAttempt.shufflePort = event.getShufflePort(); + // register it to TaskAttemptListener so that it start listening // for it taskAttempt.taskAttemptListener.register( @@ -1116,7 +1118,7 @@ public void transition(TaskAttemptImpl taskAttempt, new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), taskAttempt.launchTime, - nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort()); + nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort); taskAttempt.eventHandler.handle (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); taskAttempt.eventHandler.handle @@ -1125,7 +1127,6 @@ public void transition(TaskAttemptImpl taskAttempt, //make remoteTask reference as null as it is no more needed //and free up the memory taskAttempt.remoteTask = null; - taskAttempt.shufflePort = event.getShufflePort(); //tell the Task that attempt has started taskAttempt.eventHandler.handle(new TaskTAttemptEvent( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr index baef951e53..89844ebd5c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr @@ -173,7 +173,8 @@ {"name": "attemptId", "type": "string"}, {"name": "startTime", "type": "long"}, {"name": "trackerName", "type": "string"}, - {"name": "httpPort", "type": "int"} + {"name": "httpPort", "type": "int"}, + {"name": "shufflePort", "type": "int"} ] }, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index fe92cfe376..81294d5b65 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -240,6 +240,7 @@ private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { attemptInfo.httpPort = event.getHttpPort(); attemptInfo.trackerName = event.getTrackerName(); attemptInfo.taskType = event.getTaskType(); + attemptInfo.shufflePort = event.getShufflePort(); taskInfo.attemptsMap.put(attemptId, attemptInfo); } @@ -506,6 +507,7 @@ public static class TaskAttemptInfo { String trackerName; Counters counters; int httpPort; + int shufflePort; String hostname; /** Create a Task Attempt Info which will store attempt level information @@ -516,6 +518,7 @@ public TaskAttemptInfo() { mapFinishTime = -1; error = state = trackerName = hostname = ""; httpPort = -1; + shufflePort = -1; } /** * Print all the information about this attempt. @@ -530,6 +533,7 @@ public void printAll() { System.out.println("TASK_TYPE:" + taskType); System.out.println("TRACKER_NAME:" + trackerName); System.out.println("HTTP_PORT:" + httpPort); + System.out.println("SHUFFLE_PORT:" + shufflePort); if (counters != null) { System.out.println("COUNTERS:" + counters.toString()); } @@ -563,5 +567,7 @@ public void printAll() { public Counters getCounters() { return counters; } /** @return the HTTP port for the tracker */ public int getHttpPort() { return httpPort; } + /** @return the Shuffle port for the tracker */ + public int getShufflePort() { return shufflePort; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java index 204e6ba9a8..af61487ed8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java @@ -44,16 +44,18 @@ public class TaskAttemptStartedEvent implements HistoryEvent { * @param startTime Start time of the attempt * @param trackerName Name of the Task Tracker where attempt is running * @param httpPort The port number of the tracker + * @param shufflePort The shuffle port number of the container */ public TaskAttemptStartedEvent( TaskAttemptID attemptId, TaskType taskType, long startTime, String trackerName, - int httpPort) { + int httpPort, int shufflePort) { datum.attemptId = new Utf8(attemptId.toString()); datum.taskid = new Utf8(attemptId.getTaskID().toString()); datum.startTime = startTime; datum.taskType = new Utf8(taskType.name()); datum.trackerName = new Utf8(trackerName); datum.httpPort = httpPort; + datum.shufflePort = shufflePort; } TaskAttemptStartedEvent() {} @@ -75,6 +77,8 @@ public TaskType getTaskType() { } /** Get the HTTP port */ public int getHttpPort() { return datum.httpPort; } + /** Get the shuffle port */ + public int getShufflePort() { return datum.shufflePort; } /** Get the attempt id */ public TaskAttemptID getTaskAttemptId() { return TaskAttemptID.forName(datum.attemptId.toString()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 64fd88559c..da64718099 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -35,10 +35,13 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -101,12 +104,27 @@ public void testHistoryParsing() throws Exception { Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); //Assert at taskAttempt level - for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) { + for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) { int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); - Assert.assertEquals("total number of task attempts ", + Assert.assertEquals("total number of task attempts ", 1, taskAttemptCount); } - + + // Deep compare Job and JobInfo + for (Task task : job.getTasks().values()) { + TaskInfo taskInfo = jobInfo.getAllTasks().get( + TypeConverter.fromYarn(task.getID())); + Assert.assertNotNull("TaskInfo not found", taskInfo); + for (TaskAttempt taskAttempt : task.getAttempts().values()) { + TaskAttemptInfo taskAttemptInfo = + taskInfo.getAllTaskAttempts().get( + TypeConverter.fromYarn((taskAttempt.getID()))); + Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); + Assert.assertEquals("Incorrect shuffle port for task attempt", + taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); + } + } + String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobId); Path summaryFile = new Path(jobhistoryDir, summaryFileName); diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java index c32ff9108d..cd430e83a6 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java @@ -2676,7 +2676,7 @@ public synchronized boolean completedTask(TaskInProgress tip, TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( status.getTaskID(), taskType, status.getStartTime(), - status.getTaskTracker(), ttStatus.getHttpPort()); + status.getTaskTracker(), ttStatus.getHttpPort(), -1); jobHistory.logEvent(tse, status.getTaskID().getJobID()); TaskAttemptID statusAttemptID = status.getTaskID(); @@ -3197,7 +3197,7 @@ private void failedTask(TaskInProgress tip, TaskAttemptID taskid, StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0])); TaskType taskType = getTaskType(tip); TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( - taskid, taskType, startTime, taskTrackerName, taskTrackerPort); + taskid, taskType, startTime, taskTrackerName, taskTrackerPort, -1); jobHistory.logEvent(tse, taskid.getJobID()); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java index 75c38d8183..31bd95b1cc 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java @@ -48,7 +48,7 @@ private static void testAttemptStartedEventForTypes(EventType expected, TaskType[] types) { for (TaskType t : types) { TaskAttemptStartedEvent tase = - new TaskAttemptStartedEvent(id, t, 0L, "", 0); + new TaskAttemptStartedEvent(id, t, 0L, "", 0, -1); assertEquals(expected, tase.getEventType()); } } diff --git a/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java b/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java index 77f35a7ceb..016b8d9fed 100644 --- a/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java +++ b/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java @@ -79,7 +79,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, .parseInt(httpPort); return new TaskAttemptStartedEvent(taskAttemptID, - that.originalTaskType, that.originalStartTime, trackerName, port); + that.originalTaskType, that.originalStartTime, trackerName, port, -1); } return null;