MAPREDUCE-2952. Fixed ResourceManager/MR-client to consume diagnostics for AM failures in a couple of corner cases. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1175403 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-25 14:46:59 +00:00
parent c285cf3114
commit a5c9ede143
29 changed files with 798 additions and 118 deletions

View File

@ -1415,6 +1415,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3053. Better diagnostic message for unknown methods in ProtoBuf
RPCs. (vinodkv via acmurthy)
MAPREDUCE-2952. Fixed ResourceManager/MR-client to consume diagnostics
for AM failures in a couple of corner cases. (Arun C Murthy via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -281,14 +281,17 @@ public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
}
public static org.apache.hadoop.mapred.JobStatus fromYarn(
JobReport jobreport, String jobFile, String trackingUrl) {
JobReport jobreport, String jobFile) {
JobPriority jobPriority = JobPriority.NORMAL;
return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
jobreport.getSetupProgress(), jobreport.getMapProgress(),
jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(),
jobFile, trackingUrl);
org.apache.hadoop.mapred.JobStatus jobStatus =
new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
jobreport.getSetupProgress(), jobreport.getMapProgress(),
jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(),
jobFile, jobreport.getTrackingUrl());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
public static org.apache.hadoop.mapreduce.QueueState fromYarn(
@ -422,6 +425,7 @@ public static JobStatus fromYarn(ApplicationReport application,
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
jobStatus.setFailureInfo(application.getDiagnostics());
return jobStatus;
}

View File

@ -29,6 +29,8 @@ public interface JobReport {
public abstract long getFinishTime();
public abstract String getUser();
public abstract String getJobName();
public abstract String getTrackingUrl();
public abstract String getDiagnostics();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -40,4 +42,6 @@ public interface JobReport {
public abstract void setFinishTime(long finishTime);
public abstract void setUser(String user);
public abstract void setJobName(String jobName);
public abstract void setTrackingUrl(String trackingUrl);
public abstract void setDiagnostics(String diagnostics);
}

View File

@ -206,6 +206,30 @@ public void setJobName(String jobName) {
builder.setJobName((jobName));
}
@Override
public String getTrackingUrl() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getTrackingUrl());
}
@Override
public void setTrackingUrl(String trackingUrl) {
maybeInitBuilder();
builder.setTrackingUrl(trackingUrl);
}
@Override
public String getDiagnostics() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getDiagnostics();
}
@Override
public void setDiagnostics(String diagnostics) {
maybeInitBuilder();
builder.setDiagnostics(diagnostics);
}
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}

View File

@ -143,6 +143,8 @@ message JobReportProto {
optional int64 finish_time = 8;
optional string user = 9;
optional string jobName = 10;
optional string trackingUrl = 11;
optional string diagnostics = 12;
}
enum TaskAttemptCompletionEventStatusProto {

View File

@ -321,6 +321,10 @@ protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
super.setJobACLs(acls);
}
public synchronized void setFailureInfo(String failureInfo) {
super.setFailureInfo(failureInfo);
}
/**
* Set the priority of the job, defaulting to NORMAL.
* @param jp new job priority

View File

@ -1239,7 +1239,8 @@ public boolean monitorAndPrintJob()
if (success) {
LOG.info("Job " + jobId + " completed successfully");
} else {
LOG.info("Job " + jobId + " failed with state " + status.getState());
LOG.info("Job " + jobId + " failed with state " + status.getState() +
" due to: " + status.getFailureInfo());
}
Counters counters = getCounters();
if (counters != null) {

View File

@ -81,6 +81,7 @@ public int getValue() {
private String queue;
private JobPriority priority;
private String schedulingInfo="NA";
private String failureInfo = "NA";
private Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
@ -278,6 +279,14 @@ protected synchronized void setQueue(String queue) {
this.queue = queue;
}
/**
* Set diagnostic information.
* @param failureInfo diagnostic information
*/
protected synchronized void setFailureInfo(String failureInfo) {
this.failureInfo = failureInfo;
}
/**
* Get queue name
* @return queue name
@ -359,6 +368,15 @@ public synchronized Map<JobACL, AccessControlList> getJobACLs() {
*/
public synchronized JobPriority getPriority() { return priority; }
/**
* Gets any available info on the reason of failure of the job.
* @return diagnostic information on why a job might have failed.
*/
public synchronized String getFailureInfo() {
return this.failureInfo;
}
/**
* Returns true if the status is for a completed job.
*/

View File

@ -101,16 +101,20 @@ class ClientServiceDelegate {
// Get the instance of the NotRunningJob corresponding to the specified
// user and state
private NotRunningJob getNotRunningJob(String user, JobState state) {
private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
JobState state) {
synchronized (notRunningJobs) {
HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
if (map == null) {
map = new HashMap<String, NotRunningJob>();
notRunningJobs.put(state, map);
}
String user =
(applicationReport == null) ?
UNKNOWN_USER : applicationReport.getUser();
NotRunningJob notRunningJob = map.get(user);
if (notRunningJob == null) {
notRunningJob = new NotRunningJob(user, state);
notRunningJob = new NotRunningJob(applicationReport, state);
map.put(user, notRunningJob);
}
return notRunningJob;
@ -130,7 +134,7 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
return checkAndGetHSProxy(UNKNOWN_USER, JobState.NEW);
return checkAndGetHSProxy(null, JobState.NEW);
}
try {
if (application.getHost() == null || "".equals(application.getHost())) {
@ -171,7 +175,7 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
return checkAndGetHSProxy(UNKNOWN_USER, JobState.RUNNING);
return checkAndGetHSProxy(null, JobState.RUNNING);
}
} catch (InterruptedException e) {
LOG.warn("getProxy() call interruped", e);
@ -191,17 +195,17 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
if (application.getState() == ApplicationState.NEW ||
application.getState() == ApplicationState.SUBMITTED) {
realProxy = null;
return getNotRunningJob(user, JobState.NEW);
return getNotRunningJob(application, JobState.NEW);
}
if (application.getState() == ApplicationState.FAILED) {
realProxy = null;
return getNotRunningJob(user, JobState.FAILED);
return getNotRunningJob(application, JobState.FAILED);
}
if (application.getState() == ApplicationState.KILLED) {
realProxy = null;
return getNotRunningJob(user, JobState.KILLED);
return getNotRunningJob(application, JobState.KILLED);
}
//History server can serve a job only if application
@ -209,15 +213,16 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
if (application.getState() == ApplicationState.SUCCEEDED) {
LOG.info("Application state is completed. " +
"Redirecting to job history server");
realProxy = checkAndGetHSProxy(user, JobState.SUCCEEDED);
realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
}
return realProxy;
}
private MRClientProtocol checkAndGetHSProxy(String user, JobState state) {
private MRClientProtocol checkAndGetHSProxy(
ApplicationReport applicationReport, JobState state) {
if (null == historyServerProxy) {
LOG.warn("Job History Server is not configured.");
return getNotRunningJob(user, state);
return getNotRunningJob(applicationReport, state);
}
return historyServerProxy;
}
@ -324,21 +329,22 @@ String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
//TODO: add tracking url in JobReport
return TypeConverter.fromYarn(report, jobFile, "");
return TypeConverter.fromYarn(report, jobFile);
}
org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
GetTaskReportsRequest request =
recordFactory.newRecordInstance(GetTaskReportsRequest.class);
request.setJobId(jobId);
request.setTaskType(TypeConverter.toYarn(taskType));

View File

@ -22,6 +22,8 @@
import java.util.HashMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -53,20 +55,41 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class NotRunningJob implements MRClientProtocol {
private static final Log LOG = LogFactory.getLog(NotRunningJob.class);
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final JobState jobState;
private final String user;
NotRunningJob(String username, JobState jobState) {
this.user = username;
private final ApplicationReport applicationReport;
private ApplicationReport getUnknownApplicationReport() {
ApplicationReport unknown =
recordFactory.newRecordInstance(ApplicationReport.class);
unknown.setUser("N/A");
unknown.setHost("N/A");
unknown.setName("N/A");
unknown.setQueue("N/A");
unknown.setStartTime(0);
unknown.setFinishTime(0);
unknown.setTrackingUrl("N/A");
unknown.setDiagnostics("N/A");
LOG.info("getUnknownApplicationReport");
return unknown;
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
this.applicationReport =
(applicationReport == null) ?
getUnknownApplicationReport() : applicationReport;
this.jobState = jobState;
}
@ -101,15 +124,19 @@ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(this.jobState);
jobReport.setJobState(jobState);
jobReport.setUser(applicationReport.getUser());
jobReport.setStartTime(applicationReport.getStartTime());
jobReport.setDiagnostics(applicationReport.getDiagnostics());
jobReport.setJobName(applicationReport.getName());
jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
jobReport.setFinishTime(applicationReport.getFinishTime());
jobReport.setUser(this.user);
// TODO: Add jobName & other job information that is available
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}

View File

@ -267,6 +267,13 @@ public GetApplicationReportResponse getApplicationReport(
application.setHost(split[0]);
application.setRpcPort(Integer.parseInt(split[1]));
application.setUser("TestClientRedirect-user");
application.setName("N/A");
application.setQueue("N/A");
application.setStartTime(0);
application.setFinishTime(0);
application.setTrackingUrl("N/A");
application.setDiagnostics("N/A");
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
response.setApplicationReport(application);

View File

@ -109,7 +109,7 @@ public void testHistoryServerNotConfigured() throws Exception {
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
null, getRMDelegate());
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertEquals("Unknown User", jobStatus.getUsername());
Assert.assertEquals("N/A", jobStatus.getUsername());
Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
//RM has app report and job History Server is not configured
@ -145,6 +145,13 @@ private ApplicationReport getApplicationReport() {
.newRecord(ApplicationReport.class);
applicationReport.setState(ApplicationState.SUCCEEDED);
applicationReport.setUser("root");
applicationReport.setHost("N/A");
applicationReport.setName("N/A");
applicationReport.setQueue("N/A");
applicationReport.setStartTime(0);
applicationReport.setFinishTime(0);
applicationReport.setTrackingUrl("N/A");
applicationReport.setDiagnostics("N/A");
return applicationReport;
}

View File

@ -49,6 +49,10 @@
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmapp\.RMAppImpl.*" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmapp\.attempt\.RMAppAttemptImpl.*" />
<Bug pattern="BC_UNCONFIRMED_CAST" />

View File

@ -186,4 +186,16 @@ public interface ApplicationReport {
@Private
@Unstable
void setStartTime(long startTime);
/**
* Get the <em>finish time</em> of the application.
* @return <em>finish time</em> of the application
*/
@Public
@Stable
long getFinishTime();
@Private
@Unstable
void setFinishTime(long finishTime);
}

View File

@ -240,6 +240,30 @@ public ApplicationReportProto getProto() {
return proto;
}
@Override
public long getStartTime() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getStartTime();
}
@Override
public void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime(startTime);
}
@Override
public long getFinishTime() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getFinishTime();
}
@Override
public void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
private void mergeLocalToBuilder() {
if (this.applicationId != null
&& !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
@ -279,16 +303,4 @@ private ApplicationIdPBImpl convertFromProtoFormat(
ApplicationIdProto applicationId) {
return new ApplicationIdPBImpl(applicationId);
}
@Override
public long getStartTime() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getStartTime();
}
@Override
public void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime(startTime);
}
}

View File

@ -140,6 +140,7 @@ message ApplicationReportProto {
optional string trackingUrl = 11;
optional string diagnostics = 12 [default = "N/A"];
optional int64 startTime = 13;
optional int64 finishTime = 14;
}
message NodeIdProto {

View File

@ -242,7 +242,7 @@ public static ResourceRequest newResourceRequest(ResourceRequest r) {
public static ApplicationReport newApplicationReport(
ApplicationId applicationId, String user, String queue, String name,
String host, int rpcPort, String clientToken, ApplicationState state,
String diagnostics, String url, long startTime) {
String diagnostics, String url, long startTime, long finishTime) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@ -256,6 +256,7 @@ public static ApplicationReport newApplicationReport(
report.setDiagnostics(diagnostics);
report.setTrackingUrl(url);
report.setStartTime(startTime);
report.setFinishTime(finishTime);
return report;
}

View File

@ -167,6 +167,16 @@ public void setStartTime(long startTime) {
// TODO Auto-generated method stub
}
@Override
public long getFinishTime() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void setFinishTime(long finishTime) {
// TODO Auto-generated method stub
}
};
}

View File

@ -22,7 +22,6 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
@ -31,7 +30,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class RMContextImpl implements RMContext {

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@ -186,6 +187,9 @@ public synchronized void init(Configuration conf) {
addService(adminService);
this.applicationMasterLauncher = createAMLauncher();
this.rmDispatcher.register(AMLauncherEventType.class,
this.applicationMasterLauncher);
addService(applicationMasterLauncher);
super.init(conf);

View File

@ -46,13 +46,12 @@ public class ApplicationMasterLauncher extends AbstractService implements
private ClientToAMSecretManager clientToAMSecretManager;
protected final RMContext context;
public ApplicationMasterLauncher(ApplicationTokenSecretManager
applicationTokenSecretManager, ClientToAMSecretManager clientToAMSecretManager,
public ApplicationMasterLauncher(
ApplicationTokenSecretManager applicationTokenSecretManager,
ClientToAMSecretManager clientToAMSecretManager,
RMContext context) {
super(ApplicationMasterLauncher.class.getName());
this.context = context;
/* register to dispatcher */
this.context.getDispatcher().register(AMLauncherEventType.class, this);
this.launcherPool = new ThreadPoolExecutor(1, 10, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
this.launcherHandlingThread = new LauncherThread();

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppFailedAttemptEvent extends RMAppEvent {
private final String diagnostics;
public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
String diagnostics) {
super(appId, event);
this.diagnostics = diagnostics;
}
public String getDiagnostics() {
return this.diagnostics;
}
}

View File

@ -310,7 +310,8 @@ public ApplicationReport createAndGetApplicationReport() {
return BuilderUtils.newApplicationReport(this.applicationId, this.user,
this.queue, this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()),
this.diagnostics.toString(), trackingUrl, this.startTime);
this.diagnostics.toString(), trackingUrl,
this.startTime, this.finishTime);
} finally {
this.readLock.unlock();
}
@ -470,11 +471,13 @@ public AttemptFailedTransition(RMAppState initialState) {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event);
if (app.attempts.size() == app.maxRetries) {
String msg = "Application " + app.getApplicationId()
+ " failed " + app.maxRetries
+ " times. Failing the application.";
+ " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
LOG.info(msg);
app.diagnostics.append(msg);
// Inform the node for app-finish

View File

@ -79,7 +79,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
* Diagnostics information for the application attempt.
* @return diagnostics information for the application attempt.
*/
StringBuilder getDiagnostics();
String getDiagnostics();
/**
* Progress for the application attempt.

View File

@ -31,6 +31,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -104,10 +106,10 @@ public class RMAppAttemptImpl implements RMAppAttempt {
private Container masterContainer;
private float progress = 0;
private String host;
private String host = "N/A";
private int rpcPort;
private String trackingUrl;
private String finalState;
private String trackingUrl = "N/A";
private String finalState = "N/A";
private final StringBuilder diagnostics = new StringBuilder();
private static final StateMachineFactory<RMAppAttemptImpl,
@ -123,7 +125,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
RMAppAttemptEventType.START, new AttemptStartedTransition())
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL)
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from SUBMITTED state
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
@ -323,16 +326,26 @@ public String getClientToken() {
}
@Override
public StringBuilder getDiagnostics() {
public String getDiagnostics() {
this.readLock.lock();
try {
return this.diagnostics;
return this.diagnostics.toString();
} finally {
this.readLock.unlock();
}
}
public void setDiagnostics(String message) {
this.writeLock.lock();
try {
this.diagnostics.append(message);
} finally {
this.writeLock.unlock();
}
}
@Override
public float getProgress() {
this.readLock.lock();
@ -446,10 +459,17 @@ public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptRejectedEvent rejectedEvent = (RMAppAttemptRejectedEvent) event;
// Save the diagnostic message
String message = rejectedEvent.getMessage();
appAttempt.setDiagnostics(message);
// Send the rejection event to app
appAttempt.eventHandler.handle(new RMAppRejectedEvent(rejectedEvent
.getApplicationAttemptId().getApplicationId(), rejectedEvent
.getMessage()));
appAttempt.eventHandler.handle(
new RMAppRejectedEvent(
rejectedEvent.getApplicationAttemptId().getApplicationId(),
message)
);
}
}
@ -472,8 +492,6 @@ public void transition(RMAppAttemptImpl appAttempt,
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
LOG.debug("About to request resources for AM of "
+ appAttempt.applicationAttemptId + " required " + request);
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
@ -517,23 +535,39 @@ public void transition(RMAppAttemptImpl appAttempt,
.unregisterAttempt(appAttempt.applicationAttemptId);
// Tell the application and the scheduler
RMAppEventType eventToApp = null;
ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId();
RMAppEvent appEvent = null;
switch (finalAttemptState) {
case FINISHED:
eventToApp = RMAppEventType.ATTEMPT_FINISHED;
case FINISHED:
{
appEvent =
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHED);
}
break;
case KILLED:
eventToApp = RMAppEventType.ATTEMPT_KILLED;
case KILLED:
{
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED,
"Application killed by user.");
}
break;
case FAILED:
eventToApp = RMAppEventType.ATTEMPT_FAILED;
case FAILED:
{
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_FAILED,
appAttempt.getDiagnostics());
}
break;
default:
LOG.info("Cannot get this state!! Error!!");
default:
{
LOG.error("Cannot get this state!! Error!!");
}
break;
}
appAttempt.eventHandler.handle(new RMAppEvent(
appAttempt.applicationAttemptId.getApplicationId(), eventToApp));
appAttempt.eventHandler.handle(appEvent);
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttempt
.getAppAttemptId(), finalAttemptState));
}
@ -621,16 +655,23 @@ public AMContainerCrashedTransition() {
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent finishEvent =
((RMAppAttemptContainerFinishedEvent)event);
// UnRegister from AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
appAttempt.getAppAttemptId());
// Tell the app, scheduler
super.transition(appAttempt, event);
// Setup diagnostic message
ContainerStatus status = finishEvent.getContainerStatus();
appAttempt.diagnostics.append("AM Container for " +
appAttempt.getAppAttemptId() + " exited with " +
" exitCode: " + status.getExitStatus() +
" due to: " + status.getDiagnostics() + "." +
"Failing this attempt.");
// Use diagnostic saying crashed.
appAttempt.diagnostics.append("AM Container for "
+ appAttempt.getAppAttemptId() + " exited. Failing this attempt.");
// Tell the app, scheduler
super.transition(appAttempt, finishEvent);
}
}
@ -725,6 +766,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
// Setup diagnostic message
appAttempt.diagnostics.append("AM Container for " +
appAttempt.getAppAttemptId() + " exited with " +
" exitCode: " + containerStatus.getExitStatus() +
" due to: " + containerStatus.getDiagnostics() + "." +
"Failing this attempt.");
new FinalTransition(RMAppAttemptState.FAILED).transition(
appAttempt, containerFinishedEvent);
return RMAppAttemptState.FAILED;

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
class InlineDispatcher extends AsyncDispatcher {
@Private
public class InlineDispatcher extends AsyncDispatcher {
private class InlineEventHandler implements EventHandler {
private final InlineDispatcher dispatcher;
public InlineEventHandler(InlineDispatcher dispatcher) {

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@ -46,7 +45,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
@ -95,10 +96,10 @@ public void handle(RMAppEvent event) {
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
Dispatcher rmDispatcher = new AsyncDispatcher();
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = mock(ContainerAllocationExpirer.class);
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor);
@ -122,8 +123,9 @@ protected RMApp createNewTestApp() {
String clientTokenStr = "bogusstring";
ApplicationStore appStore = mock(ApplicationStore.class);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
new ApplicationTokenSecretManager(), scheduler);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext,
new ApplicationTokenSecretManager(), scheduler);
RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user,
@ -136,8 +138,8 @@ protected RMApp createNewTestApp() {
}
// Test expected newly created app state
private static void testAppStartState(ApplicationId applicationId, String user,
String name, String queue, RMApp application) {
private static void testAppStartState(ApplicationId applicationId,
String user, String name, String queue, RMApp application) {
Assert.assertTrue("application start time is not greater then 0",
application.getStartTime() > 0);
Assert.assertTrue("application start time is before currentTime",
@ -202,7 +204,8 @@ private static void assertFailed(RMApp application, String regex) {
protected RMApp testCreateAppSubmitted() throws IOException {
RMApp application = createNewTestApp();
// NEW => SUBMITTED event RMAppEventType.START
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@ -212,7 +215,9 @@ protected RMApp testCreateAppSubmitted() throws IOException {
protected RMApp testCreateAppAccepted() throws IOException {
RMApp application = testCreateAppSubmitted();
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.ACCEPTED, application);
@ -222,7 +227,9 @@ protected RMApp testCreateAppAccepted() throws IOException {
protected RMApp testCreateAppRunning() throws IOException {
RMApp application = testCreateAppAccepted();
// ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_REGISTERED);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.RUNNING, application);
@ -232,7 +239,9 @@ protected RMApp testCreateAppRunning() throws IOException {
protected RMApp testCreateAppFinished() throws IOException {
RMApp application = testCreateAppRunning();
// RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FINISHED);
application.handle(event);
assertAppState(RMAppState.FINISHED, application);
assertTimesAtFinish(application);
@ -251,7 +260,8 @@ public void testAppNewKill() throws IOException {
RMApp application = createNewTestApp();
// NEW => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertKilled(application);
}
@ -263,7 +273,8 @@ public void testAppNewReject() throws IOException {
RMApp application = createNewTestApp();
// NEW => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected";
RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
RMAppEvent event =
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
application.handle(event);
assertFailed(application, rejectedText);
}
@ -275,7 +286,8 @@ public void testAppSubmittedRejected() throws IOException {
RMApp application = testCreateAppSubmitted();
// SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "app rejected";
RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
RMAppEvent event =
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
application.handle(event);
assertFailed(application, rejectedText);
}
@ -286,7 +298,8 @@ public void testAppSubmittedKill() throws IOException {
RMApp application = testCreateAppAccepted();
// SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertKilled(application);
}
@ -298,18 +311,26 @@ public void testAppAcceptedFailed() throws IOException {
RMApp application = testCreateAppAccepted();
// ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
assertAppState(RMAppState.SUBMITTED, application);
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
application.handle(event);
assertAppState(RMAppState.ACCEPTED, application);
}
// ACCEPTED => FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED after max retries
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
// ACCEPTED => FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
// after max retries
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, message);
application.handle(event);
assertFailed(application, ".*Failing the application.*");
assertFailed(application, ".*" + message + ".*Failing the application.*");
}
@Test
@ -318,7 +339,8 @@ public void testAppAcceptedKill() throws IOException {
RMApp application = testCreateAppAccepted();
// ACCEPTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertKilled(application);
}
@ -329,7 +351,8 @@ public void testAppRunningKill() throws IOException {
RMApp application = testCreateAppRunning();
// RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertKilled(application);
}
@ -341,25 +364,35 @@ public void testAppRunningFailed() throws IOException {
RMApp application = testCreateAppRunning();
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
int expectedAttemptId = 1;
Assert.assertEquals(expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId());
Assert.assertEquals(expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
assertAppState(RMAppState.SUBMITTED, application);
appAttempt = application.getCurrentAppAttempt();
Assert.assertEquals(++expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
application.handle(event);
assertAppState(RMAppState.ACCEPTED, application);
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_REGISTERED);
application.handle(event);
assertAppState(RMAppState.RUNNING, application);
}
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED after max retries
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
// after max retries
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
assertFailed(application, ".*Failing the application.*");
@ -376,7 +409,8 @@ public void testAppFinishedFinished() throws IOException {
RMApp application = testCreateAppFinished();
// FINISHED => FINISHED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertTimesAtFinish(application);
assertAppState(RMAppState.FINISHED, application);
@ -392,25 +426,32 @@ public void testAppKilledKilled() throws IOException {
RMApp application = testCreateAppRunning();
// RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FINISHED);
application.handle(event);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_KILLED);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);

View File

@ -0,0 +1,403 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRMAppAttemptTransitions {
private static final Log LOG =
LogFactory.getLog(TestRMAppAttemptTransitions.class);
private static final String EMPTY_DIAGNOSTICS = "";
private RMContext rmContext;
private YarnScheduler scheduler;
private ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher;
private RMApp application;
private RMAppAttempt applicationAttempt;
private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
@Override
public void handle(RMAppAttemptEvent event) {
ApplicationAttemptId appID = event.getApplicationAttemptId();
assertEquals(applicationAttempt.getAppAttemptId(), appID);
try {
applicationAttempt.handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for application " + appID, t);
}
}
}
// handle all the RM application events - same as in ResourceManager.java
private final class TestApplicationEventDispatcher implements
EventHandler<RMAppEvent> {
@Override
public void handle(RMAppEvent event) {
assertEquals(application.getApplicationId(), event.getApplicationId());
try {
application.handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for application " + application.getApplicationId(), t);
}
}
}
private final class TestSchedulerEventDispatcher implements
EventHandler<SchedulerEvent> {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
}
}
private final class TestAMLauncherEventDispatcher implements
EventHandler<AMLauncherEvent> {
@Override
public void handle(AMLauncherEvent event) {
applicationMasterLauncher.handle(event);
}
}
private static int appId = 1;
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor);
scheduler = mock(YarnScheduler.class);
masterService = mock(ApplicationMasterService.class);
applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher());
rmDispatcher.register(RMAppEventType.class,
new TestApplicationEventDispatcher());
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
rmDispatcher.register(AMLauncherEventType.class,
new TestAMLauncherEventDispatcher());
rmDispatcher.init(new Configuration());
rmDispatcher.start();
ApplicationId applicationId = MockApps.newAppID(appId++);
ApplicationAttemptId applicationAttemptId =
MockApps.newAppAttemptID(applicationId, 0);
final String user = MockApps.newUserName();
final String queue = MockApps.newQueue();
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
when(submissionContext.getUser()).thenReturn(user);
when(submissionContext.getQueue()).thenReturn(queue);
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = mock(Resource.class);
when(amContainerSpec.getResource()).thenReturn(resource);
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
application = mock(RMApp.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
masterService, submissionContext);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
testAppAttemptNewState();
}
@After
public void tearDown() throws Exception {
((AsyncDispatcher)this.rmContext.getDispatcher()).stop();
}
/**
* {@link RMAppAttemptState#NEW}
*/
private void testAppAttemptNewState() {
assertEquals(RMAppAttemptState.NEW,
applicationAttempt.getAppAttemptState());
assertEquals(0, applicationAttempt.getDiagnostics().length());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
}
/**
* {@link RMAppAttemptState#SUBMITTED}
*/
private void testAppAttemptSubmittedState() {
assertEquals(RMAppAttemptState.SUBMITTED,
applicationAttempt.getAppAttemptState());
assertEquals(0, applicationAttempt.getDiagnostics().length());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
verify(masterService).
registerAppAttempt(applicationAttempt.getAppAttemptId());
verify(scheduler).handle(any(AppAddedSchedulerEvent.class));
}
/**
* {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED}
*/
private void testAppAttemptSubmittedToFailedState(String diagnostics) {
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
verify(application).handle(any(RMAppRejectedEvent.class));
}
/**
* {@link RMAppAttemptState#KILLED}
*/
private void testAppAttemptKilledState(Container amContainer,
String diagnostics) {
assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
}
/**
* {@link RMAppAttemptState#SCHEDULED}
*/
private void testAppAttemptScheduledState() {
assertEquals(RMAppAttemptState.SCHEDULED,
applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
verify(application).handle(any(RMAppEvent.class));
verify(scheduler).
allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class));
}
/**
* {@link RMAppAttemptState#ALLOCATED}
*/
private void testAppAttemptAllocatedState(Container amContainer) {
assertEquals(RMAppAttemptState.ALLOCATED,
applicationAttempt.getAppAttemptState());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
// Check events
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
verify(scheduler, times(2)).
allocate(
any(ApplicationAttemptId.class), any(List.class), any(List.class));
}
/**
* {@link RMAppAttemptState#FAILED}
*/
private void testAppAttemptFailedState(Container container,
String diagnostics) {
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
}
private void submitApplicationAttempt() {
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
applicationAttempt.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
testAppAttemptSubmittedState();
}
private void scheduleApplicationAttempt() {
submitApplicationAttempt();
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.APP_ACCEPTED));
testAppAttemptScheduledState();
}
private Container allocateApplicationAttempt() {
scheduleApplicationAttempt();
// Mock the allocation of AM container
Container container = mock(Container.class);
Allocation allocation = mock(Allocation.class);
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
when(
scheduler.allocate(
any(ApplicationAttemptId.class),
any(List.class),
any(List.class))).
thenReturn(allocation);
applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent(
applicationAttempt.getAppAttemptId(),
container));
testAppAttemptAllocatedState(container);
return container;
}
@Test
public void testNewToKilled() {
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
}
@Test
public void testSubmittedToFailed() {
submitApplicationAttempt();
String message = "Rejected";
applicationAttempt.handle(
new RMAppAttemptRejectedEvent(
applicationAttempt.getAppAttemptId(), message));
testAppAttemptSubmittedToFailedState(message);
}
@Test
public void testSubmittedToKilled() {
submitApplicationAttempt();
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
}
@Test
public void testScheduledToKilled() {
scheduleApplicationAttempt();
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
}
@Test
public void testAllocatedToKilled() {
Container amContainer = allocateApplicationAttempt();
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
testAppAttemptKilledState(amContainer, EMPTY_DIAGNOSTICS);
}
@Test
public void testAllocatedToFailed() {
Container amContainer = allocateApplicationAttempt();
String diagnostics = "Launch Failed";
applicationAttempt.handle(
new RMAppAttemptLaunchFailedEvent(
applicationAttempt.getAppAttemptId(),
diagnostics));
testAppAttemptFailedState(amContainer, diagnostics);
}
}