From bf70c5ae2824a9139c1aa9d7c14020018881cec2 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Mon, 4 May 2015 20:39:18 +0000
Subject: [PATCH] MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu

---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../jobhistory/JobHistoryEventHandler.java    | 15 +++--
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  |  4 +-
 .../TestJobHistoryEventHandler.java           | 57 ++++++++++++++++---
 .../mapreduce/jobhistory/AMStartedEvent.java  | 16 +++++-
 5 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f7e3bded2e..481757ab50 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -397,6 +397,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6339. Job history file is not flushed correctly because
     isTimerActive flag is not set true when flushTimerTask is scheduled.
     (zhihai xu via devaraj)
 
+    MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
+    (zhihai xu via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6b0ea7957c..bf32888098 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -426,10 +426,10 @@ protected EventWriter createEventWriter(Path historyFilePath)
    * This should be the first call to history for a job
    *
    * @param jobId the jobId.
-   * @param forcedJobStateOnShutDown
+   * @param amStartedEvent
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
+  protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
       throws IOException {
     if (stagingDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -489,8 +489,13 @@ protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
     }
 
     MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
-        user, jobName, jobId, forcedJobStateOnShutDown, queueName);
+        user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
+        queueName);
     fi.getJobSummary().setJobId(jobId);
+    fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
+    fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
+    fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
+    fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
     fileMap.put(jobId, fi);
   }
 
@@ -541,8 +546,7 @@ public void handleEvent(JobHistoryEvent event) {
       try {
         AMStartedEvent amStartedEvent =
             (AMStartedEvent) event.getHistoryEvent();
-        setupEventWriter(event.getJobID(),
-            amStartedEvent.getForcedJobStateOnShutDown());
+        setupEventWriter(event.getJobID(), amStartedEvent);
       } catch (IOException ioe) {
         LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
             ioe);
@@ -982,6 +986,7 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
         tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
             ase.getNodeManagerHttpPort());
         tEvent.addEventInfo("START_TIME", ase.getStartTime());
+        tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(jobId.toString());
         tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 9908ea5788..c41f679c0d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1052,7 +1052,7 @@ protected void serviceStart() throws Exception {
           new JobHistoryEvent(job.getID(), new AMStartedEvent(info
               .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
               info.getNodeManagerHost(), info.getNodeManagerPort(), info
-              .getNodeManagerHttpPort())));
+              .getNodeManagerHttpPort(), appSubmitTime)));
     }
 
     // Send out an MR AM inited event for this AM.
@@ -1061,7 +1061,7 @@ protected void serviceStart() throws Exception {
         .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
         amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
         .getNodeManagerHttpPort(), this.forcedState == null ? null
-            : this.forcedState.toString())));
+            : this.forcedState.toString(), appSubmitTime)));
     amInfos.add(amInfo);
 
     // metrics system init is really init & start.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 49be35b2a4..2b07efb7f8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -125,7 +125,7 @@ public void testFirstFlushOnCompletionEvent() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
@@ -168,7 +168,7 @@ public void testMaxUnflushedCompletionEvents() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
@@ -213,7 +213,7 @@ public void testUnflushedTimer() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
@@ -256,7 +256,7 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
@@ -293,7 +293,7 @@ public void testProcessDoneFilesOnLastAMRetry() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(any(JobId.class));
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -338,7 +338,7 @@ public void testProcessDoneFilesNotLastAMRetry() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(t.jobId);
 
       // skip processing done files
@@ -395,7 +395,7 @@ public void testDefaultFsIsUsedForHistory() throws Exception {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
@@ -441,6 +441,47 @@ public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
         pathStr);
   }
 
+  // test AMStartedEvent for submitTime and startTime
+  @Test (timeout=50000)
+  public void testAMStartedEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
+
+      JobHistoryEventHandler.MetaInfo mi =
+          JobHistoryEventHandler.fileMap.get(t.jobId);
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+          new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+              0, 0, JobStateInternal.FAILED.toString())));
+
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+      verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(2)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   // Have JobHistoryEventHandler handle some events and make sure they get
   // stored to the Timeline store
   @Test (timeout=50000)
@@ -463,7 +504,7 @@ public void testTimelineEventHandling() throws Exception {
         .getTimelineStore();
 
     handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-            t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+            t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
             currentTime - 10));
       TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
           null, null, null, null, null, null, null);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index d1a378bc20..e1465f513a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -35,6 +35,7 @@ public class AMStartedEvent implements HistoryEvent {
   private AMStarted datum = new AMStarted();
 
   private String forcedJobStateOnShutDown;
+  private long submitTime;
 
   /**
    * Create an event to record the start of an MR AppMaster
@@ -54,9 +55,9 @@ public class AMStartedEvent implements HistoryEvent {
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort) {
+      int nodeManagerHttpPort, long submitTime) {
     this(appAttemptId, startTime, containerId, nodeManagerHost,
-        nodeManagerPort, nodeManagerHttpPort, null);
+        nodeManagerPort, nodeManagerHttpPort, null, submitTime);
   }
 
   /**
@@ -79,7 +80,8 @@ public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
+      int nodeManagerHttpPort, String forcedJobStateOnShutDown,
+      long submitTime) {
     datum.applicationAttemptId = new Utf8(appAttemptId.toString());
     datum.startTime = startTime;
     datum.containerId = new Utf8(containerId.toString());
@@ -87,6 +89,7 @@ public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
     datum.nodeManagerPort = nodeManagerPort;
     datum.nodeManagerHttpPort = nodeManagerHttpPort;
     this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
+    this.submitTime = submitTime;
   }
 
   AMStartedEvent() {
@@ -150,6 +153,13 @@ public String getForcedJobStateOnShutDown() {
     return this.forcedJobStateOnShutDown;
   }
 
+  /**
+   * @return the submit time for the Application(Job)
+   */
+  public long getSubmitTime() {
+    return this.submitTime;
+  }
+
   /** Get the attempt id */
 
   @Override
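For readers who want to see the widened constructor in action, below is a minimal sketch of how an AMStartedEvent carrying the job submit time might be built and queried. It is illustrative only and not part of the patch: the standalone class name, the literal host/port values, and the synthetic timestamps are assumptions; only the AMStartedEvent constructor, getSubmitTime(), and getStartTime() shown in the hunks above come from the patched code.

// Illustrative sketch (not part of the patch). Assumes the Hadoop 2.7.1
// MapReduce/YARN client APIs plus the constructor added by MAPREDUCE-6259.
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;

public class AMStartedEventSketch {
  public static void main(String[] args) {
    // Hypothetical timestamps: the job was submitted 5 seconds before this AM started.
    long submitTime = System.currentTimeMillis() - 5000L;
    long startTime = System.currentTimeMillis();

    // Synthetic YARN ids, just enough to construct the event.
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(startTime, 1), 1);
    ContainerId amContainer = ContainerId.newContainerId(attemptId, 1L);

    // MRAppMaster now passes appSubmitTime as the last argument (see the
    // MRAppMaster.java hunks), so JobHistoryEventHandler.setupEventWriter()
    // can seed JobSummary/JobIndexInfo with a real submit time up front.
    AMStartedEvent event = new AMStartedEvent(attemptId, startTime, amContainer,
        "nmhost", 3000, 4000, submitTime);

    System.out.println("submitTime=" + event.getSubmitTime()
        + " startTime=" + event.getStartTime());
  }
}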