From 6015e9594180f157472a88030c85c5599fdc289c Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 25 Mar 2014 02:00:39 +0000 Subject: [PATCH] MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581180 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../jobhistory/JobHistoryEventHandler.java | 43 +++++++++++++++--- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 12 ++++- .../TestJobHistoryEventHandler.java | 2 +- .../mapreduce/v2/app/TestMRAppMaster.java | 45 +++++++++++++++++-- .../mapreduce/jobhistory/AMStartedEvent.java | 34 ++++++++++++++ 6 files changed, 127 insertions(+), 12 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0e67214469..05e882f5d5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -263,6 +263,9 @@ Release 2.4.0 - UNRELEASED FadviseFileRegion::transferTo does not read disks efficiently. (Nikola Vujic via cnauroth) + MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it + recovers from a commit during a previous attempt. (Xuan Gong via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 7ab02287b1..f053e9600a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -49,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; @@ -348,7 +350,9 @@ protected void serviceStop() throws Exception { JobUnsuccessfulCompletionEvent jucEvent = new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), System.currentTimeMillis(), job.getCompletedMaps(), - job.getCompletedReduces(), JobState.KILLED.toString(), + job.getCompletedReduces(), + createJobStateForJobUnsuccessfulCompletionEvent( + mi.getForcedJobStateOnShutDown()), job.getDiagnostics()); JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent); //Bypass the queue mechanism which might wait. Call the method directly @@ -381,9 +385,10 @@ protected EventWriter createEventWriter(Path historyFilePath) * This should be the first call to history for a job * * @param jobId the jobId. + * @param forcedJobStateOnShutDown * @throws IOException */ - protected void setupEventWriter(JobId jobId) + protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown) throws IOException { if (stagingDirPath == null) { LOG.error("Log Directory is null, returning"); @@ -438,7 +443,7 @@ protected void setupEventWriter(JobId jobId) } MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, - user, jobName, jobId); + user, jobName, jobId, forcedJobStateOnShutDown); fi.getJobSummary().setJobId(jobId); fileMap.put(jobId, fi); } @@ -481,13 +486,17 @@ private boolean isJobCompletionEvent(HistoryEvent historyEvent) { return false; } - protected void handleEvent(JobHistoryEvent event) { + @Private + public void handleEvent(JobHistoryEvent event) { synchronized (lock) { // If this is JobSubmitted Event, setup the writer if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) { try { - setupEventWriter(event.getJobID()); + AMStartedEvent amStartedEvent = + (AMStartedEvent) event.getHistoryEvent(); + setupEventWriter(event.getJobID(), + amStartedEvent.getForcedJobStateOnShutDown()); } catch (IOException ioe) { LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe); @@ -804,9 +813,10 @@ protected class MetaInfo { Timer flushTimer; FlushTimerTask flushTimerTask; private boolean isTimerShutDown = false; + private String forcedJobStateOnShutDown; MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, - String jobName, JobId jobId) { + String jobName, JobId jobId, String forcedJobStateOnShutDown) { this.historyFile = historyFile; this.confFile = conf; this.writer = writer; @@ -814,6 +824,7 @@ protected class MetaInfo { new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); this.jobSummary = new JobSummary(); this.flushTimer = new Timer("FlushTimer", true); + this.forcedJobStateOnShutDown = forcedJobStateOnShutDown; } Path getHistoryFile() { @@ -840,6 +851,10 @@ boolean isTimerShutDown() { return isTimerShutDown; } + String getForcedJobStateOnShutDown() { + return forcedJobStateOnShutDown; + } + @Override public String toString() { return "Job MetaInfo for "+ jobSummary.getJobId() @@ -983,4 +998,20 @@ public void setForcejobCompletion(boolean forceJobCompletion) { LOG.info("JobHistoryEventHandler notified that forceJobCompletion is " + forceJobCompletion); } + + private String createJobStateForJobUnsuccessfulCompletionEvent( + String forcedJobStateOnShutDown) { + if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown + .isEmpty()) { + return JobState.KILLED.toString(); + } else if (forcedJobStateOnShutDown.equals( + JobStateInternal.ERROR.toString()) || + forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) { + return JobState.FAILED.toString(); + } else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED + .toString())) { + return JobState.SUCCEEDED.toString(); + } + return JobState.KILLED.toString(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 89482f4352..1656d3c0b1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -1026,14 +1026,13 @@ protected void serviceStart() throws Exception { AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); - amInfos.add(amInfo); // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. - // Send out an MR AM inited event for this AM and all previous AMs. + // Send out an MR AM inited event for all previous AMs. for (AMInfo info : amInfos) { dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(info @@ -1042,6 +1041,15 @@ protected void serviceStart() throws Exception { .getNodeManagerHttpPort()))); } + // Send out an MR AM inited event for this AM. + dispatcher.getEventHandler().handle( + new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo + .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(), + amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo + .getNodeManagerHttpPort(), this.forcedState == null ? null + : this.forcedState.toString()))); + amInfos.add(amInfo); + // metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 7964b9c815..33e4a56d22 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -497,7 +497,7 @@ public void addToFileMap(JobId jobId) { JobHistoryEvent lastEventHandled; int eventsHandled = 0; @Override - protected void handleEvent(JobHistoryEvent event) { + public void handleEvent(JobHistoryEvent event) { this.lastEventHandled = event; this.eventsHandled++; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java index 6d6510f40c..ad390b5ff7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java @@ -21,7 +21,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; - +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -44,6 +45,10 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; @@ -70,6 +75,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; public class TestMRAppMaster { private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class); @@ -120,7 +127,7 @@ public void testMRAppMasterForDifferentUser() throws IOException, assertEquals(userStagingPath.toString(), appMaster.stagingDirPath.toString()); } - + @Test public void testMRAppMasterMidLock() throws IOException, InterruptedException { @@ -154,6 +161,9 @@ public void testMRAppMasterMidLock() throws IOException, assertTrue(appMaster.errorHappenedShutDown); assertEquals(JobStateInternal.ERROR, appMaster.forcedState); appMaster.stop(); + + // verify the final status is FAILED + verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED"); } @Test @@ -190,6 +200,9 @@ public void testMRAppMasterSuccessLock() throws IOException, assertTrue(appMaster.errorHappenedShutDown); assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState); appMaster.stop(); + + // verify the final status is SUCCEEDED + verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED"); } @Test @@ -226,6 +239,9 @@ public void testMRAppMasterFailLock() throws IOException, assertTrue(appMaster.errorHappenedShutDown); assertEquals(JobStateInternal.FAILED, appMaster.forcedState); appMaster.stop(); + + // verify the final status is FAILED + verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED"); } @Test @@ -423,8 +439,20 @@ public void testMRAppMasterCredentials() throws Exception { } -} + private void verifyFailedStatus(MRAppMasterTest appMaster, + String expectedJobState) { + ArgumentCaptor captor = ArgumentCaptor + .forClass(JobHistoryEvent.class); + // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent + verify(appMaster.spyHistoryService, times(2)) + .handleEvent(captor.capture()); + HistoryEvent event = captor.getValue().getHistoryEvent(); + assertTrue(event instanceof JobUnsuccessfulCompletionEvent); + assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus() + , expectedJobState); + } +} class MRAppMasterTest extends MRAppMaster { Path stagingDirPath; @@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaster { ContainerAllocator mockContainerAllocator; CommitterEventHandler mockCommitterEventHandler; RMHeartbeatHandler mockRMHeartbeatHandler; + JobHistoryEventHandler spyHistoryService; public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, @@ -502,4 +531,14 @@ public Credentials getCredentials() { public UserGroupInformation getUgi() { return currentUser; } + + @Override + protected EventHandler createJobHistoryHandler( + AppContext context) { + spyHistoryService = + Mockito.spy((JobHistoryEventHandler) super + .createJobHistoryHandler(context)); + spyHistoryService.setForcejobCompletion(this.isLastAMRetry); + return spyHistoryService; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java index 1e7ce4c774..d1a378bc20 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java @@ -34,6 +34,7 @@ @InterfaceStability.Unstable public class AMStartedEvent implements HistoryEvent { private AMStarted datum = new AMStarted(); + private String forcedJobStateOnShutDown; /** * Create an event to record the start of an MR AppMaster @@ -54,12 +55,38 @@ public class AMStartedEvent implements HistoryEvent { public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime, ContainerId containerId, String nodeManagerHost, int nodeManagerPort, int nodeManagerHttpPort) { + this(appAttemptId, startTime, containerId, nodeManagerHost, + nodeManagerPort, nodeManagerHttpPort, null); + } + + /** + * Create an event to record the start of an MR AppMaster + * + * @param appAttemptId + * the application attempt id. + * @param startTime + * the start time of the AM. + * @param containerId + * the containerId of the AM. + * @param nodeManagerHost + * the node on which the AM is running. + * @param nodeManagerPort + * the port on which the AM is running. + * @param nodeManagerHttpPort + * the httpPort for the node running the AM. + * @param forcedJobStateOnShutDown + * the state to force the job into + */ + public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime, + ContainerId containerId, String nodeManagerHost, int nodeManagerPort, + int nodeManagerHttpPort, String forcedJobStateOnShutDown) { datum.applicationAttemptId = new Utf8(appAttemptId.toString()); datum.startTime = startTime; datum.containerId = new Utf8(containerId.toString()); datum.nodeManagerHost = new Utf8(nodeManagerHost); datum.nodeManagerPort = nodeManagerPort; datum.nodeManagerHttpPort = nodeManagerHttpPort; + this.forcedJobStateOnShutDown = forcedJobStateOnShutDown; } AMStartedEvent() { @@ -116,6 +143,13 @@ public int getNodeManagerHttpPort() { return datum.nodeManagerHttpPort; } + /** + * @return the state to force the job into + */ + public String getForcedJobStateOnShutDown() { + return this.forcedJobStateOnShutDown; + } + /** Get the attempt id */ @Override