diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index edbb59af34..8ce787dcb2 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -712,6 +712,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
     (Robert Parker via jeagles)
 
+    MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
+    speculative attempts (Robert Parker via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
index 4ab61c5235..aca752721a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
@@ -21,9 +21,12 @@ package org.apache.hadoop.mapreduce.v2.app.recover;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -108,7 +112,7 @@ public class RecoveryService extends CompositeService implements Recovery {
 
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
       new HashMap<TaskId, TaskInfo>();
-  
+
   private final List<TaskEvent> pendingTaskScheduleEvents =
       new ArrayList<TaskEvent>();
@@ -193,6 +197,14 @@ public class RecoveryService extends CompositeService implements Recovery {
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
       if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
         completedTasks
             .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
         LOG.info("Read from history task "
@@ -215,6 +227,7 @@ public class RecoveryService extends CompositeService implements Recovery {
         JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    LOG.info("Trying file " + histDirPath.toString());
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 8d6ca2b9b2..2f92cb9369 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -50,11 +50,15 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -734,12 +738,173 @@ public class TestRecovery {
     app.verifyCompleted();
     validateOutput();
   }
-  
+
+  /**
+   * AM with 2 maps and 1 reduce. A speculative attempt is launched for the
+   * first map and the original attempt succeeds. The AM crashes after the
+   * first map task finishes, then recovers completely and succeeds in the
+   * second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSpeculative() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    // the job consists of 2 map tasks and 1 reduce task
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    // launch a speculative attempt for the first map task
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
+    TaskAttempt task1Attempt1 = t1it.next();
+    TaskAttempt task1Attempt2 = t1it.next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
+
+    LOG.info(t1a2contId.toString());
+    LOG.info(task1Attempt1.getID().toString());
+    LOG.info(task1Attempt2.getID().toString());
+
+    // launch the container for the speculative attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
+
+    //before sending the TA_DONE event, make sure each attempt has come to
+    //the RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // the reduce must be running
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to map 1 attempt 1
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
+
+    //wait for the first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+    //stop the app
+    app.stop();
+
+    //rerun
+    //in the rerun the 1st map will be recovered from the previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    // all 3 tasks must exist again after the restart
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // the first map will be recovered, no need to send the done signal
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //the RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for it to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for the reduce to be running before sending the done signal
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
       throws Exception {
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
         TypeConverter.fromYarn(attempt.getID()));
-    
+
     TextOutputFormat theOutputFormat = new TextOutputFormat();
     RecordWriter theRecordWriter = theOutputFormat
         .getRecordWriter(tContext);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
index 36dc77ef2b..eb8db667e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
@@ -246,6 +246,7 @@ public class JobHistoryParser implements HistoryEventHandler {
     attemptInfo.state = StringInterner.weakIntern(event.getState());
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleReduceAttemptFinishedEvent
@@ -262,6 +263,7 @@ public class JobHistoryParser implements HistoryEventHandler {
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -276,6 +278,7 @@ public class JobHistoryParser implements HistoryEventHandler {
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -306,6 +309,7 @@ public class JobHistoryParser implements HistoryEventHandler {
         taskInfo.successfulAttemptId = null;
       }
     }
+    info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -443,6 +447,7 @@ public class JobHistoryParser implements HistoryEventHandler {
     Map<JobACL, AccessControlList> jobACLs;
 
     Map<TaskID, TaskInfo> tasksMap;
+    Map<TaskAttemptID, TaskAttemptInfo> completedTaskAttemptsMap;
     List<AMInfo> amInfos;
     AMInfo latestAmInfo;
     boolean uberized;
@@ -456,6 +461,7 @@ public class JobHistoryParser implements HistoryEventHandler {
       finishedMaps = finishedReduces = 0;
       username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
+      completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
       priority = JobPriority.NORMAL;
     }
@@ -530,6 +536,8 @@ public class JobHistoryParser implements HistoryEventHandler {
     public Counters getReduceCounters() { return reduceCounters; }
     /** @return the map of all tasks in this job */
     public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+    /** @return the map of all completed task attempts in this job */
+    public Map<TaskAttemptID, TaskAttemptInfo> getAllCompletedTaskAttempts() { return completedTaskAttemptsMap; }
     /** @return the priority of this job */
     public String getPriority() { return priority.toString(); }
     public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
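
Note on the fix (editorial sketch, not part of the patch): a recovered SUCCEEDED task could still list an in-flight speculative attempt in getAllTaskAttempts(); since that attempt never wrote a completion event to the history file, recovery presumably had nothing to replay for it and the AM would wait indefinitely. The patch records every completed attempt in JobHistoryParser's new completedTaskAttemptsMap and strips all other attempts before recovery proceeds. The stripping is a plain entry-set iterator removal; the following minimal standalone sketch illustrates the same idiom, using String keys and values as hypothetical stand-ins for TaskAttemptID/TaskAttemptInfo:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class AttemptFilterSketch {

  // Keep only the attempts that also appear in the completed-attempts map.
  // Iterator.remove() is the safe way to delete entries mid-iteration;
  // removing through the map itself while iterating its entry set would
  // throw ConcurrentModificationException.
  static void retainCompleted(Map<String, String> allAttempts,
                              Map<String, String> completedAttempts) {
    Iterator<Map.Entry<String, String>> it =
        allAttempts.entrySet().iterator();
    while (it.hasNext()) {
      if (!completedAttempts.containsKey(it.next().getKey())) {
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> all = new HashMap<String, String>();
    all.put("attempt_1", "SUCCEEDED");
    all.put("attempt_2", "speculative, still running at AM crash");

    Map<String, String> completed = new HashMap<String, String>();
    completed.put("attempt_1", "SUCCEEDED");

    retainCompleted(all, completed);
    System.out.println(all); // prints {attempt_1=SUCCEEDED}
  }
}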