MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush for every event slowing down AM. Contributed by Siddarth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1230353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0086014703
commit
74697f2317
@ -201,6 +201,9 @@ Release 0.23.1 - Unreleased
|
||||
MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all
|
||||
task-updates. (Siddarth Seth via vinodkv)
|
||||
|
||||
MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush
|
||||
for every event slowing down AM. (Siddarth Seth via vinodkv)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
|
||||
|
@ -20,9 +20,12 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
@ -70,13 +73,20 @@ public class JobHistoryEventHandler extends AbstractService
|
||||
private FileSystem stagingDirFS; // log Dir FileSystem
|
||||
private FileSystem doneDirFS; // done Dir FileSystem
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
private Path stagingDirPath = null;
|
||||
private Path doneDirPrefixPath = null; // folder for completed jobs
|
||||
|
||||
private int maxUnflushedCompletionEvents;
|
||||
private int postJobCompletionMultiplier;
|
||||
private long flushTimeout;
|
||||
private int minQueueSizeForBatchingFlushes; // TODO: Rename
|
||||
|
||||
private BlockingQueue<JobHistoryEvent> eventQueue =
|
||||
private int numUnflushedCompletionEvents = 0;
|
||||
private boolean isTimerActive;
|
||||
|
||||
|
||||
protected BlockingQueue<JobHistoryEvent> eventQueue =
|
||||
new LinkedBlockingQueue<JobHistoryEvent>();
|
||||
protected Thread eventHandlingThread;
|
||||
private volatile boolean stopped;
|
||||
@ -103,8 +113,6 @@ public JobHistoryEventHandler(AppContext context, int startCount) {
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
|
||||
this.conf = conf;
|
||||
|
||||
String stagingDirStr = null;
|
||||
String doneDirStr = null;
|
||||
String userDoneDirStr = null;
|
||||
@ -184,6 +192,27 @@ public void init(Configuration conf) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
|
||||
// Maximum number of unflushed completion-events that can stay in the queue
|
||||
// before flush kicks in.
|
||||
maxUnflushedCompletionEvents =
|
||||
conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
|
||||
// We want to cut down flushes after job completes so as to write quicker,
|
||||
// so we increase maxUnflushedEvents post Job completion by using the
|
||||
// following multiplier.
|
||||
postJobCompletionMultiplier =
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
|
||||
// Max time until which flush doesn't take place.
|
||||
flushTimeout =
|
||||
conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
|
||||
minQueueSizeForBatchingFlushes =
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
|
||||
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@ -256,14 +285,28 @@ public void stop() {
|
||||
stopped = true;
|
||||
//do not interrupt while event handling is in progress
|
||||
synchronized(lock) {
|
||||
eventHandlingThread.interrupt();
|
||||
if (eventHandlingThread != null)
|
||||
eventHandlingThread.interrupt();
|
||||
}
|
||||
|
||||
try {
|
||||
eventHandlingThread.join();
|
||||
if (eventHandlingThread != null)
|
||||
eventHandlingThread.join();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interruped Exception while stopping", ie);
|
||||
}
|
||||
|
||||
// Cancel all timers - so that they aren't invoked during or after
|
||||
// the metaInfo object is wrapped up.
|
||||
for (MetaInfo mi : fileMap.values()) {
|
||||
try {
|
||||
mi.shutDownTimer();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Exception while cancelling delayed flush timer. "
|
||||
+ "Likely caused by a failed flush " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
//write all the events remaining in queue
|
||||
Iterator<JobHistoryEvent> it = eventQueue.iterator();
|
||||
while(it.hasNext()) {
|
||||
@ -284,6 +327,12 @@ public void stop() {
|
||||
super.stop();
|
||||
}
|
||||
|
||||
protected EventWriter createEventWriter(Path historyFilePath)
|
||||
throws IOException {
|
||||
FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
|
||||
return new EventWriter(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an event writer for the Job represented by the jobID.
|
||||
* Writes out the job configuration to the log directory.
|
||||
@ -319,8 +368,7 @@ protected void setupEventWriter(JobId jobId)
|
||||
JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
|
||||
if (writer == null) {
|
||||
try {
|
||||
FSDataOutputStream out = stagingDirFS.create(historyFile, true);
|
||||
writer = new EventWriter(out);
|
||||
writer = createEventWriter(historyFile);
|
||||
LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
|
||||
+ historyFile);
|
||||
} catch (IOException ioe) {
|
||||
@ -371,12 +419,26 @@ public void closeWriter(JobId id) throws IOException {
|
||||
@Override
|
||||
public void handle(JobHistoryEvent event) {
|
||||
try {
|
||||
if (isJobCompletionEvent(event.getHistoryEvent())) {
|
||||
// When the job is complete, flush slower but write faster.
|
||||
maxUnflushedCompletionEvents =
|
||||
maxUnflushedCompletionEvents * postJobCompletionMultiplier;
|
||||
}
|
||||
|
||||
eventQueue.put(event);
|
||||
} catch (InterruptedException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
|
||||
if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
|
||||
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void handleEvent(JobHistoryEvent event) {
|
||||
synchronized (lock) {
|
||||
|
||||
@ -615,50 +677,159 @@ protected void closeEventWriter(JobId jobId) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private class FlushTimerTask extends TimerTask {
|
||||
private MetaInfo metaInfo;
|
||||
private IOException ioe = null;
|
||||
private volatile boolean shouldRun = true;
|
||||
|
||||
FlushTimerTask(MetaInfo metaInfo) {
|
||||
this.metaInfo = metaInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (lock) {
|
||||
try {
|
||||
if (!metaInfo.isTimerShutDown() && shouldRun)
|
||||
metaInfo.flush();
|
||||
} catch (IOException e) {
|
||||
ioe = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public IOException getException() {
|
||||
return ioe;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
shouldRun = false;
|
||||
this.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
private class MetaInfo {
|
||||
private Path historyFile;
|
||||
private Path confFile;
|
||||
private EventWriter writer;
|
||||
JobIndexInfo jobIndexInfo;
|
||||
JobSummary jobSummary;
|
||||
Timer flushTimer;
|
||||
FlushTimerTask flushTimerTask;
|
||||
private boolean isTimerShutDown = false;
|
||||
|
||||
MetaInfo(Path historyFile, Path conf, EventWriter writer,
|
||||
String user, String jobName, JobId jobId) {
|
||||
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
|
||||
String jobName, JobId jobId) {
|
||||
this.historyFile = historyFile;
|
||||
this.confFile = conf;
|
||||
this.writer = writer;
|
||||
this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
|
||||
null);
|
||||
this.jobIndexInfo =
|
||||
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
|
||||
this.jobSummary = new JobSummary();
|
||||
this.flushTimer = new Timer("FlushTimer", true);
|
||||
}
|
||||
|
||||
Path getHistoryFile() { return historyFile; }
|
||||
Path getHistoryFile() {
|
||||
return historyFile;
|
||||
}
|
||||
|
||||
Path getConfFile() {return confFile; }
|
||||
Path getConfFile() {
|
||||
return confFile;
|
||||
}
|
||||
|
||||
JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
|
||||
JobIndexInfo getJobIndexInfo() {
|
||||
return jobIndexInfo;
|
||||
}
|
||||
|
||||
JobSummary getJobSummary() { return jobSummary; }
|
||||
JobSummary getJobSummary() {
|
||||
return jobSummary;
|
||||
}
|
||||
|
||||
boolean isWriterActive() {return writer != null ; }
|
||||
boolean isWriterActive() {
|
||||
return writer != null;
|
||||
}
|
||||
|
||||
boolean isTimerShutDown() {
|
||||
return isTimerShutDown;
|
||||
}
|
||||
|
||||
void closeWriter() throws IOException {
|
||||
synchronized (lock) {
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
}
|
||||
writer = null;
|
||||
}
|
||||
writer = null;
|
||||
}
|
||||
}
|
||||
|
||||
void writeEvent(HistoryEvent event) throws IOException {
|
||||
synchronized (lock) {
|
||||
if (writer != null) {
|
||||
writer.write(event);
|
||||
writer.flush();
|
||||
if (writer != null) {
|
||||
writer.write(event);
|
||||
processEventForFlush(event);
|
||||
maybeFlush(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void processEventForFlush(HistoryEvent historyEvent) throws IOException {
|
||||
if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
|
||||
EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
|
||||
EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
|
||||
EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
|
||||
EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
|
||||
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
|
||||
numUnflushedCompletionEvents++;
|
||||
if (!isTimerActive) {
|
||||
resetFlushTimer();
|
||||
if (!isTimerShutDown) {
|
||||
flushTimerTask = new FlushTimerTask(this);
|
||||
flushTimer.schedule(flushTimerTask, flushTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void resetFlushTimer() throws IOException {
|
||||
if (flushTimerTask != null) {
|
||||
IOException exception = flushTimerTask.getException();
|
||||
flushTimerTask.stop();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
flushTimerTask = null;
|
||||
}
|
||||
isTimerActive = false;
|
||||
}
|
||||
|
||||
void maybeFlush(HistoryEvent historyEvent) throws IOException {
|
||||
if ((eventQueue.size() < minQueueSizeForBatchingFlushes
|
||||
&& numUnflushedCompletionEvents > 0)
|
||||
|| numUnflushedCompletionEvents >= maxUnflushedCompletionEvents
|
||||
|| isJobCompletionEvent(historyEvent)) {
|
||||
this.flush();
|
||||
}
|
||||
}
|
||||
|
||||
void flush() throws IOException {
|
||||
synchronized (lock) {
|
||||
if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
|
||||
writer.flush();
|
||||
numUnflushedCompletionEvents = 0;
|
||||
resetFlushTimer();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void shutDownTimer() throws IOException {
|
||||
synchronized (lock) {
|
||||
isTimerShutDown = true;
|
||||
flushTimer.cancel();
|
||||
if (flushTimerTask != null && flushTimerTask.getException() != null) {
|
||||
throw flushTimerTask.getException();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void moveTmpToDone(Path tmpPath) throws IOException {
|
||||
@ -682,7 +853,7 @@ private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
|
||||
doneDirFS.delete(toPath, true);
|
||||
}
|
||||
boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
|
||||
false, conf);
|
||||
false, getConfig());
|
||||
|
||||
if (copied)
|
||||
LOG.info("Copied to done location: " + toPath);
|
||||
|
@ -0,0 +1,310 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.jobhistory;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestJobHistoryEventHandler {
|
||||
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestJobHistoryEventHandler.class);
|
||||
|
||||
@Test
|
||||
public void testFirstFlushOnCompletionEvent() throws Exception {
|
||||
TestParams t = new TestParams();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
|
||||
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
|
||||
60 * 1000l);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
|
||||
conf.setInt(
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
|
||||
|
||||
JHEvenHandlerForTest realJheh =
|
||||
new JHEvenHandlerForTest(t.mockAppContext, 0);
|
||||
JHEvenHandlerForTest jheh = spy(realJheh);
|
||||
jheh.init(conf);
|
||||
|
||||
EventWriter mockWriter = null;
|
||||
try {
|
||||
jheh.start();
|
||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
||||
mockWriter = jheh.getEventWriter();
|
||||
verify(mockWriter).write(any(HistoryEvent.class));
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
|
||||
t.taskID, 0, TaskType.MAP, "")));
|
||||
}
|
||||
handleNextNEvents(jheh, 100);
|
||||
verify(mockWriter, times(0)).flush();
|
||||
|
||||
// First completion event, but min-queue-size for batching flushes is 10
|
||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||
t.taskID, 0, TaskType.MAP, "", null)));
|
||||
verify(mockWriter).flush();
|
||||
|
||||
} finally {
|
||||
jheh.stop();
|
||||
verify(mockWriter).close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxUnflushedCompletionEvents() throws Exception {
|
||||
TestParams t = new TestParams();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
|
||||
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
|
||||
60 * 1000l);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
|
||||
conf.setInt(
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
|
||||
|
||||
JHEvenHandlerForTest realJheh =
|
||||
new JHEvenHandlerForTest(t.mockAppContext, 0);
|
||||
JHEvenHandlerForTest jheh = spy(realJheh);
|
||||
jheh.init(conf);
|
||||
|
||||
EventWriter mockWriter = null;
|
||||
try {
|
||||
jheh.start();
|
||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
||||
mockWriter = jheh.getEventWriter();
|
||||
verify(mockWriter).write(any(HistoryEvent.class));
|
||||
|
||||
for (int i = 0 ; i < 100 ; i++) {
|
||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||
t.taskID, 0, TaskType.MAP, "", null)));
|
||||
}
|
||||
|
||||
handleNextNEvents(jheh, 9);
|
||||
verify(mockWriter, times(0)).flush();
|
||||
|
||||
handleNextNEvents(jheh, 1);
|
||||
verify(mockWriter).flush();
|
||||
|
||||
handleNextNEvents(jheh, 50);
|
||||
verify(mockWriter, times(6)).flush();
|
||||
|
||||
} finally {
|
||||
jheh.stop();
|
||||
verify(mockWriter).close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnflushedTimer() throws Exception {
|
||||
TestParams t = new TestParams();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
|
||||
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
|
||||
2 * 1000l); //2 seconds.
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
|
||||
conf.setInt(
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
|
||||
|
||||
JHEvenHandlerForTest realJheh =
|
||||
new JHEvenHandlerForTest(t.mockAppContext, 0);
|
||||
JHEvenHandlerForTest jheh = spy(realJheh);
|
||||
jheh.init(conf);
|
||||
|
||||
EventWriter mockWriter = null;
|
||||
try {
|
||||
jheh.start();
|
||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
||||
mockWriter = jheh.getEventWriter();
|
||||
verify(mockWriter).write(any(HistoryEvent.class));
|
||||
|
||||
for (int i = 0 ; i < 100 ; i++) {
|
||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||
t.taskID, 0, TaskType.MAP, "", null)));
|
||||
}
|
||||
|
||||
handleNextNEvents(jheh, 9);
|
||||
verify(mockWriter, times(0)).flush();
|
||||
|
||||
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
|
||||
verify(mockWriter).flush();
|
||||
} finally {
|
||||
jheh.stop();
|
||||
verify(mockWriter).close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchedFlushJobEndMultiplier() throws Exception {
|
||||
TestParams t = new TestParams();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
|
||||
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
|
||||
60 * 1000l); //2 seconds.
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
|
||||
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
|
||||
conf.setInt(
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
|
||||
|
||||
JHEvenHandlerForTest realJheh =
|
||||
new JHEvenHandlerForTest(t.mockAppContext, 0);
|
||||
JHEvenHandlerForTest jheh = spy(realJheh);
|
||||
jheh.init(conf);
|
||||
|
||||
EventWriter mockWriter = null;
|
||||
try {
|
||||
jheh.start();
|
||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
||||
mockWriter = jheh.getEventWriter();
|
||||
verify(mockWriter).write(any(HistoryEvent.class));
|
||||
|
||||
for (int i = 0 ; i < 100 ; i++) {
|
||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||
t.taskID, 0, TaskType.MAP, "", null)));
|
||||
}
|
||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
||||
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
|
||||
|
||||
handleNextNEvents(jheh, 29);
|
||||
verify(mockWriter, times(0)).flush();
|
||||
|
||||
handleNextNEvents(jheh, 72);
|
||||
verify(mockWriter, times(4)).flush(); //3 * 30 + 1 for JobFinished
|
||||
} finally {
|
||||
jheh.stop();
|
||||
verify(mockWriter).close();
|
||||
}
|
||||
}
|
||||
|
||||
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
||||
jheh.handle(event);
|
||||
}
|
||||
|
||||
private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
|
||||
throws InterruptedException {
|
||||
jheh.handle(event);
|
||||
jheh.handleEvent(jheh.eventQueue.take());
|
||||
}
|
||||
|
||||
private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
|
||||
throws InterruptedException {
|
||||
for (int i = 0; i < numEvents; i++) {
|
||||
jheh.handleEvent(jheh.eventQueue.take());
|
||||
}
|
||||
}
|
||||
|
||||
private String setupTestWorkDir() {
|
||||
File testWorkDir = new File("target", this.getClass().getCanonicalName());
|
||||
try {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(testWorkDir.getAbsolutePath()), true);
|
||||
return testWorkDir.getAbsolutePath();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Could not cleanup", e);
|
||||
throw new YarnException("could not cleanup test dir", e);
|
||||
}
|
||||
}
|
||||
|
||||
private AppContext mockAppContext(JobId jobId) {
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getTotalMaps()).thenReturn(10);
|
||||
when(mockJob.getTotalReduces()).thenReturn(10);
|
||||
when(mockJob.getName()).thenReturn("mockjob");
|
||||
when(mockContext.getJob(jobId)).thenReturn(mockJob);
|
||||
return mockContext;
|
||||
}
|
||||
|
||||
|
||||
private class TestParams {
|
||||
String workDir = setupTestWorkDir();
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||
AppContext mockAppContext = mockAppContext(jobId);
|
||||
}
|
||||
}
|
||||
|
||||
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||
|
||||
private EventWriter eventWriter;
|
||||
volatile int handleEventCompleteCalls = 0;
|
||||
volatile int handleEventStartedCalls = 0;
|
||||
|
||||
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
||||
super(context, startCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventWriter createEventWriter(Path historyFilePath)
|
||||
throws IOException {
|
||||
this.eventWriter = mock(EventWriter.class);
|
||||
return this.eventWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeEventWriter(JobId jobId) {
|
||||
}
|
||||
|
||||
public EventWriter getEventWriter() {
|
||||
return this.eventWriter;
|
||||
}
|
||||
}
|
@ -436,6 +436,26 @@ public interface MRJobConfig {
|
||||
public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
|
||||
MR_AM_PREFIX + "create-intermediate-jh-base-dir";
|
||||
|
||||
public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
|
||||
MR_AM_PREFIX + "history.max-unflushed-events";
|
||||
public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
|
||||
200;
|
||||
|
||||
public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
|
||||
MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
|
||||
public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
|
||||
30;
|
||||
|
||||
public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
|
||||
MR_AM_PREFIX + "history.complete-event-flush-timeout";
|
||||
public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
|
||||
30 * 1000l;
|
||||
|
||||
public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
|
||||
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
|
||||
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
|
||||
50;
|
||||
|
||||
public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
|
||||
"mapreduce.admin.map.child.java.opts";
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user