diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 52c13f1a44..c7e69af8a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -119,7 +120,11 @@ public class JobHistoryEventHandler extends AbstractService protected BlockingQueue eventQueue = new LinkedBlockingQueue(); + + protected boolean handleTimelineEvent = false; + protected AsyncDispatcher atsEventDispatcher = null; protected Thread eventHandlingThread; + private volatile boolean stopped; private final Object lock = new Object(); @@ -279,6 +284,7 @@ protected void serviceInit(Configuration conf) throws Exception { ((MRAppMaster.RunningAppContext) context).getTimelineClient(); timelineClient.init(conf); } + handleTimelineEvent = true; LOG.info("Timeline service is enabled; version: " + YarnConfiguration.getTimelineServiceVersion(conf)); } else { @@ -302,10 +308,23 @@ protected void serviceInit(Configuration conf) throws Exception { "'json' or 'binary'. Falling back to default value '" + JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'."); } - + // initiate the atsEventDispatcher for timeline event + // if timeline service is enabled. + if (handleTimelineEvent) { + atsEventDispatcher = createDispatcher(); + EventHandler timelineEventHandler = + new ForwardingEventHandler(); + atsEventDispatcher.register(EventType.class, timelineEventHandler); + atsEventDispatcher.setDrainEventsOnStop(); + atsEventDispatcher.init(conf); + } super.serviceInit(conf); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("Job ATS Event Dispatcher"); + } + private void mkdir(FileSystem fs, Path path, FsPermission fsp) throws IOException { if (!fs.exists(path)) { @@ -371,6 +390,10 @@ public void run() { } }, "eventHandlingThread"); eventHandlingThread.start(); + + if (handleTimelineEvent) { + atsEventDispatcher.start(); + } super.serviceStart(); } @@ -461,6 +484,11 @@ protected void serviceStop() throws Exception { LOG.info("Exception while closing file " + e.getMessage()); } } + + if (handleTimelineEvent && atsEventDispatcher != null) { + atsEventDispatcher.stop(); + } + if (timelineClient != null) { timelineClient.stop(); } else if (timelineV2Client != null) { @@ -580,6 +608,10 @@ public void handle(JobHistoryEvent event) { } eventQueue.put(event); + // Process it for ATS (if enabled) + if (handleTimelineEvent) { + atsEventDispatcher.getEventHandler().handle(event); + } } catch (InterruptedException e) { throw new YarnRuntimeException(e); } @@ -622,13 +654,6 @@ public void handleEvent(JobHistoryEvent event) { } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - if (timelineV2Client != null) { - processEventForNewTimelineService(historyEvent, event.getJobID(), - event.getTimestamp()); - } else if (timelineClient != null) { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); - } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " + event.getHistoryEvent().getEventType()); @@ -710,6 +735,23 @@ public void handleEvent(JobHistoryEvent event) { } } + private void handleTimelineEvent(JobHistoryEvent event) { + HistoryEvent historyEvent = event.getHistoryEvent(); + if (handleTimelineEvent) { + if (timelineV2Client != null) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else if (timelineClient != null) { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("In HistoryEventHandler, handle timelineEvent:" + + event.getHistoryEvent().getEventType()); + } + } + public void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) { // context.getJob could be used for some of this info as well. @@ -1745,4 +1787,12 @@ private String createJobStateForJobUnsuccessfulCompletionEvent( boolean getFlushTimerStatus() { return isTimerActive; } + + private final class ForwardingEventHandler + implements EventHandler { + @Override + public void handle(JobHistoryEvent event) { + handleTimelineEvent(event); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index d951944df6..096d13ec82 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.timeline.TimelineStore; @@ -589,6 +591,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent( t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1), currentTime - 10)); + jheh.getDispatcher().await(); TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -605,6 +608,7 @@ public void testTimelineEventHandling() throws Exception { "user", 200, "/foo/job.xml", new HashMap(), "default"), currentTime + 10)); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -623,6 +627,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"), currentTime - 20)); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -645,6 +650,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime)); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -672,6 +678,7 @@ public void testTimelineEventHandling() throws Exception { new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20)); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -701,6 +708,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(t.taskID, 0, TaskType.MAP, ""))); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -714,6 +722,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, ""))); + jheh.getDispatcher().await(); entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -1031,6 +1040,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { private EventWriter eventWriter; private boolean mockHistoryProcessing = true; + private DrainDispatcher dispatcher; public JHEvenHandlerForTest(AppContext context, int startCount) { super(context, startCount); JobHistoryEventHandler.fileMap.clear(); @@ -1042,6 +1052,12 @@ public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHist JobHistoryEventHandler.fileMap.clear(); } + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + + } + @Override protected void serviceStart() { if (timelineClient != null) { @@ -1049,6 +1065,19 @@ protected void serviceStart() { } else if (timelineV2Client != null) { timelineV2Client.start(); } + if (handleTimelineEvent) { + atsEventDispatcher.start(); + } + } + + @Override + protected AsyncDispatcher createDispatcher() { + dispatcher = new DrainDispatcher(); + return dispatcher; + } + + public DrainDispatcher getDispatcher() { + return dispatcher; } @Override