MAPREDUCE-2791. Added missing info on 'job -status' output. Contributed by Devaraj K.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1ae5b5e338
commit
0b3c654d83
@ -1478,6 +1478,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-3114. Fixed invalid ApplicationURL on RM WebUI. (Subroto Sanyal
|
||||
via vinodkv)
|
||||
|
||||
MAPREDUCE-2791. Added missing info on 'job -status' output. (Devaraj K via
|
||||
acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -590,12 +590,12 @@ public JobReport getReport() {
|
||||
if (getState() == JobState.NEW) {
|
||||
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
|
||||
startTime, finishTime, setupProgress, 0.0f,
|
||||
0.0f, cleanupProgress);
|
||||
0.0f, cleanupProgress, remoteJobConfFile.toString());
|
||||
}
|
||||
|
||||
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
|
||||
startTime, finishTime, setupProgress, computeProgress(mapTasks),
|
||||
computeProgress(reduceTasks), cleanupProgress);
|
||||
computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ public void testSimple() throws Exception {
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
|
||||
0, 0, 0, 0, 0, 0));
|
||||
0, 0, 0, 0, 0, 0, "jobfile"));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
|
||||
@ -193,7 +193,7 @@ public void testResource() throws Exception {
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
|
||||
0, 0, 0, 0, 0, 0));
|
||||
0, 0, 0, 0, 0, 0, "jobfile"));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
|
||||
@ -259,7 +259,7 @@ public void testMapReduceScheduling() throws Exception {
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
|
||||
0, 0, 0, 0, 0, 0));
|
||||
0, 0, 0, 0, 0, 0, "jobfile"));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
|
||||
@ -373,7 +373,7 @@ void setProgress(float setupProgress, float mapProgress,
|
||||
public JobReport getReport() {
|
||||
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
|
||||
JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
|
||||
this.reduceProgress, this.cleanupProgress);
|
||||
this.reduceProgress, this.cleanupProgress, "jobfile");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.JobPriority;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
@ -280,16 +281,14 @@ public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
|
||||
return yCntrs;
|
||||
}
|
||||
|
||||
public static org.apache.hadoop.mapred.JobStatus fromYarn(
|
||||
JobReport jobreport, String jobFile) {
|
||||
public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) {
|
||||
JobPriority jobPriority = JobPriority.NORMAL;
|
||||
org.apache.hadoop.mapred.JobStatus jobStatus =
|
||||
new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
|
||||
jobreport.getSetupProgress(), jobreport.getMapProgress(),
|
||||
jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
|
||||
fromYarn(jobreport.getJobState()),
|
||||
jobPriority, jobreport.getUser(), jobreport.getJobName(),
|
||||
jobFile, jobreport.getTrackingUrl());
|
||||
JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus(
|
||||
fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport
|
||||
.getMapProgress(), jobreport.getReduceProgress(), jobreport
|
||||
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
|
||||
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
|
||||
.getJobFile(), trackingUrl);
|
||||
jobStatus.setFailureInfo(jobreport.getDiagnostics());
|
||||
return jobStatus;
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ public interface JobReport {
|
||||
public abstract String getJobName();
|
||||
public abstract String getTrackingUrl();
|
||||
public abstract String getDiagnostics();
|
||||
public abstract String getJobFile();
|
||||
|
||||
public abstract void setJobId(JobId jobId);
|
||||
public abstract void setJobState(JobState jobState);
|
||||
@ -44,4 +45,5 @@ public interface JobReport {
|
||||
public abstract void setJobName(String jobName);
|
||||
public abstract void setTrackingUrl(String trackingUrl);
|
||||
public abstract void setDiagnostics(String diagnostics);
|
||||
public abstract void setJobFile(String jobFile);
|
||||
}
|
||||
|
@ -229,7 +229,19 @@ public void setDiagnostics(String diagnostics) {
|
||||
maybeInitBuilder();
|
||||
builder.setDiagnostics(diagnostics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJobFile() {
|
||||
JobReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getJobFile();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJobFile(String jobFile) {
|
||||
maybeInitBuilder();
|
||||
builder.setJobFile(jobFile);
|
||||
}
|
||||
|
||||
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
|
||||
return new JobIdPBImpl(p);
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
|
||||
public static JobReport newJobReport(JobId jobId, String jobName,
|
||||
String userName, JobState state, long startTime, long finishTime,
|
||||
float setupProgress, float mapProgress, float reduceProgress,
|
||||
float cleanupProgress) {
|
||||
float cleanupProgress, String jobFile) {
|
||||
JobReport report = Records.newRecord(JobReport.class);
|
||||
report.setJobId(jobId);
|
||||
report.setJobName(jobName);
|
||||
@ -67,6 +67,7 @@ public static JobReport newJobReport(JobId jobId, String jobName,
|
||||
report.setCleanupProgress(cleanupProgress);
|
||||
report.setMapProgress(mapProgress);
|
||||
report.setReduceProgress(reduceProgress);
|
||||
report.setJobFile(jobFile);
|
||||
return report;
|
||||
}
|
||||
}
|
@ -145,6 +145,7 @@ message JobReportProto {
|
||||
optional string jobName = 10;
|
||||
optional string trackingUrl = 11;
|
||||
optional string diagnostics = 12;
|
||||
optional string jobFile = 13;
|
||||
}
|
||||
|
||||
enum TaskAttemptCompletionEventStatusProto {
|
||||
|
@ -462,8 +462,6 @@ public String toString() {
|
||||
sb.append(status.getReduceProgress()).append("\n");
|
||||
sb.append("Job state: ");
|
||||
sb.append(status.getState()).append("\n");
|
||||
sb.append("history URL: ");
|
||||
sb.append(status.getHistoryFile()).append("\n");
|
||||
sb.append("retired: ").append(status.isRetired()).append("\n");
|
||||
sb.append("reason for failure: ").append(reasonforFailure);
|
||||
return sb.toString();
|
||||
|
@ -48,6 +48,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
@ -96,9 +97,11 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
|
||||
report.setFinishTime(jobInfo.getFinishTime());
|
||||
report.setJobName(jobInfo.getJobname());
|
||||
report.setUser(jobInfo.getUsername());
|
||||
//TODO Possibly populate job progress. Never used.
|
||||
//report.setMapProgress(progress)
|
||||
//report.setReduceProgress(progress)
|
||||
report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
|
||||
report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
|
||||
report.setJobFile(confFile.toString());
|
||||
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
|
||||
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -86,6 +87,7 @@ public class ClientServiceDelegate {
|
||||
private MRClientProtocol realProxy = null;
|
||||
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
private static String UNKNOWN_USER = "Unknown User";
|
||||
private String trackingUrl;
|
||||
|
||||
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
|
||||
JobID jobId, MRClientProtocol historyServerProxy) {
|
||||
@ -129,6 +131,9 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
|
||||
// Possibly allow nulls through the PB tunnel, otherwise deal with an exception
|
||||
// and redirect to the history server.
|
||||
ApplicationReport application = rm.getApplicationReport(appId);
|
||||
if (application != null) {
|
||||
trackingUrl = application.getTrackingUrl();
|
||||
}
|
||||
String serviceAddr = null;
|
||||
while (application == null || ApplicationState.RUNNING.equals(application.getState())) {
|
||||
if (application == null) {
|
||||
@ -334,9 +339,14 @@ public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
|
||||
request.setJobId(jobId);
|
||||
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
|
||||
GetJobReportRequest.class, request)).getJobReport();
|
||||
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
|
||||
|
||||
return TypeConverter.fromYarn(report, jobFile);
|
||||
if (StringUtils.isEmpty(report.getJobFile())) {
|
||||
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
|
||||
report.setJobFile(jobFile);
|
||||
}
|
||||
String historyTrackingUrl = report.getTrackingUrl();
|
||||
return TypeConverter.fromYarn(report, "http://"
|
||||
+ (StringUtils.isNotEmpty(historyTrackingUrl) ? historyTrackingUrl
|
||||
: trackingUrl));
|
||||
}
|
||||
|
||||
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
|
||||
|
@ -124,6 +124,26 @@ public void testHistoryServerNotConfigured() throws Exception {
|
||||
Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testJobReportFromHistoryServer() throws Exception {
|
||||
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
||||
when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
|
||||
getJobReportResponseFromHistoryServer());
|
||||
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
||||
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
|
||||
.thenReturn(null);
|
||||
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
|
||||
historyServerProxy, rm);
|
||||
|
||||
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
||||
Assert.assertNotNull(jobStatus);
|
||||
Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
|
||||
Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
|
||||
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
|
||||
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
|
||||
}
|
||||
|
||||
private GetJobReportRequest getJobReportRequest() {
|
||||
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
|
||||
request.setJobId(jobId);
|
||||
@ -170,4 +190,17 @@ private ClientServiceDelegate getClientServiceDelegate(
|
||||
return clientServiceDelegate;
|
||||
}
|
||||
|
||||
private GetJobReportResponse getJobReportResponseFromHistoryServer() {
|
||||
GetJobReportResponse jobReportResponse = Records
|
||||
.newRecord(GetJobReportResponse.class);
|
||||
JobReport jobReport = Records.newRecord(JobReport.class);
|
||||
jobReport.setJobId(jobId);
|
||||
jobReport.setJobState(JobState.SUCCEEDED);
|
||||
jobReport.setMapProgress(1.0f);
|
||||
jobReport.setReduceProgress(1.0f);
|
||||
jobReport.setJobFile("TestJobFilePath");
|
||||
jobReport.setTrackingUrl("TestTrackingUrl");
|
||||
jobReportResponse.setJobReport(jobReport);
|
||||
return jobReportResponse;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user