MAPREDUCE-4290. Fix Yarn Applicaiton Status to MR JobState conversion. (Contributed by Devaraj K)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1353684 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-06-25 17:54:48 +00:00
parent 15b61c787d
commit 75269b5488
4 changed files with 83 additions and 10 deletions

View File

@ -186,6 +186,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4336. Distributed Shell fails when used with the CapacityScheduler
(ahmed via tucu)
MAPREDUCE-4290. Fix Yarn Applicaiton Status to MR JobState conversion.
(Devaraj K via sseth)
Release 2.0.0-alpha - 05-23-2012
INCOMPATIBLE CHANGES

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
@ -376,22 +377,27 @@ public static List<TaskReport> fromYarn(
}
return reports;
}
public static JobStatus.State fromYarn(YarnApplicationState state) {
switch (state) {
public static State fromYarn(YarnApplicationState yarnApplicationState,
FinalApplicationStatus finalApplicationStatus) {
switch (yarnApplicationState) {
case NEW:
case SUBMITTED:
return State.PREP;
case RUNNING:
return State.RUNNING;
case FINISHED:
return State.SUCCEEDED;
if (finalApplicationStatus == FinalApplicationStatus.SUCCEEDED) {
return State.SUCCEEDED;
} else if (finalApplicationStatus == FinalApplicationStatus.KILLED) {
return State.KILLED;
}
case FAILED:
return State.FAILED;
case KILLED:
return State.KILLED;
}
throw new YarnException("Unrecognized application state: " + state);
throw new YarnException("Unrecognized application state: " + yarnApplicationState);
}
private static final String TT_NAME_PREFIX = "tracker_";
@ -417,7 +423,7 @@ public static JobStatus fromYarn(ApplicationReport application,
new JobStatus(
TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getYarnApplicationState()),
TypeConverter.fromYarn(application.getYarnApplicationState(), application.getFinalApplicationStatus()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl, false

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
@ -45,7 +46,7 @@ public class TestTypeConverter {
@Test
public void testEnums() throws Exception {
for (YarnApplicationState applicationState : YarnApplicationState.values()) {
TypeConverter.fromYarn(applicationState);
TypeConverter.fromYarn(applicationState, FinalApplicationStatus.FAILED);
}
for (TaskType taskType : TaskType.values()) {
@ -63,8 +64,6 @@ public void testEnums() throws Exception {
for (TaskState taskState : TaskState.values()) {
TypeConverter.fromYarn(taskState);
}
}
@Test

View File

@ -19,13 +19,26 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -35,7 +48,7 @@ public class TestResourceMgrDelegate {
/**
* Tests that getRootQueues makes a request for the (recursive) child queues
*/
@Test
@Test
public void testGetRootQueues() throws IOException, InterruptedException {
ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
@ -60,4 +73,56 @@ public void testGetRootQueues() throws IOException, InterruptedException {
argument.getValue().getRecursive());
}
@Test
public void tesAllJobs() throws Exception {
ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetAllApplicationsResponse allApplicationsResponse = Records
.newRecord(GetAllApplicationsResponse.class);
List<ApplicationReport> applications = new ArrayList<ApplicationReport>();
applications.add(getApplicationReport(YarnApplicationState.FINISHED,
FinalApplicationStatus.FAILED));
applications.add(getApplicationReport(YarnApplicationState.FINISHED,
FinalApplicationStatus.SUCCEEDED));
applications.add(getApplicationReport(YarnApplicationState.FINISHED,
FinalApplicationStatus.KILLED));
applications.add(getApplicationReport(YarnApplicationState.FAILED,
FinalApplicationStatus.FAILED));
allApplicationsResponse.setApplicationList(applications);
Mockito.when(
applicationsManager.getAllApplications(Mockito
.any(GetAllApplicationsRequest.class))).thenReturn(
allApplicationsResponse);
ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration(), applicationsManager);
JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
Assert.assertEquals(State.FAILED, allJobs[0].getState());
Assert.assertEquals(State.SUCCEEDED, allJobs[1].getState());
Assert.assertEquals(State.KILLED, allJobs[2].getState());
Assert.assertEquals(State.FAILED, allJobs[3].getState());
}
private ApplicationReport getApplicationReport(
YarnApplicationState yarnApplicationState,
FinalApplicationStatus finalApplicationStatus) {
ApplicationReport appReport = Mockito.mock(ApplicationReport.class);
ApplicationResourceUsageReport appResources = Mockito
.mock(ApplicationResourceUsageReport.class);
Mockito.when(appReport.getApplicationId()).thenReturn(
Records.newRecord(ApplicationId.class));
Mockito.when(appResources.getNeededResources()).thenReturn(
Records.newRecord(Resource.class));
Mockito.when(appResources.getReservedResources()).thenReturn(
Records.newRecord(Resource.class));
Mockito.when(appResources.getUsedResources()).thenReturn(
Records.newRecord(Resource.class));
Mockito.when(appReport.getApplicationResourceUsageReport()).thenReturn(
appResources);
Mockito.when(appReport.getYarnApplicationState()).thenReturn(
yarnApplicationState);
Mockito.when(appReport.getFinalApplicationStatus()).thenReturn(
finalApplicationStatus);
return appReport;
}
}