MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir cleanup, sending job-end notification correctly when unregister with RM fails. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529682 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2013-10-06 20:53:28 +00:00
parent 21181b6553
commit f0799c5536
14 changed files with 380 additions and 123 deletions
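The change, condensed: the MR App Master now completes unregistration with the RM before it treats the job's final state as reportable. A successful unregistration sets a successfullyUnregistered flag; a failed one forces isLastAMRetry to be recomputed so the AM knows whether it can still be retried; and on the last retry the job-end notification falls back to FAILED whenever the final state never reached the RM. The sketch below is only an illustration of that flow: the field and method names mirror the diff that follows, but JobState, JobReport and the constructor arguments here are simplified stand-ins rather than the real Hadoop classes.

import java.util.concurrent.atomic.AtomicBoolean;

// Condensed, self-contained sketch of the control flow this patch adds.
public class UnregistrationFlowSketch {

  enum JobState { RUNNING, SUCCEEDED, FAILED }

  static class JobReport {
    JobState state = JobState.SUCCEEDED;
  }

  private final AtomicBoolean successfullyUnregistered = new AtomicBoolean(false);
  private volatile boolean isLastAMRetry;
  private final int attemptId;
  private final int maxAppAttempts;

  UnregistrationFlowSketch(int attemptId, int maxAppAttempts) {
    this.attemptId = attemptId;
    this.maxAppAttempts = maxAppAttempts;
    computeIsLastAMRetry();
  }

  // Mirrors RMCommunicator.unregister(): only a confirmed unregistration flips
  // the flag; on failure, isLastAMRetry is re-derived so the AM knows whether
  // it still has a chance to be retried.
  void unregister() {
    try {
      doUnregistration();                  // in the patch: loop until the RM confirms
      successfullyUnregistered.set(true);  // RunningAppContext.markSuccessfulUnregistration()
    } catch (Exception e) {
      computeIsLastAMRetry();              // RunningAppContext.computeIsLastAMRetry()
    }
  }

  void doUnregistration() throws Exception {
    // placeholder for the FinishApplicationMasterRequest exchange
  }

  void computeIsLastAMRetry() {
    isLastAMRetry = attemptId >= maxAppAttempts;
  }

  // Mirrors the job-end notification branch of MRAppMaster.shutDownJob():
  // if the final state never reached the RM on the last retry, the notifier
  // reports FAILED, matching what the RM will eventually tell the client.
  void notifyIfLastRetry(JobReport report) {
    if (isLastAMRetry) {
      if (!successfullyUnregistered.get()) {
        report.state = JobState.FAILED;
      }
      System.out.println("job-end notification sent with state " + report.state);
    }
  }

  public static void main(String[] args) {
    UnregistrationFlowSketch am = new UnregistrationFlowSketch(2, 2);
    am.unregister();                        // doUnregistration() succeeds here
    am.notifyIfLastRetry(new JobReport());  // prints SUCCEEDED
  }
}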

View File

@@ -282,6 +282,10 @@ Release 2.1.2 - UNRELEASED
aren't heart-beating for a while, so that we can aggressively speculate
instead of waiting for task-timeout (Xuan Gong via vinodkv)
+ MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir
+ cleanup, sending job-end notification correctly when unregister with RM
+ fails. (Zhijie Shen via vinodkv)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES

View File

@@ -64,6 +64,6 @@ public interface AppContext {
boolean isLastAMRetry();
- boolean safeToReportTerminationToUser();
+ boolean hasSuccessfullyUnregistered();
}

View File

@@ -18,7 +18,21 @@
package org.apache.hadoop.mapreduce.v2.app;
- import com.google.common.annotations.VisibleForTesting;
+ import java.io.IOException;
+ import java.lang.reflect.Constructor;
+ import java.lang.reflect.InvocationTargetException;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,20 +41,37 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.mapreduce.*;
+ import org.apache.hadoop.mapred.FileOutputCommitter;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.LocalContainerLauncher;
+ import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
- import org.apache.hadoop.mapreduce.jobhistory.*;
+ import org.apache.hadoop.mapreduce.TypeConverter;
+ import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+ import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+ import org.apache.hadoop.mapreduce.jobhistory.EventType;
+ import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
+ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
- import org.apache.hadoop.mapreduce.v2.api.records.*;
+ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+ import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -51,14 +82,26 @@
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
- import org.apache.hadoop.mapreduce.v2.app.job.event.*;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
- import org.apache.hadoop.mapreduce.v2.app.rm.*;
+ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -95,14 +138,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
- import java.io.IOException;
- import java.lang.reflect.Constructor;
- import java.lang.reflect.InvocationTargetException;
- import java.security.PrivilegedExceptionAction;
- import java.util.*;
- import java.util.Map.Entry;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicBoolean;
+ import com.google.common.annotations.VisibleForTesting;
/**
 * The Map-Reduce Application Master.
@@ -166,7 +202,8 @@ public class MRAppMaster extends CompositeService {
private Credentials jobCredentials = new Credentials(); // Filled during init
protected UserGroupInformation currentUser; // Will be setup during init
- private volatile boolean isLastAMRetry = false;
+ @VisibleForTesting
+ protected volatile boolean isLastAMRetry = false;
//Something happened and we should shut down right after we start up.
boolean errorHappenedShutDown = false;
private String shutDownMessage = null;
@@ -175,7 +212,7 @@ public class MRAppMaster extends CompositeService {
private long recoveredJobStartTime = 0;
@VisibleForTesting
- protected AtomicBoolean safeToReportTerminationToUser =
+ protected AtomicBoolean successfullyUnregistered =
new AtomicBoolean(false);
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
@@ -208,14 +245,14 @@ protected void serviceInit(final Configuration conf) throws Exception {
initJobCredentialsAndUGI(conf);
- isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+ context = new RunningAppContext(conf);
+ ((RunningAppContext)context).computeIsLastAMRetry();
LOG.info("The specific max attempts: " + maxAppAttempts +
" for application: " + appAttemptID.getApplicationId().getId() +
". Attempt num: " + appAttemptID.getAttemptId() +
" is last retry: " + isLastAMRetry);
- context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -511,11 +548,6 @@ public void shutDownJob() {
MRAppMaster.this.stop();
if (isLastAMRetry) {
- // Except ClientService, other services are already stopped, it is safe to
- // let clients know the final states. ClientService should wait for some
- // time so clients have enough time to know the final states.
- safeToReportTerminationToUser.set(true);
// Send job-end notification when it is safe to report termination to
// users and it is the last AM retry
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
@@ -524,7 +556,14 @@ public void shutDownJob() {
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
- notifier.notify(job.getReport());
+ JobReport report = job.getReport();
+ // If unregistration fails, the final state is unavailable. However,
+ // at the last AM Retry, the client will finally be notified FAILED
+ // from RM, so we should let users know FAILED via notifier as well
+ if (!context.hasSuccessfullyUnregistered()) {
+ report.setJobState(JobState.FAILED);
+ }
+ notifier.notify(report);
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie);
@@ -863,7 +902,7 @@ protected void serviceStop() throws Exception {
}
}
- private class RunningAppContext implements AppContext {
+ public class RunningAppContext implements AppContext {
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
private final Configuration conf;
@@ -942,8 +981,16 @@ public boolean isLastAMRetry(){
}
@Override
- public boolean safeToReportTerminationToUser() {
- return safeToReportTerminationToUser.get();
+ public boolean hasSuccessfullyUnregistered() {
+ return successfullyUnregistered.get();
+ }
+ public void markSuccessfulUnregistration() {
+ successfullyUnregistered.set(true);
+ }
+ public void computeIsLastAMRetry() {
+ isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
}
}

View File

@@ -128,8 +128,6 @@
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
- import com.google.common.annotations.VisibleForTesting;
/** Implementation of Job interface. Maintains the state machines of Job.
 * The read and write calls use ReadWriteLock for concurrency.
 */
@@ -933,7 +931,7 @@ public JobState getState() {
readLock.lock();
try {
JobState state = getExternalState(getInternalState());
- if (!appContext.safeToReportTerminationToUser()
+ if (!appContext.hasSuccessfullyUnregistered()
&& (state == JobState.SUCCEEDED || state == JobState.FAILED
|| state == JobState.KILLED || state == JobState.ERROR)) {
return lastNonFinalState;

View File

@@ -29,11 +29,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
- import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
+ import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@@ -52,10 +52,13 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+ import com.google.common.annotations.VisibleForTesting;
/**
 * Registers/unregisters to RM and sends heartbeats to RM.
 */
@@ -171,41 +174,57 @@ private void setClientToAMToken(ByteBuffer clientToAMTokenMasterKey) {
protected void unregister() {
try {
- FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
- JobImpl jobImpl = (JobImpl)job;
- if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
- finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
- || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
- finishState = FinalApplicationStatus.KILLED;
- } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
- || jobImpl.getInternalState() == JobStateInternal.ERROR) {
- finishState = FinalApplicationStatus.FAILED;
- }
- StringBuffer sb = new StringBuffer();
- for (String s : job.getDiagnostics()) {
- sb.append(s).append("\n");
- }
- LOG.info("Setting job diagnostics to " + sb.toString());
- String historyUrl =
- MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
- context.getApplicationID());
- LOG.info("History url is " + historyUrl);
- FinishApplicationMasterRequest request =
- FinishApplicationMasterRequest.newInstance(finishState,
- sb.toString(), historyUrl);
- while (true) {
- FinishApplicationMasterResponse response =
- scheduler.finishApplicationMaster(request);
- if (response.getIsUnregistered()) {
- break;
- }
- LOG.info("Waiting for application to be successfully unregistered.");
- Thread.sleep(rmPollInterval);
- }
+ doUnregistration();
} catch(Exception are) {
LOG.error("Exception while unregistering ", are);
+ // if unregistration failed, isLastAMRetry needs to be recalculated
+ // to see whether AM really has the chance to retry
+ RunningAppContext raContext = (RunningAppContext) context;
+ raContext.computeIsLastAMRetry();
+ }
+ }
+ @VisibleForTesting
+ protected void doUnregistration()
+ throws YarnException, IOException, InterruptedException {
+ FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+ JobImpl jobImpl = (JobImpl)job;
+ if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
+ finishState = FinalApplicationStatus.SUCCEEDED;
+ } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
+ || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
+ finishState = FinalApplicationStatus.KILLED;
+ } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
+ || jobImpl.getInternalState() == JobStateInternal.ERROR) {
+ finishState = FinalApplicationStatus.FAILED;
+ }
+ StringBuffer sb = new StringBuffer();
+ for (String s : job.getDiagnostics()) {
+ sb.append(s).append("\n");
+ }
+ LOG.info("Setting job diagnostics to " + sb.toString());
+ String historyUrl =
+ MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
+ context.getApplicationID());
+ LOG.info("History url is " + historyUrl);
+ FinishApplicationMasterRequest request =
+ FinishApplicationMasterRequest.newInstance(finishState,
+ sb.toString(), historyUrl);
+ while (true) {
+ FinishApplicationMasterResponse response =
+ scheduler.finishApplicationMaster(request);
+ if (response.getIsUnregistered()) {
+ // When excepting ClientService, other services are already stopped,
+ // it is safe to let clients know the final states. ClientService
+ // should wait for some time so clients have enough time to know the
+ // final states.
+ RunningAppContext raContext = (RunningAppContext) context;
+ raContext.markSuccessfulUnregistration();
+ break;
+ }
+ LOG.info("Waiting for application to be successfully unregistered.");
+ Thread.sleep(rmPollInterval);
}
}
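Since doUnregistration() is now a protected, @VisibleForTesting hook, a test allocator can override it to simulate an RM that rejects or never confirms the unregistration. The fragment below is trimmed from the pattern the custom allocators later in this diff use; it is an illustration of the hook, not additional code from the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.exceptions.YarnException;

// Allocator whose unregistration always fails, so shutDownJob() exercises the
// failure path (isLastAMRetry is recomputed, the notifier reports FAILED).
class FailingUnregistrationAllocator extends RMCommunicator
    implements ContainerAllocator {

  FailingUnregistrationAllocator(AppContext context) {
    super(null, context);   // no ClientService needed in this sketch
  }

  @Override
  public void serviceInit(Configuration conf) {
  }

  @Override
  public void serviceStart() {
  }

  @Override
  public void serviceStop() {
    unregister();           // triggers doUnregistration() below
  }

  @Override
  protected void doUnregistration()
      throws YarnException, IOException, InterruptedException {
    throw new YarnException("simulated unregistration failure");
  }

  @Override
  protected void heartbeat() throws Exception {
  }

  @Override
  public void handle(ContainerAllocatorEvent event) {
  }
}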
@@ -235,7 +254,6 @@ protected void serviceStop() throws Exception {
protected void startAllocatorThread() {
allocatorThread = new Thread(new Runnable() {
- @SuppressWarnings("unchecked")
@Override
public void run() {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {

View File

@@ -136,9 +136,9 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, Clock clock, boolean shutdown) {
+ boolean cleanOnStart, Clock clock, boolean unregistered) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock,
- shutdown);
+ unregistered);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -147,8 +147,8 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, boolean shutdown) {
- this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown);
+ boolean cleanOnStart, boolean unregistered) {
+ this(maps, reduces, autoComplete, testName, cleanOnStart, 1, unregistered);
}
@Override
@@ -181,16 +181,16 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, int startCount, boolean shutdown) {
+ boolean cleanOnStart, int startCount, boolean unregistered) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
- new SystemClock(), shutdown);
+ new SystemClock(), unregistered);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
+ boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
- cleanOnStart, startCount, clock, shutdown);
+ cleanOnStart, startCount, clock, unregistered);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -202,9 +202,9 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, int startCount, boolean shutdown) {
+ boolean cleanOnStart, int startCount, boolean unregistered) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
- cleanOnStart, startCount, new SystemClock(), shutdown);
+ cleanOnStart, startCount, new SystemClock(), unregistered);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
@@ -216,7 +216,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
- boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
+ boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
this.testWorkDir = new File("target", testName);
@@ -237,7 +237,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
this.autoComplete = autoComplete;
// If safeToReportTerminationToUser is set to true, we can verify whether
// the job can reaches the final state when MRAppMaster shuts down.
- this.safeToReportTerminationToUser.set(shutdown);
+ this.successfullyUnregistered.set(unregistered);
}
@Override @Override

View File

@@ -137,7 +137,7 @@ public boolean isLastAMRetry() {
}
@Override
- public boolean safeToReportTerminationToUser() {
+ public boolean hasSuccessfullyUnregistered() {
// bogus - Not Required
return true;
}

View File

@@ -21,6 +21,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
@@ -41,10 +42,16 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Assert;
import org.junit.Test;
@@ -185,25 +192,19 @@ public void testNotifyRetries() throws InterruptedException {
}
@Test
- public void testNotificationOnNormalShutdown() throws Exception {
+ public void testNotificationOnLastRetryNormalShutdown() throws Exception {
HttpServer server = startHttpServer();
// Act like it is the second attempt. Default max attempts is 2
- MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2));
- // Make use of safeToReportflag so that we can look at final job-state as
- // seen by real users.
- app.safeToReportTerminationToUser.set(false);
+ MRApp app = spy(new MRAppWithCustomContainerAllocator(
+ 2, 2, true, this.getClass().getName(), true, 2, true));
doNothing().when(app).sysexit();
Configuration conf = new Configuration();
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
JobImpl job = (JobImpl)app.submit(conf);
- // Even though auto-complete is true, because app is not shut-down yet, user
- // will only see RUNNING state.
app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
- app.waitForState(job, JobState.RUNNING);
- // Now shutdown. User should see SUCCEEDED state.
+ // Unregistration succeeds: successfullyUnregistered is set
app.shutDownJob();
- app.waitForState(job, JobState.SUCCEEDED);
Assert.assertEquals(true, app.isLastAMRetry());
Assert.assertEquals(1, JobEndServlet.calledTimes);
Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
@@ -214,24 +215,25 @@ public void testNotificationOnNormalShutdown() throws Exception {
}
@Test
- public void testNotificationOnNonLastRetryShutdown() throws Exception {
+ public void testAbsentNotificationOnNotLastRetryUnregistrationFailure()
+ throws Exception {
HttpServer server = startHttpServer();
- MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
+ MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
+ this.getClass().getName(), true, 1, false));
doNothing().when(app).sysexit();
- // Make use of safeToReportflag so that we can look at final job-state as
- // seen by real users.
- app.safeToReportTerminationToUser.set(false);
Configuration conf = new Configuration();
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
- JobImpl job = (JobImpl)app.submit(new Configuration());
+ JobImpl job = (JobImpl)app.submit(conf);
app.waitForState(job, JobState.RUNNING);
app.getContext().getEventHandler()
.handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
app.waitForInternalState(job, JobStateInternal.REBOOT);
- // Now shutdown.
+ // Unregistration fails: isLastAMRetry is recalculated, this is not
+ app.shutDownJob();
// Not the last AM attempt. So user should that the job is still running.
app.waitForState(job, JobState.RUNNING);
- app.shutDownJob();
Assert.assertEquals(false, app.isLastAMRetry());
Assert.assertEquals(0, JobEndServlet.calledTimes);
Assert.assertEquals(null, JobEndServlet.requestUri);
@@ -239,6 +241,33 @@ public void testNotificationOnNonLastRetryShutdown() throws Exception {
server.stop();
}
@Test
public void testNotificationOnLastRetryUnregistrationFailure()
throws Exception {
HttpServer server = startHttpServer();
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
this.getClass().getName(), true, 2, false));
doNothing().when(app).sysexit();
Configuration conf = new Configuration();
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
JobImpl job = (JobImpl)app.submit(conf);
app.waitForState(job, JobState.RUNNING);
app.getContext().getEventHandler()
.handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
app.waitForInternalState(job, JobStateInternal.REBOOT);
// Now shutdown. User should see FAILED state.
// Unregistration fails: isLastAMRetry is recalculated, this is
app.shutDownJob();
Assert.assertEquals(true, app.isLastAMRetry());
Assert.assertEquals(1, JobEndServlet.calledTimes);
Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
JobEndServlet.requestUri.getQuery());
Assert.assertEquals(JobState.FAILED.toString(),
JobEndServlet.foundJobState);
server.stop();
}
private static HttpServer startHttpServer() throws Exception {
new File(System.getProperty(
"build.webapps", "build/webapps") + "/test").mkdirs();
@@ -280,4 +309,83 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
}
}
private class MRAppWithCustomContainerAllocator extends MRApp {
private boolean crushUnregistration;
public MRAppWithCustomContainerAllocator(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart,
int startCount, boolean crushUnregistration) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
false);
this.crushUnregistration = crushUnregistration;
}
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
context = spy(context);
when(context.getEventHandler()).thenReturn(null);
when(context.getApplicationID()).thenReturn(null);
return new CustomContainerAllocator(this, context);
}
private class CustomContainerAllocator
extends RMCommunicator
implements ContainerAllocator, RMHeartbeatHandler {
private MRAppWithCustomContainerAllocator app;
private MRAppContainerAllocator allocator =
new MRAppContainerAllocator();
public CustomContainerAllocator(
MRAppWithCustomContainerAllocator app, AppContext context) {
super(null, context);
this.app = app;
}
@Override
public void serviceInit(Configuration conf) {
}
@Override
public void serviceStart() {
}
@Override
public void serviceStop() {
unregister();
}
@Override
protected void doUnregistration()
throws YarnException, IOException, InterruptedException {
if (crushUnregistration) {
app.successfullyUnregistered.set(true);
} else {
throw new YarnException("test exception");
}
}
@Override
public void handle(ContainerAllocatorEvent event) {
allocator.handle(event);
}
@Override
public long getLastHeartbeatTime() {
return allocator.getLastHeartbeatTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
allocator.runOnNextHeartbeat(callback);
}
@Override
protected void heartbeat() throws Exception {
}
}
}
}

View File

@@ -29,7 +29,6 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -44,7 +43,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
- import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -55,15 +53,12 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
- import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
- import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
- import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
/**
@@ -384,12 +379,13 @@ public void testJobSuccess() throws Exception {
// AM is not unregistered
Assert.assertEquals(JobState.RUNNING, job.getState());
// imitate that AM is unregistered
- app.safeToReportTerminationToUser.set(true);
+ app.successfullyUnregistered.set(true);
app.waitForState(job, JobState.SUCCEEDED);
}
@Test
- public void testJobRebootNotLastRetry() throws Exception {
+ public void testJobRebootNotLastRetryOnUnregistrationFailure()
+ throws Exception {
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.RUNNING);
@@ -408,10 +404,12 @@ public void testJobRebootNotLastRetry() throws Exception {
}
@Test
- public void testJobRebootOnLastRetry() throws Exception {
+ public void testJobRebootOnLastRetryOnUnregistrationFailure()
+ throws Exception {
// make startCount as 2 since this is last retry which equals to
// DEFAULT_MAX_AM_RETRY
- MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
+ // The last param mocks the unregistration failure
+ MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false);
Configuration conf = new Configuration();
Job job = app.submit(conf);
@@ -425,8 +423,10 @@ public void testJobRebootOnLastRetry() throws Exception {
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
JobEventType.JOB_AM_REBOOT));
- // return exteranl state as ERROR if this is the last retry
- app.waitForState(job, JobState.ERROR);
+ app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT);
+ // return exteranl state as RUNNING if this is the last retry while
+ // unregistration fails
+ app.waitForState(job, JobState.RUNNING);
}
private final class MRAppWithSpiedJob extends MRApp {

View File

@@ -869,7 +869,7 @@ public boolean isLastAMRetry() {
}
@Override
- public boolean safeToReportTerminationToUser() {
+ public boolean hasSuccessfullyUnregistered() {
// bogus - Not Required
return true;
}

View File

@@ -21,6 +21,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -36,18 +37,17 @@
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
- import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+ import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
- import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -57,7 +57,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
- import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -75,7 +75,44 @@ public class TestStagingCleanup extends TestCase {
private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
@Test
public void testDeletionofStagingOnUnregistrationFailure()
throws IOException {
testDeletionofStagingOnUnregistrationFailure(2, false);
testDeletionofStagingOnUnregistrationFailure(1, true);
}
@SuppressWarnings("resource")
private void testDeletionofStagingOnUnregistrationFailure(
int maxAttempts, boolean shouldHaveDeleted) throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
TestMRApp appMaster = new TestMRApp(attemptId, null,
JobStateInternal.RUNNING, maxAttempts);
appMaster.crushUnregistration = true;
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
if (shouldHaveDeleted) {
Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
verify(fs).delete(stagingJobPath, true);
} else {
Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry());
verify(fs, never()).delete(stagingJobPath, true);
}
}
@Test
public void testDeletionofStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
@@ -204,6 +241,7 @@ private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
boolean testIsLastAMRetry = false;
JobStateInternal jobStateInternal;
+ boolean crushUnregistration = false;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
@@ -211,6 +249,7 @@ public TestMRApp(ApplicationAttemptId applicationAttemptId,
applicationAttemptId, 1), "testhost", 2222, 3333,
System.currentTimeMillis(), maxAppAttempts);
this.allocator = allocator;
+ this.successfullyUnregistered.set(true);
}
public TestMRApp(ApplicationAttemptId applicationAttemptId,
@@ -229,7 +268,11 @@ protected FileSystem getFileSystem(Configuration conf) {
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
if(allocator == null) {
- return super.createContainerAllocator(clientService, context);
+ if (crushUnregistration) {
+ return new CustomContainerAllocator(context);
+ } else {
+ return super.createContainerAllocator(clientService, context);
+ }
}
return allocator;
}
@@ -280,6 +323,41 @@ protected void initJobCredentialsAndUGI(Configuration conf) {
public boolean getTestIsLastAMRetry(){
return testIsLastAMRetry;
}
private class CustomContainerAllocator extends RMCommunicator
implements ContainerAllocator {
public CustomContainerAllocator(AppContext context) {
super(null, context);
}
@Override
public void serviceInit(Configuration conf) {
}
@Override
public void serviceStart() {
}
@Override
public void serviceStop() {
unregister();
}
@Override
protected void doUnregistration()
throws YarnException, IOException, InterruptedException {
throw new YarnException("test exception");
}
@Override
protected void heartbeat() throws Exception {
}
@Override
public void handle(ContainerAllocatorEvent event) {
}
}
}
private final class MRAppTestCleanup extends MRApp { private final class MRAppTestCleanup extends MRApp {

View File

@@ -275,7 +275,7 @@ public void testRebootedDuringCommit() throws Exception {
AppContext mockContext = mock(AppContext.class);
when(mockContext.isLastAMRetry()).thenReturn(true);
- when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
+ when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
@@ -285,7 +285,7 @@ public void testRebootedDuringCommit() throws Exception {
assertJobState(job, JobStateInternal.REBOOT);
// return the external state as ERROR since this is last retry.
Assert.assertEquals(JobState.RUNNING, job.getState());
- when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+ when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
Assert.assertEquals(JobState.ERROR, job.getState());
dispatcher.stop();
@@ -594,7 +594,7 @@ public void testReportDiagnostics() throws Exception {
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
AppContext mockContext = mock(AppContext.class);
- when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+ when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class),
@@ -705,7 +705,7 @@ public void testTransitionsAtFailed() throws IOException {
commitHandler.start();
AppContext mockContext = mock(AppContext.class);
- when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
+ when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
@@ -722,7 +722,7 @@ public void testTransitionsAtFailed() throws IOException {
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
assertJobState(job, JobStateInternal.FAILED);
Assert.assertEquals(JobState.RUNNING, job.getState());
- when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+ when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
Assert.assertEquals(JobState.FAILED, job.getState());
dispatcher.stop();
@@ -762,7 +762,7 @@ private static StubbedJob createStubbedJob(Configuration conf,
JobId jobId = TypeConverter.toYarn(jobID);
if (appContext == null) {
appContext = mock(AppContext.class);
- when(appContext.safeToReportTerminationToUser()).thenReturn(true);
+ when(appContext.hasSuccessfullyUnregistered()).thenReturn(true);
}
StubbedJob job = new StubbedJob(jobId,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),

View File

@@ -88,6 +88,10 @@ public StubbedLocalContainerAllocator() {
protected void register() {
}
+ @Override
+ protected void unregister() {
+ }
@Override
protected void startAllocatorThread() {
allocatorThread = new Thread();

View File

@@ -389,7 +389,7 @@ public boolean isLastAMRetry() {
}
@Override
- public boolean safeToReportTerminationToUser() {
+ public boolean hasSuccessfullyUnregistered() {
// bogus - Not Required
return true;
}