From 03c1015d6ea92c782e9ad78794cf1e9640f71040 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 6 Sep 2011 23:36:19 +0000 Subject: [PATCH] MAPREDUCE-2800. Set final progress for tasks to ensure all task information is correctly logged to JobHistory. Contributed by Siddharth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165930 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/job/impl/TaskAttemptImpl.java | 2 ++ .../java/org/apache/hadoop/mapred/MapTask.java | 2 ++ .../apache/hadoop/mapreduce/v2/TestMRJobs.java | 16 +++++++++++++++- 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8da61f381a..9b89babf96 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1207,6 +1207,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2687. Fix NodeManager to use the right version of LocalDirAllocator.getLocalPathToWrite. (mahadev & acmurthy) + MAPREDUCE-2800. Set final progress for tasks to ensure all task information + is correctly logged to JobHistory. (Siddharth Seth via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 9bf67ef27e..98cf9fbd9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1380,6 +1380,8 @@ public void transition(TaskAttemptImpl taskAttempt, // for it taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); + taskAttempt.reportedStatus.progress = 1.0f; + taskAttempt.updateProgressSplits(); //send the cleanup event to containerLauncher taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.attemptId, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 97ebfa1dc6..01c0b1bba4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -1736,6 +1736,7 @@ private void mergeParts() throws IOException, InterruptedException, indexCacheList.get(0).writeToFile( mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job); } + sortPhase.complete(); return; } @@ -1776,6 +1777,7 @@ private void mergeParts() throws IOException, InterruptedException, } finally { finalOut.close(); } + sortPhase.complete(); return; } { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 00709f316b..c0747b0d53 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; @@ -151,7 +152,7 @@ public void testSleepJob() throws IOException, InterruptedException, Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); verifySleepJobCounters(job); - + verifyTaskProgress(job); // TODO later: add explicit "isUber()" checks of some sort (extend // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value @@ -173,6 +174,18 @@ protected void verifySleepJobCounters(Job job) throws InterruptedException, .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); } + + protected void verifyTaskProgress(Job job) throws InterruptedException, + IOException { + for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) { + Assert.assertTrue(0.9999f < taskReport.getProgress() + && 1.0001f > taskReport.getProgress()); + } + for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) { + Assert.assertTrue(0.9999f < taskReport.getProgress() + && 1.0001f > taskReport.getProgress()); + } + } @Test public void testRandomWriter() throws IOException, InterruptedException, @@ -198,6 +211,7 @@ public void testRandomWriter() throws IOException, InterruptedException, boolean succeeded = job.waitForCompletion(true); Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + // Make sure there are three files in the output-dir RemoteIterator iterator =