MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c2ef7e239e
commit
6015e95941
@ -263,6 +263,9 @@ Release 2.4.0 - UNRELEASED
|
||||
FadviseFileRegion::transferTo does not read disks efficiently.
|
||||
(Nikola Vujic via cnauroth)
|
||||
|
||||
MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it
|
||||
recovers from a commit during a previous attempt. (Xuan Gong via vinodkv)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -31,6 +31,7 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
@ -49,6 +50,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||
@ -348,7 +350,9 @@ protected void serviceStop() throws Exception {
|
||||
JobUnsuccessfulCompletionEvent jucEvent =
|
||||
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
|
||||
System.currentTimeMillis(), job.getCompletedMaps(),
|
||||
job.getCompletedReduces(), JobState.KILLED.toString(),
|
||||
job.getCompletedReduces(),
|
||||
createJobStateForJobUnsuccessfulCompletionEvent(
|
||||
mi.getForcedJobStateOnShutDown()),
|
||||
job.getDiagnostics());
|
||||
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
|
||||
//Bypass the queue mechanism which might wait. Call the method directly
|
||||
@ -381,9 +385,10 @@ protected EventWriter createEventWriter(Path historyFilePath)
|
||||
* This should be the first call to history for a job
|
||||
*
|
||||
* @param jobId the jobId.
|
||||
* @param forcedJobStateOnShutDown the state to force the job into on shutdown; may be null
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void setupEventWriter(JobId jobId)
|
||||
protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
|
||||
throws IOException {
|
||||
if (stagingDirPath == null) {
|
||||
LOG.error("Log Directory is null, returning");
|
||||
@ -438,7 +443,7 @@ protected void setupEventWriter(JobId jobId)
|
||||
}
|
||||
|
||||
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
|
||||
user, jobName, jobId);
|
||||
user, jobName, jobId, forcedJobStateOnShutDown);
|
||||
fi.getJobSummary().setJobId(jobId);
|
||||
fileMap.put(jobId, fi);
|
||||
}
|
||||
@ -481,13 +486,17 @@ private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void handleEvent(JobHistoryEvent event) {
|
||||
@Private
|
||||
public void handleEvent(JobHistoryEvent event) {
|
||||
synchronized (lock) {
|
||||
|
||||
// If this is an AM_STARTED event, set up the writer
|
||||
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
|
||||
try {
|
||||
setupEventWriter(event.getJobID());
|
||||
AMStartedEvent amStartedEvent =
|
||||
(AMStartedEvent) event.getHistoryEvent();
|
||||
setupEventWriter(event.getJobID(),
|
||||
amStartedEvent.getForcedJobStateOnShutDown());
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
|
||||
ioe);
|
||||
@ -804,9 +813,10 @@ protected class MetaInfo {
|
||||
Timer flushTimer;
|
||||
FlushTimerTask flushTimerTask;
|
||||
private boolean isTimerShutDown = false;
|
||||
private String forcedJobStateOnShutDown;
|
||||
|
||||
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
|
||||
String jobName, JobId jobId) {
|
||||
String jobName, JobId jobId, String forcedJobStateOnShutDown) {
|
||||
this.historyFile = historyFile;
|
||||
this.confFile = conf;
|
||||
this.writer = writer;
|
||||
@ -814,6 +824,7 @@ protected class MetaInfo {
|
||||
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
|
||||
this.jobSummary = new JobSummary();
|
||||
this.flushTimer = new Timer("FlushTimer", true);
|
||||
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
|
||||
}
|
||||
|
||||
Path getHistoryFile() {
|
||||
@ -840,6 +851,10 @@ boolean isTimerShutDown() {
|
||||
return isTimerShutDown;
|
||||
}
|
||||
|
||||
String getForcedJobStateOnShutDown() {
|
||||
return forcedJobStateOnShutDown;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Job MetaInfo for "+ jobSummary.getJobId()
|
||||
@ -983,4 +998,20 @@ public void setForcejobCompletion(boolean forceJobCompletion) {
|
||||
LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
|
||||
+ forceJobCompletion);
|
||||
}
|
||||
|
||||
private String createJobStateForJobUnsuccessfulCompletionEvent(
|
||||
String forcedJobStateOnShutDown) {
|
||||
if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
|
||||
.isEmpty()) {
|
||||
return JobState.KILLED.toString();
|
||||
} else if (forcedJobStateOnShutDown.equals(
|
||||
JobStateInternal.ERROR.toString()) ||
|
||||
forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
|
||||
return JobState.FAILED.toString();
|
||||
} else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
|
||||
.toString())) {
|
||||
return JobState.SUCCEEDED.toString();
|
||||
}
|
||||
return JobState.KILLED.toString();
|
||||
}
|
||||
}
|
||||
|
@ -1026,14 +1026,13 @@ protected void serviceStart() throws Exception {
|
||||
AMInfo amInfo =
|
||||
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
|
||||
nmPort, nmHttpPort);
|
||||
amInfos.add(amInfo);
|
||||
|
||||
// /////////////////// Create the job itself.
|
||||
job = createJob(getConfig(), forcedState, shutDownMessage);
|
||||
|
||||
// End of creating the job.
|
||||
|
||||
// Send out an MR AM inited event for this AM and all previous AMs.
|
||||
// Send out an MR AM inited event for all previous AMs.
|
||||
for (AMInfo info : amInfos) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
|
||||
@ -1042,6 +1041,15 @@ protected void serviceStart() throws Exception {
|
||||
.getNodeManagerHttpPort())));
|
||||
}
|
||||
|
||||
// Send out an MR AM inited event for this AM.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
|
||||
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
|
||||
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
|
||||
.getNodeManagerHttpPort(), this.forcedState == null ? null
|
||||
: this.forcedState.toString())));
|
||||
amInfos.add(amInfo);
|
||||
|
||||
// metrics system init is really init & start.
|
||||
// It's more test friendly to put it here.
|
||||
DefaultMetricsSystem.initialize("MRAppMaster");
|
||||
|
@ -497,7 +497,7 @@ public void addToFileMap(JobId jobId) {
|
||||
JobHistoryEvent lastEventHandled;
|
||||
int eventsHandled = 0;
|
||||
@Override
|
||||
protected void handleEvent(JobHistoryEvent event) {
|
||||
public void handleEvent(JobHistoryEvent event) {
|
||||
this.lastEventHandled = event;
|
||||
this.eventsHandled++;
|
||||
}
|
||||
|
@ -21,7 +21,8 @@
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.times;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
@ -44,6 +45,10 @@
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
|
||||
@ -70,6 +75,8 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestMRAppMaster {
|
||||
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
|
||||
@ -120,7 +127,7 @@ public void testMRAppMasterForDifferentUser() throws IOException,
|
||||
assertEquals(userStagingPath.toString(),
|
||||
appMaster.stagingDirPath.toString());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMRAppMasterMidLock() throws IOException,
|
||||
InterruptedException {
|
||||
@ -154,6 +161,9 @@ public void testMRAppMasterMidLock() throws IOException,
|
||||
assertTrue(appMaster.errorHappenedShutDown);
|
||||
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
|
||||
appMaster.stop();
|
||||
|
||||
// verify the final status is FAILED
|
||||
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -190,6 +200,9 @@ public void testMRAppMasterSuccessLock() throws IOException,
|
||||
assertTrue(appMaster.errorHappenedShutDown);
|
||||
assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
|
||||
appMaster.stop();
|
||||
|
||||
// verify the final status is SUCCEEDED
|
||||
verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -226,6 +239,9 @@ public void testMRAppMasterFailLock() throws IOException,
|
||||
assertTrue(appMaster.errorHappenedShutDown);
|
||||
assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
|
||||
appMaster.stop();
|
||||
|
||||
// verify the final status is FAILED
|
||||
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -423,8 +439,20 @@ public void testMRAppMasterCredentials() throws Exception {
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyFailedStatus(MRAppMasterTest appMaster,
|
||||
String expectedJobState) {
|
||||
ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
|
||||
.forClass(JobHistoryEvent.class);
|
||||
// handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
|
||||
verify(appMaster.spyHistoryService, times(2))
|
||||
.handleEvent(captor.capture());
|
||||
HistoryEvent event = captor.getValue().getHistoryEvent();
|
||||
assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
|
||||
assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
|
||||
, expectedJobState);
|
||||
}
|
||||
}
|
||||
class MRAppMasterTest extends MRAppMaster {
|
||||
|
||||
Path stagingDirPath;
|
||||
@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaster {
|
||||
ContainerAllocator mockContainerAllocator;
|
||||
CommitterEventHandler mockCommitterEventHandler;
|
||||
RMHeartbeatHandler mockRMHeartbeatHandler;
|
||||
JobHistoryEventHandler spyHistoryService;
|
||||
|
||||
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String host, int port, int httpPort,
|
||||
@ -502,4 +531,14 @@ public Credentials getCredentials() {
|
||||
public UserGroupInformation getUgi() {
|
||||
return currentUser;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||
AppContext context) {
|
||||
spyHistoryService =
|
||||
Mockito.spy((JobHistoryEventHandler) super
|
||||
.createJobHistoryHandler(context));
|
||||
spyHistoryService.setForcejobCompletion(this.isLastAMRetry);
|
||||
return spyHistoryService;
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,7 @@
|
||||
@InterfaceStability.Unstable
|
||||
public class AMStartedEvent implements HistoryEvent {
|
||||
private AMStarted datum = new AMStarted();
|
||||
private String forcedJobStateOnShutDown;
|
||||
|
||||
/**
|
||||
* Create an event to record the start of an MR AppMaster
|
||||
@ -54,12 +55,38 @@ public class AMStartedEvent implements HistoryEvent {
|
||||
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
|
||||
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
|
||||
int nodeManagerHttpPort) {
|
||||
this(appAttemptId, startTime, containerId, nodeManagerHost,
|
||||
nodeManagerPort, nodeManagerHttpPort, null);
|
||||
}
|
||||
|
||||
/**
 * Create an event to record the start of an MR AppMaster, together with
 * the state the job should be forced into on shutdown.
 *
 * @param appAttemptId
 *          the application attempt id.
 * @param startTime
 *          the start time of the AM.
 * @param containerId
 *          the containerId of the AM.
 * @param nodeManagerHost
 *          the node on which the AM is running.
 * @param nodeManagerPort
 *          the port on which the AM is running.
 * @param nodeManagerHttpPort
 *          the httpPort for the node running the AM.
 * @param forcedJobStateOnShutDown
 *          the state to force the job into; the six-argument constructor
 *          passes null here
 */
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
    ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
    int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
  // Fill in the datum record.
  this.datum.applicationAttemptId = new Utf8(appAttemptId.toString());
  this.datum.startTime = startTime;
  this.datum.containerId = new Utf8(containerId.toString());
  this.datum.nodeManagerHost = new Utf8(nodeManagerHost);
  this.datum.nodeManagerPort = nodeManagerPort;
  this.datum.nodeManagerHttpPort = nodeManagerHttpPort;
  // The forced state is kept on the event object, not on the datum.
  this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
|
||||
|
||||
AMStartedEvent() {
|
||||
@ -116,6 +143,13 @@ public int getNodeManagerHttpPort() {
|
||||
return datum.nodeManagerHttpPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the state to force the job into
|
||||
*/
|
||||
public String getForcedJobStateOnShutDown() {
|
||||
return this.forcedJobStateOnShutDown;
|
||||
}
|
||||
|
||||
/** Get the attempt id */
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user