MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu

This commit is contained in:
Jason Lowe 2015-05-04 20:39:18 +00:00
parent 3fe79e1db8
commit bf70c5ae28
5 changed files with 77 additions and 18 deletions

View File

@ -397,6 +397,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive
flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
(zhihai xu via jlowe)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -426,10 +426,10 @@ protected EventWriter createEventWriter(Path historyFilePath)
* This should be the first call to history for a job
*
* @param jobId the jobId.
* @param forcedJobStateOnShutDown
* @param amStartedEvent
* @throws IOException
*/
protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
@ -489,8 +489,13 @@ protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId, forcedJobStateOnShutDown, queueName);
user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
queueName);
fi.getJobSummary().setJobId(jobId);
fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
fileMap.put(jobId, fi);
}
@ -541,8 +546,7 @@ public void handleEvent(JobHistoryEvent event) {
try {
AMStartedEvent amStartedEvent =
(AMStartedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(),
amStartedEvent.getForcedJobStateOnShutDown());
setupEventWriter(event.getJobID(), amStartedEvent);
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
@ -982,6 +986,7 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
ase.getNodeManagerHttpPort());
tEvent.addEventInfo("START_TIME", ase.getStartTime());
tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);

View File

@ -1052,7 +1052,7 @@ protected void serviceStart() throws Exception {
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerPort(), info
.getNodeManagerHttpPort())));
.getNodeManagerHttpPort(), appSubmitTime)));
}
// Send out an MR AM inited event for this AM.
@ -1061,7 +1061,7 @@ protected void serviceStart() throws Exception {
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
: this.forcedState.toString())));
: this.forcedState.toString(), appSubmitTime)));
amInfos.add(amInfo);
// metrics system init is really init & start.

View File

@ -125,7 +125,7 @@ public void testFirstFlushOnCompletionEvent() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@ -168,7 +168,7 @@ public void testMaxUnflushedCompletionEvents() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@ -213,7 +213,7 @@ public void testUnflushedTimer() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@ -256,7 +256,7 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@ -293,7 +293,7 @@ public void testProcessDoneFilesOnLastAMRetry() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
@ -338,7 +338,7 @@ public void testProcessDoneFilesNotLastAMRetry() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(t.jobId);
// skip processing done files
@ -395,7 +395,7 @@ public void testDefaultFsIsUsedForHistory() throws Exception {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
@ -441,6 +441,47 @@ public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
pathStr);
}
// test AMStartedEvent for submitTime and startTime
@Test (timeout=50000)
public void testAMStartedEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
JobHistoryEventHandler.MetaInfo mi =
JobHistoryEventHandler.fileMap.get(t.jobId);
Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.FAILED.toString())));
Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
verify(jheh, times(1)).processDoneFiles(t.jobId);
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(2)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
}
}
// Have JobHistoryEventHandler handle some events and make sure they get
// stored to the Timeline store
@Test (timeout=50000)
@ -463,7 +504,7 @@ public void testTimelineEventHandling() throws Exception {
.getTimelineStore();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
currentTime - 10));
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);

View File

@ -35,6 +35,7 @@
public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted();
private String forcedJobStateOnShutDown;
private long submitTime;
/**
* Create an event to record the start of an MR AppMaster
@ -54,9 +55,9 @@ public class AMStartedEvent implements HistoryEvent {
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort) {
int nodeManagerHttpPort, long submitTime) {
this(appAttemptId, startTime, containerId, nodeManagerHost,
nodeManagerPort, nodeManagerHttpPort, null);
nodeManagerPort, nodeManagerHttpPort, null, submitTime);
}
/**
@ -79,7 +80,8 @@ public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
int nodeManagerHttpPort, String forcedJobStateOnShutDown,
long submitTime) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
@ -87,6 +89,7 @@ public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
datum.nodeManagerPort = nodeManagerPort;
datum.nodeManagerHttpPort = nodeManagerHttpPort;
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
this.submitTime = submitTime;
}
AMStartedEvent() {
@ -150,6 +153,13 @@ public String getForcedJobStateOnShutDown() {
return this.forcedJobStateOnShutDown;
}
/**
* @return the submit time for the Application(Job)
*/
public long getSubmitTime() {
return this.submitTime;
}
/** Get the attempt id */
@Override