From f0799c55360e1e77224955f331892390e4361729 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sun, 6 Oct 2013 20:53:28 +0000 Subject: [PATCH] MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir cleanup, sending job-end notification correctly when unregister with RM fails. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529682 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapreduce/v2/app/AppContext.java | 2 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 105 +++++++++---- .../mapreduce/v2/app/job/impl/JobImpl.java | 4 +- .../mapreduce/v2/app/rm/RMCommunicator.java | 88 ++++++----- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 24 +-- .../mapreduce/v2/app/MockAppContext.java | 2 +- .../mapreduce/v2/app/TestJobEndNotifier.java | 142 +++++++++++++++--- .../hadoop/mapreduce/v2/app/TestMRApp.java | 22 +-- .../v2/app/TestRuntimeEstimators.java | 2 +- .../mapreduce/v2/app/TestStagingCleanup.java | 90 ++++++++++- .../v2/app/job/impl/TestJobImpl.java | 12 +- .../local/TestLocalContainerAllocator.java | 4 + .../hadoop/mapreduce/v2/hs/JobHistory.java | 2 +- 14 files changed, 380 insertions(+), 123 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8c10325e38..5ca29b021b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -282,6 +282,10 @@ Release 2.1.2 - UNRELEASED aren't heart-beating for a while, so that we can aggressively speculate instead of waiting for task-timeout (Xuan Gong via vinodkv) + MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir + cleanup, sending job-end notification correctly when unregister with RM + fails. 
(Zhijie Shen via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 36482aebe3..6f036c4a74 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -64,6 +64,6 @@ public interface AppContext { boolean isLastAMRetry(); - boolean safeToReportTerminationToUser(); + boolean hasSuccessfullyUnregistered(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 1509cb51e7..b60b64764a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -18,7 +18,21 @@ package org.apache.hadoop.mapreduce.v2.app; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,20 +41,37 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.LocalContainerLauncher; +import org.apache.hadoop.mapred.TaskAttemptListenerImpl; +import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.jobhistory.*; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.EventReader; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import 
org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.mapreduce.v2.api.records.*; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; @@ -51,14 +82,26 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; -import org.apache.hadoop.mapreduce.v2.app.job.event.*; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; -import org.apache.hadoop.mapreduce.v2.app.rm.*; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -95,14 +138,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.SystemClock; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; /** * The Map-Reduce Application Master. 
@@ -166,7 +202,8 @@ public class MRAppMaster extends CompositeService { private Credentials jobCredentials = new Credentials(); // Filled during init protected UserGroupInformation currentUser; // Will be setup during init - private volatile boolean isLastAMRetry = false; + @VisibleForTesting + protected volatile boolean isLastAMRetry = false; //Something happened and we should shut down right after we start up. boolean errorHappenedShutDown = false; private String shutDownMessage = null; @@ -175,7 +212,7 @@ public class MRAppMaster extends CompositeService { private long recoveredJobStartTime = 0; @VisibleForTesting - protected AtomicBoolean safeToReportTerminationToUser = + protected AtomicBoolean successfullyUnregistered = new AtomicBoolean(false); public MRAppMaster(ApplicationAttemptId applicationAttemptId, @@ -208,14 +245,14 @@ protected void serviceInit(final Configuration conf) throws Exception { initJobCredentialsAndUGI(conf); - isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; + context = new RunningAppContext(conf); + + ((RunningAppContext)context).computeIsLastAMRetry(); LOG.info("The specific max attempts: " + maxAppAttempts + " for application: " + appAttemptID.getApplicationId().getId() + ". Attempt num: " + appAttemptID.getAttemptId() + " is last retry: " + isLastAMRetry); - context = new RunningAppContext(conf); - // Job name is the same as the app name util we support DAG of jobs // for an app later appName = conf.get(MRJobConfig.JOB_NAME, ""); @@ -511,11 +548,6 @@ public void shutDownJob() { MRAppMaster.this.stop(); if (isLastAMRetry) { - // Except ClientService, other services are already stopped, it is safe to - // let clients know the final states. ClientService should wait for some - // time so clients have enough time to know the final states. - safeToReportTerminationToUser.set(true); - // Send job-end notification when it is safe to report termination to // users and it is the last AM retry if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { @@ -524,7 +556,14 @@ public void shutDownJob() { + job.getReport().getJobId()); JobEndNotifier notifier = new JobEndNotifier(); notifier.setConf(getConfig()); - notifier.notify(job.getReport()); + JobReport report = job.getReport(); + // If unregistration fails, the final state is unavailable. 
However, + // at the last AM Retry, the client will finally be notified FAILED + // from RM, so we should let users know FAILED via notifier as well + if (!context.hasSuccessfullyUnregistered()) { + report.setJobState(JobState.FAILED); + } + notifier.notify(report); } catch (InterruptedException ie) { LOG.warn("Job end notification interrupted for jobID : " + job.getReport().getJobId(), ie); @@ -863,7 +902,7 @@ protected void serviceStop() throws Exception { } } - private class RunningAppContext implements AppContext { + public class RunningAppContext implements AppContext { private final Map jobs = new ConcurrentHashMap(); private final Configuration conf; @@ -942,8 +981,16 @@ public boolean isLastAMRetry(){ } @Override - public boolean safeToReportTerminationToUser() { - return safeToReportTerminationToUser.get(); + public boolean hasSuccessfullyUnregistered() { + return successfullyUnregistered.get(); + } + + public void markSuccessfulUnregistration() { + successfullyUnregistered.set(true); + } + + public void computeIsLastAMRetry() { + isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 6241df905f..c884a51cbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -128,8 +128,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; -import com.google.common.annotations.VisibleForTesting; - /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. 
*/ @@ -933,7 +931,7 @@ public JobState getState() { readLock.lock(); try { JobState state = getExternalState(getInternalState()); - if (!appContext.safeToReportTerminationToUser() + if (!appContext.hasSuccessfullyUnregistered() && (state == JobState.SUCCEEDED || state == JobState.FAILED || state == JobState.KILLED || state == JobState.ERROR)) { return lastNonFinalState; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 67c632a87c..f09ac744d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -29,11 +29,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; @@ -52,10 +52,13 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import com.google.common.annotations.VisibleForTesting; + /** * Registers/unregisters to RM and sends heartbeats to RM. 
*/ @@ -171,41 +174,57 @@ private void setClientToAMToken(ByteBuffer clientToAMTokenMasterKey) { protected void unregister() { try { - FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; - JobImpl jobImpl = (JobImpl)job; - if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) { - finishState = FinalApplicationStatus.SUCCEEDED; - } else if (jobImpl.getInternalState() == JobStateInternal.KILLED - || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) { - finishState = FinalApplicationStatus.KILLED; - } else if (jobImpl.getInternalState() == JobStateInternal.FAILED - || jobImpl.getInternalState() == JobStateInternal.ERROR) { - finishState = FinalApplicationStatus.FAILED; - } - StringBuffer sb = new StringBuffer(); - for (String s : job.getDiagnostics()) { - sb.append(s).append("\n"); - } - LOG.info("Setting job diagnostics to " + sb.toString()); - - String historyUrl = - MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(), - context.getApplicationID()); - LOG.info("History url is " + historyUrl); - FinishApplicationMasterRequest request = - FinishApplicationMasterRequest.newInstance(finishState, - sb.toString(), historyUrl); - while (true) { - FinishApplicationMasterResponse response = - scheduler.finishApplicationMaster(request); - if (response.getIsUnregistered()) { - break; - } - LOG.info("Waiting for application to be successfully unregistered."); - Thread.sleep(rmPollInterval); - } + doUnregistration(); } catch(Exception are) { LOG.error("Exception while unregistering ", are); + // if unregistration failed, isLastAMRetry needs to be recalculated + // to see whether AM really has the chance to retry + RunningAppContext raContext = (RunningAppContext) context; + raContext.computeIsLastAMRetry(); + } + } + + @VisibleForTesting + protected void doUnregistration() + throws YarnException, IOException, InterruptedException { + FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; + JobImpl jobImpl = (JobImpl)job; + if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) { + finishState = FinalApplicationStatus.SUCCEEDED; + } else if (jobImpl.getInternalState() == JobStateInternal.KILLED + || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) { + finishState = FinalApplicationStatus.KILLED; + } else if (jobImpl.getInternalState() == JobStateInternal.FAILED + || jobImpl.getInternalState() == JobStateInternal.ERROR) { + finishState = FinalApplicationStatus.FAILED; + } + StringBuffer sb = new StringBuffer(); + for (String s : job.getDiagnostics()) { + sb.append(s).append("\n"); + } + LOG.info("Setting job diagnostics to " + sb.toString()); + + String historyUrl = + MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(), + context.getApplicationID()); + LOG.info("History url is " + historyUrl); + FinishApplicationMasterRequest request = + FinishApplicationMasterRequest.newInstance(finishState, + sb.toString(), historyUrl); + while (true) { + FinishApplicationMasterResponse response = + scheduler.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + // When excepting ClientService, other services are already stopped, + // it is safe to let clients know the final states. ClientService + // should wait for some time so clients have enough time to know the + // final states. 
+ RunningAppContext raContext = (RunningAppContext) context; + raContext.markSuccessfulUnregistration(); + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(rmPollInterval); } } @@ -235,7 +254,6 @@ protected void serviceStop() throws Exception { protected void startAllocatorThread() { allocatorThread = new Thread(new Runnable() { - @SuppressWarnings("unchecked") @Override public void run() { while (!stopped.get() && !Thread.currentThread().isInterrupted()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 3a7e865c7b..de573fe300 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -136,9 +136,9 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName, } public MRApp(int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, Clock clock, boolean shutdown) { + boolean cleanOnStart, Clock clock, boolean unregistered) { this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock, - shutdown); + unregistered); } public MRApp(int maps, int reduces, boolean autoComplete, String testName, @@ -147,8 +147,8 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName, } public MRApp(int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, boolean shutdown) { - this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown); + boolean cleanOnStart, boolean unregistered) { + this(maps, reduces, autoComplete, testName, cleanOnStart, 1, unregistered); } @Override @@ -181,16 +181,16 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName, } public MRApp(int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, int startCount, boolean shutdown) { + boolean cleanOnStart, int startCount, boolean unregistered) { this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, - new SystemClock(), shutdown); + new SystemClock(), unregistered); } public MRApp(int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) { + boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) { this(getApplicationAttemptId(applicationId, startCount), getContainerId( applicationId, startCount), maps, reduces, autoComplete, testName, - cleanOnStart, startCount, clock, shutdown); + cleanOnStart, startCount, clock, unregistered); } public MRApp(int maps, int reduces, boolean autoComplete, String testName, @@ -202,9 +202,9 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, int startCount, boolean shutdown) { + boolean cleanOnStart, int startCount, boolean unregistered) { this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, - cleanOnStart, startCount, new SystemClock(), shutdown); + cleanOnStart, startCount, new SystemClock(), unregistered); } public 
MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, @@ -216,7 +216,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) { + boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) { super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); this.testWorkDir = new File("target", testName); @@ -237,7 +237,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, this.autoComplete = autoComplete; // If safeToReportTerminationToUser is set to true, we can verify whether // the job can reaches the final state when MRAppMaster shuts down. - this.safeToReportTerminationToUser.set(shutdown); + this.successfullyUnregistered.set(unregistered); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 0496072986..d33e734f83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -137,7 +137,7 @@ public boolean isLastAMRetry() { } @Override - public boolean safeToReportTerminationToUser() { + public boolean hasSuccessfullyUnregistered() { // bogus - Not Required return true; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index bd8baf400f..116c32cc7b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -41,10 +42,16 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.junit.Assert; import org.junit.Test; @@ -185,25 +192,19 @@ public void testNotifyRetries() throws InterruptedException { } @Test - public void testNotificationOnNormalShutdown() throws Exception { + public void testNotificationOnLastRetryNormalShutdown() throws Exception { HttpServer server = startHttpServer(); // Act like it is the second attempt. Default max attempts is 2 - MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2)); - // Make use of safeToReportflag so that we can look at final job-state as - // seen by real users. - app.safeToReportTerminationToUser.set(false); + MRApp app = spy(new MRAppWithCustomContainerAllocator( + 2, 2, true, this.getClass().getName(), true, 2, true)); doNothing().when(app).sysexit(); Configuration conf = new Configuration(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); JobImpl job = (JobImpl)app.submit(conf); - // Even though auto-complete is true, because app is not shut-down yet, user - // will only see RUNNING state. app.waitForInternalState(job, JobStateInternal.SUCCEEDED); - app.waitForState(job, JobState.RUNNING); - // Now shutdown. User should see SUCCEEDED state. + // Unregistration succeeds: successfullyUnregistered is set app.shutDownJob(); - app.waitForState(job, JobState.SUCCEEDED); Assert.assertEquals(true, app.isLastAMRetry()); Assert.assertEquals(1, JobEndServlet.calledTimes); Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED", @@ -214,24 +215,25 @@ public void testNotificationOnNormalShutdown() throws Exception { } @Test - public void testNotificationOnNonLastRetryShutdown() throws Exception { + public void testAbsentNotificationOnNotLastRetryUnregistrationFailure() + throws Exception { HttpServer server = startHttpServer(); - MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true)); + MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, + this.getClass().getName(), true, 1, false)); doNothing().when(app).sysexit(); - // Make use of safeToReportflag so that we can look at final job-state as - // seen by real users. - app.safeToReportTerminationToUser.set(false); Configuration conf = new Configuration(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); - JobImpl job = (JobImpl)app.submit(new Configuration()); + JobImpl job = (JobImpl)app.submit(conf); app.waitForState(job, JobState.RUNNING); app.getContext().getEventHandler() .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT)); app.waitForInternalState(job, JobStateInternal.REBOOT); + // Now shutdown. + // Unregistration fails: isLastAMRetry is recalculated, this is not the last AM retry + app.shutDownJob(); // Not the last AM attempt. So the user should see that the job is still running.
app.waitForState(job, JobState.RUNNING); - app.shutDownJob(); Assert.assertEquals(false, app.isLastAMRetry()); Assert.assertEquals(0, JobEndServlet.calledTimes); Assert.assertEquals(null, JobEndServlet.requestUri); @@ -239,6 +241,33 @@ public void testNotificationOnNonLastRetryShutdown() throws Exception { server.stop(); } + @Test + public void testNotificationOnLastRetryUnregistrationFailure() + throws Exception { + HttpServer server = startHttpServer(); + MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, + this.getClass().getName(), true, 2, false)); + doNothing().when(app).sysexit(); + Configuration conf = new Configuration(); + conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, + JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); + JobImpl job = (JobImpl)app.submit(conf); + app.waitForState(job, JobState.RUNNING); + app.getContext().getEventHandler() + .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT)); + app.waitForInternalState(job, JobStateInternal.REBOOT); + // Now shutdown. User should see FAILED state. + // Unregistration fails: isLastAMRetry is recalculated, this is the last AM retry + app.shutDownJob(); + Assert.assertEquals(true, app.isLastAMRetry()); + Assert.assertEquals(1, JobEndServlet.calledTimes); + Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED", + JobEndServlet.requestUri.getQuery()); + Assert.assertEquals(JobState.FAILED.toString(), + JobEndServlet.foundJobState); + server.stop(); + } + private static HttpServer startHttpServer() throws Exception { new File(System.getProperty( "build.webapps", "build/webapps") + "/test").mkdirs(); @@ -280,4 +309,83 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) } } + private class MRAppWithCustomContainerAllocator extends MRApp { + + private boolean crushUnregistration; + + public MRAppWithCustomContainerAllocator(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart, + int startCount, boolean crushUnregistration) { + super(maps, reduces, autoComplete, testName, cleanOnStart, startCount, + false); + this.crushUnregistration = crushUnregistration; + } + + @Override + protected ContainerAllocator createContainerAllocator( + ClientService clientService, AppContext context) { + context = spy(context); + when(context.getEventHandler()).thenReturn(null); + when(context.getApplicationID()).thenReturn(null); + return new CustomContainerAllocator(this, context); + } + + private class CustomContainerAllocator + extends RMCommunicator + implements ContainerAllocator, RMHeartbeatHandler { + private MRAppWithCustomContainerAllocator app; + private MRAppContainerAllocator allocator = + new MRAppContainerAllocator(); + + public CustomContainerAllocator( + MRAppWithCustomContainerAllocator app, AppContext context) { + super(null, context); + this.app = app; + } + + @Override + public void serviceInit(Configuration conf) { + } + + @Override + public void serviceStart() { + } + + @Override + public void serviceStop() { + unregister(); + } + + @Override + protected void doUnregistration() + throws YarnException, IOException, InterruptedException { + if (crushUnregistration) { + app.successfullyUnregistered.set(true); + } else { + throw new YarnException("test exception"); + } + } + + @Override + public void handle(ContainerAllocatorEvent event) { + allocator.handle(event); + } + + @Override + public long getLastHeartbeatTime() { + return allocator.getLastHeartbeatTime(); + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { +
allocator.runOnNextHeartbeat(callback); + } + + @Override + protected void heartbeat() throws Exception { + } + } + + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 1987d70613..cc752f1f6e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -29,7 +29,6 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -44,7 +43,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -55,15 +53,12 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; /** @@ -384,12 +379,13 @@ public void testJobSuccess() throws Exception { // AM is not unregistered Assert.assertEquals(JobState.RUNNING, job.getState()); // imitate that AM is unregistered - app.safeToReportTerminationToUser.set(true); + app.successfullyUnregistered.set(true); app.waitForState(job, JobState.SUCCEEDED); } @Test - public void testJobRebootNotLastRetry() throws Exception { + public void testJobRebootNotLastRetryOnUnregistrationFailure() + throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); Job job = app.submit(new Configuration()); app.waitForState(job, JobState.RUNNING); @@ -408,10 +404,12 @@ public void testJobRebootNotLastRetry() throws Exception { } @Test - public void testJobRebootOnLastRetry() throws Exception { + public void testJobRebootOnLastRetryOnUnregistrationFailure() + throws Exception { // make startCount as 2 since this is last retry which equals to // DEFAULT_MAX_AM_RETRY - MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2); + // The last param mocks the unregistration failure + MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false); Configuration conf = new Configuration(); Job job = app.submit(conf); @@ -425,8 +423,10 
@@ public void testJobRebootOnLastRetry() throws Exception { app.getContext().getEventHandler().handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); - // return exteranl state as ERROR if this is the last retry - app.waitForState(job, JobState.ERROR); + app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT); + // return exteranl state as RUNNING if this is the last retry while + // unregistration fails + app.waitForState(job, JobState.RUNNING); } private final class MRAppWithSpiedJob extends MRApp { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 7f968ca70f..3d555f2b7f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -869,7 +869,7 @@ public boolean isLastAMRetry() { } @Override - public boolean safeToReportTerminationToUser() { + public boolean hasSuccessfullyUnregistered() { // bogus - Not Required return true; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 496c1e3506..1c92b11b5b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -21,6 +21,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,18 +37,17 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; -import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; 
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -75,7 +75,44 @@ public class TestStagingCleanup extends TestCase { private Path stagingJobPath = new Path(stagingJobDir); private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); - + + @Test + public void testDeletionofStagingOnUnregistrationFailure() + throws IOException { + testDeletionofStagingOnUnregistrationFailure(2, false); + testDeletionofStagingOnUnregistrationFailure(1, true); + } + + @SuppressWarnings("resource") + private void testDeletionofStagingOnUnregistrationFailure( + int maxAttempts, boolean shouldHaveDeleted) throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + TestMRApp appMaster = new TestMRApp(attemptId, null, + JobStateInternal.RUNNING, maxAttempts); + appMaster.crushUnregistration = true; + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry(); + if (shouldHaveDeleted) { + Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry()); + verify(fs).delete(stagingJobPath, true); + } else { + Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry()); + verify(fs, never()).delete(stagingJobPath, true); + } + } + @Test public void testDeletionofStaging() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); @@ -204,6 +241,7 @@ private class TestMRApp extends MRAppMaster { ContainerAllocator allocator; boolean testIsLastAMRetry = false; JobStateInternal jobStateInternal; + boolean crushUnregistration = false; public TestMRApp(ApplicationAttemptId applicationAttemptId, ContainerAllocator allocator, int maxAppAttempts) { @@ -211,6 +249,7 @@ public TestMRApp(ApplicationAttemptId applicationAttemptId, applicationAttemptId, 1), "testhost", 2222, 3333, System.currentTimeMillis(), maxAppAttempts); this.allocator = allocator; + this.successfullyUnregistered.set(true); } public TestMRApp(ApplicationAttemptId applicationAttemptId, @@ -229,7 +268,11 @@ protected FileSystem getFileSystem(Configuration conf) { protected ContainerAllocator createContainerAllocator( final ClientService clientService, final AppContext context) { if(allocator == null) { - return super.createContainerAllocator(clientService, context); + if (crushUnregistration) { + return new CustomContainerAllocator(context); + } else { + return super.createContainerAllocator(clientService, context); + } } return allocator; } @@ -280,6 +323,41 @@ protected void 
initJobCredentialsAndUGI(Configuration conf) { public boolean getTestIsLastAMRetry(){ return testIsLastAMRetry; } + + private class CustomContainerAllocator extends RMCommunicator + implements ContainerAllocator { + + public CustomContainerAllocator(AppContext context) { + super(null, context); + } + + @Override + public void serviceInit(Configuration conf) { + } + + @Override + public void serviceStart() { + } + + @Override + public void serviceStop() { + unregister(); + } + + @Override + protected void doUnregistration() + throws YarnException, IOException, InterruptedException { + throw new YarnException("test exception"); + } + + @Override + protected void heartbeat() throws Exception { + } + + @Override + public void handle(ContainerAllocatorEvent event) { + } + } } private final class MRAppTestCleanup extends MRApp { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 8fb7f1a7b9..714b753d13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -275,7 +275,7 @@ public void testRebootedDuringCommit() throws Exception { AppContext mockContext = mock(AppContext.class); when(mockContext.isLastAMRetry()).thenReturn(true); - when(mockContext.safeToReportTerminationToUser()).thenReturn(false); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); @@ -285,7 +285,7 @@ public void testRebootedDuringCommit() throws Exception { assertJobState(job, JobStateInternal.REBOOT); // return the external state as ERROR since this is last retry. 
Assert.assertEquals(JobState.RUNNING, job.getState()); - when(mockContext.safeToReportTerminationToUser()).thenReturn(true); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true); Assert.assertEquals(JobState.ERROR, job.getState()); dispatcher.stop(); @@ -594,7 +594,7 @@ public void testReportDiagnostics() throws Exception { new JobDiagnosticsUpdateEvent(jobId, diagMsg); MRAppMetrics mrAppMetrics = MRAppMetrics.create(); AppContext mockContext = mock(AppContext.class); - when(mockContext.safeToReportTerminationToUser()).thenReturn(true); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true); JobImpl job = new JobImpl(jobId, Records .newRecord(ApplicationAttemptId.class), new Configuration(), mock(EventHandler.class), @@ -705,7 +705,7 @@ public void testTransitionsAtFailed() throws IOException { commitHandler.start(); AppContext mockContext = mock(AppContext.class); - when(mockContext.safeToReportTerminationToUser()).thenReturn(false); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false); JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); @@ -722,7 +722,7 @@ public void testTransitionsAtFailed() throws IOException { job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); assertJobState(job, JobStateInternal.FAILED); Assert.assertEquals(JobState.RUNNING, job.getState()); - when(mockContext.safeToReportTerminationToUser()).thenReturn(true); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true); Assert.assertEquals(JobState.FAILED, job.getState()); dispatcher.stop(); @@ -762,7 +762,7 @@ private static StubbedJob createStubbedJob(Configuration conf, JobId jobId = TypeConverter.toYarn(jobID); if (appContext == null) { appContext = mock(AppContext.class); - when(appContext.safeToReportTerminationToUser()).thenReturn(true); + when(appContext.hasSuccessfullyUnregistered()).thenReturn(true); } StubbedJob job = new StubbedJob(jobId, ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index cdbecd2304..90dbe489f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -88,6 +88,10 @@ public StubbedLocalContainerAllocator() { protected void register() { } + @Override + protected void unregister() { + } + @Override protected void startAllocatorThread() { allocatorThread = new Thread(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 7de35ff319..b7823a0c50 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -389,7 +389,7 @@ public boolean isLastAMRetry() { } @Override - public boolean safeToReportTerminationToUser() { + public boolean hasSuccessfullyUnregistered() { // bogus - Not Required return true; }
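The sketch below is not part of the patch; it only summarizes the control flow this change introduces. RMCommunicator.unregister() now delegates to an overridable doUnregistration(), marks the unregistration successful only when the RM confirms it, and recomputes isLastAMRetry when unregistration fails; MRAppMaster.shutDownJob() then still performs the pending work (job-end notification, staging-dir cleanup) on the last AM retry, reporting FAILED to the notifier when the AM could not unregister. The class name UnregistrationFlowSketch, the rmUnregisterWillFail knob, and the plain state strings are illustrative stand-ins, not Hadoop APIs, and the real classes (MRAppMaster, RunningAppContext, RMCommunicator) are collapsed into one object for readability.

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Illustrative stand-in for the MRAppMaster/RMCommunicator interaction
 * introduced by MAPREDUCE-5562. Not Hadoop code; names are simplified.
 */
public class UnregistrationFlowSketch {

  // Mirrors MRAppMaster.successfullyUnregistered and isLastAMRetry.
  private final AtomicBoolean successfullyUnregistered = new AtomicBoolean(false);
  private volatile boolean isLastAMRetry;

  private final int attemptId;
  private final int maxAppAttempts;
  private final boolean rmUnregisterWillFail; // test knob, like crushUnregistration

  UnregistrationFlowSketch(int attemptId, int maxAppAttempts,
      boolean rmUnregisterWillFail) {
    this.attemptId = attemptId;
    this.maxAppAttempts = maxAppAttempts;
    this.rmUnregisterWillFail = rmUnregisterWillFail;
    computeIsLastAMRetry();
  }

  // RunningAppContext.computeIsLastAMRetry(): called at init and again after
  // a failed unregistration, so the AM re-checks whether it can still retry.
  void computeIsLastAMRetry() {
    isLastAMRetry = attemptId >= maxAppAttempts;
  }

  // RMCommunicator.unregister(): a failure no longer short-circuits shutdown;
  // it is logged and isLastAMRetry is recalculated.
  void unregister() {
    try {
      doUnregistration();
      successfullyUnregistered.set(true); // markSuccessfulUnregistration()
    } catch (Exception e) {
      System.err.println("Exception while unregistering: " + e.getMessage());
      computeIsLastAMRetry();
    }
  }

  // In the real code this loops on FinishApplicationMasterRequest until the
  // RM confirms; here it simply succeeds or throws.
  private void doUnregistration() throws Exception {
    if (rmUnregisterWillFail) {
      throw new Exception("simulated unregistration failure");
    }
  }

  // MRAppMaster.shutDownJob(): on the last retry the pending tasks still run,
  // and the notified state falls back to FAILED when unregistration failed.
  void shutDownJob(String internalJobState) {
    if (isLastAMRetry) {
      String notifiedState =
          successfullyUnregistered.get() ? internalJobState : "FAILED";
      System.out.println("job-end notification sent with status=" + notifiedState);
      System.out.println("staging directory deleted");
    } else {
      System.out.println("not the last AM retry: keep staging dir, skip notification");
    }
  }

  public static void main(String[] args) {
    // Attempt 2 of 2, unregistration fails: notify anyway, but as FAILED.
    UnregistrationFlowSketch lastRetry = new UnregistrationFlowSketch(2, 2, true);
    lastRetry.unregister();
    lastRetry.shutDownJob("SUCCEEDED");

    // Attempt 1 of 2, unregistration fails: another AM attempt follows, so
    // nothing is reported and the staging dir is kept for recovery.
    UnregistrationFlowSketch notLast = new UnregistrationFlowSketch(1, 2, true);
    notLast.unregister();
    notLast.shutDownJob("SUCCEEDED");
  }
}

The FAILED fallback mirrors the comment added in MRAppMaster.shutDownJob(): when unregistration fails on the last retry, the RM will ultimately report the application as FAILED, so the job-end notification is made to agree with what the client will eventually see.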