diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fee3fc9fab..8ecdae5262 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -201,6 +201,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for
     all task-updates. (Siddarth Seth via vinodkv)
 
+    MAPREDUCE-3512. Batched JobHistory flushing to DFS so that the AM no
+    longer slows down by flushing for every event. (Siddarth Seth via vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 169917a30c..7e8c3163bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -20,9 +20,12 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -70,13 +73,20 @@ public class JobHistoryEventHandler extends AbstractService
 
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
-  private Configuration conf;
   private Path stagingDirPath = null;
   private Path doneDirPrefixPath = null; // folder for completed jobs
 
+  private int maxUnflushedCompletionEvents;
+  private int postJobCompletionMultiplier;
+  private long flushTimeout;
+  private int minQueueSizeForBatchingFlushes; // TODO: Rename
-  private BlockingQueue<JobHistoryEvent> eventQueue =
+  private int numUnflushedCompletionEvents = 0;
+  private boolean isTimerActive;
+
+
+  protected BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
   protected Thread eventHandlingThread;
   private volatile boolean stopped;
@@ -103,8 +113,6 @@ public class JobHistoryEventHandler extends AbstractService
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
-
     String stagingDirStr = null;
     String doneDirStr = null;
     String userDoneDirStr = null;
@@ -184,6 +192,27 @@ public class JobHistoryEventHandler extends AbstractService
       throw new YarnException(e);
     }
 
+    // Maximum number of unflushed completion-events that can stay in the
+    // queue before a flush kicks in.
+    maxUnflushedCompletionEvents =
+        conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
+    // We want to cut down flushes after the job completes so the remaining
+    // events are written out quicker; maxUnflushedCompletionEvents is scaled
+    // up by the following multiplier once the job is done.
+    postJobCompletionMultiplier =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
+    // Maximum time a completion event may remain unflushed before a flush
+    // is forced.
+    flushTimeout =
+        conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
+    minQueueSizeForBatchingFlushes =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
+
     super.init(conf);
   }
 
@@ -256,14 +285,28 @@ public class JobHistoryEventHandler extends AbstractService
     stopped = true;
     //do not interrupt while event handling is in progress
     synchronized(lock) {
-      eventHandlingThread.interrupt();
+      if (eventHandlingThread != null)
+        eventHandlingThread.interrupt();
     }
     try {
-      eventHandlingThread.join();
+      if (eventHandlingThread != null)
+        eventHandlingThread.join();
     } catch (InterruptedException ie) {
       LOG.info("Interruped Exception while stopping", ie);
     }
+
+    // Cancel all timers - so that they aren't invoked during or after
+    // the metaInfo object is wrapped up.
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.shutDownTimer();
+      } catch (IOException e) {
+        LOG.info("Exception while cancelling delayed flush timer. "
+            + "Likely caused by a failed flush " + e.getMessage());
+      }
+    }
+
     //write all the events remaining in queue
     Iterator<JobHistoryEvent> it = eventQueue.iterator();
     while(it.hasNext()) {
@@ -284,6 +327,12 @@ public class JobHistoryEventHandler extends AbstractService
     super.stop();
   }
 
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
+    return new EventWriter(out);
+  }
+
   /**
    * Create an event writer for the Job represented by the jobID.
    * Writes out the job configuration to the log directory.
@@ -319,8 +368,7 @@ public class JobHistoryEventHandler extends AbstractService
         JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
     if (writer == null) {
       try {
-        FSDataOutputStream out = stagingDirFS.create(historyFile, true);
-        writer = new EventWriter(out);
+        writer = createEventWriter(historyFile);
         LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
             + historyFile);
       } catch (IOException ioe) {
@@ -371,12 +419,26 @@ public class JobHistoryEventHandler extends AbstractService
   @Override
   public void handle(JobHistoryEvent event) {
     try {
+      if (isJobCompletionEvent(event.getHistoryEvent())) {
+        // When the job is complete, flush slower (larger batches) but write faster.
+        maxUnflushedCompletionEvents =
+            maxUnflushedCompletionEvents * postJobCompletionMultiplier;
+      }
+
       eventQueue.put(event);
     } catch (InterruptedException e) {
       throw new YarnException(e);
     }
   }
 
+  private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
+    if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
+        EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+      return true;
+    }
+    return false;
+  }
+
   protected void handleEvent(JobHistoryEvent event) {
     synchronized (lock) {
 
@@ -615,50 +677,159 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
+  private class FlushTimerTask extends TimerTask {
+    private MetaInfo metaInfo;
+    private IOException ioe = null;
+    private volatile boolean shouldRun = true;
+
+    FlushTimerTask(MetaInfo metaInfo) {
+      this.metaInfo = metaInfo;
+    }
+
+    @Override
+    public void run() {
+      synchronized (lock) {
+        try {
+          if (!metaInfo.isTimerShutDown() && shouldRun)
+            metaInfo.flush();
+        } catch (IOException e) {
+          ioe = e;
+        }
+      }
+    }
+
+    public IOException getException() {
+      return ioe;
+    }
+
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+
   private class MetaInfo {
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
     JobIndexInfo jobIndexInfo;
     JobSummary jobSummary;
+    Timer flushTimer;
+    FlushTimerTask flushTimerTask;
+    private boolean isTimerShutDown = false;
 
-    MetaInfo(Path historyFile, Path conf, EventWriter writer,
-        String user, String jobName, JobId jobId) {
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
+        String jobName, JobId jobId) {
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
-      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
-          null);
+      this.jobIndexInfo =
+          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
       this.jobSummary = new JobSummary();
+      this.flushTimer = new Timer("FlushTimer", true);
     }
 
-    Path getHistoryFile() { return historyFile; }
+    Path getHistoryFile() {
+      return historyFile;
+    }
 
-    Path getConfFile() {return confFile; }
+    Path getConfFile() {
+      return confFile;
+    }
 
-    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
+    JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
 
-    JobSummary getJobSummary() { return jobSummary; }
+    JobSummary getJobSummary() {
+      return jobSummary;
+    }
 
-    boolean isWriterActive() {return writer != null ; }
+    boolean isWriterActive() {
+      return writer != null;
+    }
+
+    boolean isTimerShutDown() {
+      return isTimerShutDown;
+    }
 
     void closeWriter() throws IOException {
       synchronized (lock) {
-      if (writer != null) {
-        writer.close();
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
+      }
       }
-      writer = null;
-      }
     }
 
     void writeEvent(HistoryEvent event) throws IOException {
       synchronized (lock) {
-      if (writer != null) {
-        writer.write(event);
-        writer.flush();
+        if (writer != null) {
+          writer.write(event);
+          processEventForFlush(event);
+          maybeFlush(event);
+        }
+      }
+    }
+
+    void processEventForFlush(HistoryEvent historyEvent) throws IOException {
+      if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
+          EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
+          EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
+          EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
+          EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
+          EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+        numUnflushedCompletionEvents++;
+        if (!isTimerActive) {
+          resetFlushTimer();
+          if (!isTimerShutDown) {
+            flushTimerTask = new FlushTimerTask(this);
+            flushTimer.schedule(flushTimerTask, flushTimeout);
+          }
+        }
+      }
+    }
+
+    void resetFlushTimer() throws IOException {
+      if (flushTimerTask != null) {
+        IOException exception = flushTimerTask.getException();
+        flushTimerTask.stop();
+        if (exception != null) {
+          throw exception;
+        }
+        flushTimerTask = null;
+      }
+      isTimerActive = false;
+    }
+
+    void maybeFlush(HistoryEvent historyEvent) throws IOException {
+      if ((eventQueue.size() < minQueueSizeForBatchingFlushes
+          && numUnflushedCompletionEvents > 0)
+          || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents
+          || isJobCompletionEvent(historyEvent)) {
+        this.flush();
+      }
+    }
+
+    void flush() throws IOException {
+      synchronized (lock) {
+        if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
+          writer.flush();
+          numUnflushedCompletionEvents = 0;
+          resetFlushTimer();
+        }
+      }
+    }
+
+    void shutDownTimer() throws IOException {
+      synchronized (lock) {
+        isTimerShutDown = true;
+        flushTimer.cancel();
+        if (flushTimerTask != null && flushTimerTask.getException() != null) {
+          throw flushTimerTask.getException();
+        }
+      }
+    }
-  }
   }
 
   private void moveTmpToDone(Path tmpPath) throws IOException {
@@ -682,7 +853,7 @@ public class JobHistoryEventHandler extends AbstractService
         doneDirFS.delete(toPath, true);
       }
       boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
-          false, conf);
+          false, getConfig());
 
       if (copied)
         LOG.info("Copied to done location: " + toPath);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
new file mode 100644
index 0000000000..5e7af4629f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestJobHistoryEventHandler {
+
+
+  private static final Log LOG = LogFactory
+      .getLog(TestJobHistoryEventHandler.class);
+
+  @Test
+  public void testFirstFlushOnCompletionEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
+            t.taskID, 0, TaskType.MAP, "")));
+      }
+      handleNextNEvents(jheh, 100);
+      verify(mockWriter, times(0)).flush();
+
+      // First completion event: the queue is below the batching threshold
+      // (200), so it triggers an immediate flush.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+          t.taskID, 0, TaskType.MAP, "", null)));
+      verify(mockWriter).flush();
+
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  @Test
+  public void testMaxUnflushedCompletionEvents() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 1);
+      verify(mockWriter).flush();
+
+      handleNextNEvents(jheh, 50);
+      verify(mockWriter, times(6)).flush();
+
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  @Test
+  public void testUnflushedTimer() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        2 * 1000l); // 2 seconds.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough; sleep 8 to be safe.
+      verify(mockWriter).flush();
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  @Test
+  public void testBatchedFlushJobEndMultiplier() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l); // 60 seconds.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, 0, TaskType.MAP, "", null)));
+      }
+      queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null,
+          new Counters())));
+
+      handleNextNEvents(jheh, 29);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 72);
+      verify(mockWriter, times(4)).flush(); // 3 flushes of 30 + 1 for the JobFinished event
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
+    jheh.handle(event);
+  }
+
+  private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
+      throws InterruptedException {
+    jheh.handle(event);
+    jheh.handleEvent(jheh.eventQueue.take());
+  }
+
+  private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
+      throws InterruptedException {
+    for (int i = 0; i < numEvents; i++) {
+      jheh.handleEvent(jheh.eventQueue.take());
+    }
+  }
+
+  private String setupTestWorkDir() {
+    File testWorkDir = new File("target", this.getClass().getCanonicalName());
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testWorkDir.getAbsolutePath()), true);
+      return testWorkDir.getAbsolutePath();
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup", e);
+      throw new YarnException("could not cleanup test dir", e);
+    }
+  }
+
+  private AppContext mockAppContext(JobId jobId) {
+    AppContext mockContext = mock(AppContext.class);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTotalMaps()).thenReturn(10);
+    when(mockJob.getTotalReduces()).thenReturn(10);
+    when(mockJob.getName()).thenReturn("mockjob");
+    when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    return mockContext;
+  }
+
+
+  private class TestParams {
+    String workDir = setupTestWorkDir();
+    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    AppContext mockAppContext = mockAppContext(jobId);
+  }
+}
+
+class JHEvenHandlerForTest extends JobHistoryEventHandler {
+
+  private EventWriter eventWriter;
+  volatile int handleEventCompleteCalls = 0;
+  volatile int handleEventStartedCalls = 0;
+
+  public JHEvenHandlerForTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    this.eventWriter = mock(EventWriter.class);
+    return this.eventWriter;
+  }
+
+  @Override
+  protected void closeEventWriter(JobId jobId) {
+  }
+
+  public EventWriter getEventWriter() {
+    return this.eventWriter;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 044f952b2e..97504ea36c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -436,6 +436,26 @@ public interface MRJobConfig {
   public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
       MR_AM_PREFIX + "create-intermediate-jh-base-dir";
 
+  public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      MR_AM_PREFIX + "history.max-unflushed-events";
+  public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      200;
+
+  public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
+  public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      30;
+
+  public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      MR_AM_PREFIX + "history.complete-event-flush-timeout";
+  public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      30 * 1000l;
+
+  public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
+  public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      50;
+
   public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
       "mapreduce.admin.map.child.java.opts";
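
The four knobs added above combine as: flush promptly while the event queue is shallow, batch while it is deep, never let a completion event wait longer than the timeout, and batch more aggressively once the job is done. A minimal sketch of tuning them from job-submission code follows; the class name HistoryFlushTuning and the concrete values are illustrative only and not part of this patch, while the MRJobConfig constants and the quoted defaults are the ones the patch introduces.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class HistoryFlushTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow up to 400 unflushed completion events before a flush is forced
    // (patch default: 200).
    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 400);
    // Scale that limit up after job completion so the remaining events are
    // written out in larger, faster batches (patch default: 30).
    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 30);
    // Upper bound on how long a completion event may sit unflushed before the
    // delayed-flush timer fires (patch default: 30 seconds).
    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
        60 * 1000L);
    // Batch flushes only while the event queue is at least this deep; below
    // the threshold every completion event is flushed promptly
    // (patch default: 50).
    conf.setInt(
        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 50);
    // ... submit the job with this configuration as usual.
  }
}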