MAPREDUCE-2672. MR-279: JobHistory Server needs Analysis this job. (Robert Evans via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1171297 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-09-15 22:21:00 +00:00
parent e66d697a66
commit d9ba4670ed
20 changed files with 687 additions and 134 deletions

View File

@ -295,6 +295,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can
gracefully exit. (Abhijit Suresh Shingate via vinodkv)
MAPREDUCE-2672. MR-279: JobHistory Server needs Analysis this job.
(Robert Evans via mahadev)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -38,31 +38,50 @@ public interface TaskAttempt {
float getProgress();
TaskAttemptState getState();
/** Has attempt reached the final state or not.
/**
* Has attempt reached the final state or not.
* @return true if it has finished, else false
*/
boolean isFinished();
/**If container Assigned then return container ID, otherwise null.
/**
* @return the container ID if a container is assigned, otherwise null.
*/
ContainerId getAssignedContainerID();
/**If container Assigned then return container mgr address, otherwise null.
/**
* @return container mgr address if a container is assigned, otherwise null.
*/
String getAssignedContainerMgrAddress();
/**If container Assigned then return the node's http address, otherwise null.
/**
* @return node's http address if a container is assigned, otherwise null.
*/
String getNodeHttpAddress();
/** Returns time at which container is launched. If container is not launched
/**
* @return time at which container is launched. If container is not launched
* yet, returns 0.
*/
long getLaunchTime();
/** Returns attempt's finish time. If attempt is not finished
/**
* @return attempt's finish time. If attempt is not finished
* yet, returns 0.
*/
long getFinishTime();
/**
* @return The attempt's shuffle finish time if the attempt is a reduce. If
* attempt is not finished yet, returns 0.
*/
long getShuffleFinishTime();
/**
* @return The attempt's sort or merge finish time if the attempt is a reduce.
* If attempt is not finished yet, returns 0.
*/
long getSortFinishTime();
/**
* @return the port shuffle is on.

View File

@ -695,6 +695,25 @@ public long getFinishTime() {
}
}
@Override
public long getShuffleFinishTime() {
readLock.lock();
try {
return this.reportedStatus.shuffleFinishTime;
} finally {
readLock.unlock();
}
}
@Override
public long getSortFinishTime() {
readLock.lock();
try {
return this.reportedStatus.sortFinishTime;
} finally {
readLock.unlock();
}
}
@Override
public int getShufflePort() {
@ -751,6 +770,7 @@ public TaskAttemptReport getReport() {
result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);

View File

@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -260,6 +261,16 @@ public List<String> getDiagnostics() {
public String getAssignedContainerMgrAddress() {
return "localhost:9998";
}
@Override
public long getShuffleFinishTime() {
return 0;
}
@Override
public long getSortFinishTime() {
return 0;
}
};
}
@ -454,7 +465,7 @@ public Map<TaskId, Task> getTasks(TaskType taskType) {
@Override
public List<String> getDiagnostics() {
throw new UnsupportedOperationException("Not supported yet.");
return Collections.<String>emptyList();
}
@Override
@ -465,7 +476,7 @@ public boolean checkAccess(UserGroupInformation callerUGI,
@Override
public String getUserName() {
throw new UnsupportedOperationException("Not supported yet.");
return "mock";
}
@Override
@ -475,7 +486,7 @@ public Path getConfFile() {
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
throw new UnsupportedOperationException("Not supported yet.");
return Collections.<JobACL, AccessControlList>emptyMap();
}
};
}

View File

@ -693,6 +693,16 @@ public long getFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getShuffleFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getSortFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getAssignedContainerMgrAddress() {
throw new UnsupportedOperationException("Not supported yet.");

View File

@ -21,12 +21,17 @@
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -115,18 +120,42 @@ public long getStartTime() {
WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext());
}
@Test public void testJobView() {
WebAppTests.testPage(JobPage.class, AppContext.class, new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = getJobParams(appContext);
WebAppTests.testPage(JobPage.class, AppContext.class, appContext, params);
}
@Test public void testTasksView() {
WebAppTests.testPage(TasksPage.class, AppContext.class,
new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = getTaskParams(appContext);
WebAppTests.testPage(TasksPage.class, AppContext.class, appContext, params);
}
@Test public void testTaskView() {
WebAppTests.testPage(TaskPage.class, AppContext.class,
new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = getTaskParams(appContext);
WebAppTests.testPage(TaskPage.class, AppContext.class, appContext, params);
}
public static Map<String, String> getJobParams(AppContext appContext) {
JobId jobId = appContext.getAllJobs().entrySet().iterator().next().getKey();
Map<String, String> params = new HashMap<String, String>();
params.put(AMParams.JOB_ID, MRApps.toString(jobId));
return params;
}
public static Map<String, String> getTaskParams(AppContext appContext) {
JobId jobId = appContext.getAllJobs().entrySet().iterator().next().getKey();
Entry<TaskId, Task> e = appContext.getJob(jobId).getTasks().entrySet().iterator().next();
e.getValue().getType();
Map<String, String> params = new HashMap<String, String>();
params.put(AMParams.JOB_ID, MRApps.toString(jobId));
params.put(AMParams.TASK_ID, e.getKey().toString());
params.put(AMParams.TASK_TYPE, MRApps.taskSymbol(e.getValue().getType()));
return params;
}
public static void main(String[] args) {

View File

@ -24,6 +24,10 @@ public interface TaskAttemptReport {
public abstract float getProgress();
public abstract long getStartTime();
public abstract long getFinishTime();
/** @return the shuffle finish time. Applicable only for reduce attempts */
public abstract long getShuffleFinishTime();
/** @return the sort/merge finish time. Applicable only for reduce attempts */
public abstract long getSortFinishTime();
public abstract Counters getCounters();
public abstract String getDiagnosticInfo();
public abstract String getStateString();
@ -39,4 +43,14 @@ public interface TaskAttemptReport {
public abstract void setStateString(String stateString);
public abstract void setPhase(Phase phase);
/**
* Set the shuffle finish time. Applicable only for reduce attempts
* @param time the time the shuffle finished.
*/
public abstract void setShuffleFinishTime(long time);
/**
* Set the sort/merge finish time. Applicable only for reduce attempts
* @param time the time the shuffle finished.
*/
public abstract void setSortFinishTime(long time);
}

View File

@ -127,6 +127,31 @@ public void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime((finishTime));
}
@Override
public long getShuffleFinishTime() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getShuffleFinishTime());
}
@Override
public void setShuffleFinishTime(long time) {
maybeInitBuilder();
builder.setShuffleFinishTime(time);
}
@Override
public long getSortFinishTime() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getSortFinishTime());
}
@Override
public void setSortFinishTime(long time) {
maybeInitBuilder();
builder.setSortFinishTime(time);
}
@Override
public TaskAttemptId getTaskAttemptId() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
@ -262,7 +287,4 @@ private PhaseProto convertToProtoFormat(Phase e) {
private Phase convertFromProtoFormat(PhaseProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
}

View File

@ -132,6 +132,7 @@ public void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime((startTime));
}
@Override
public long getFinishTime() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
@ -143,6 +144,7 @@ public void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime((finishTime));
}
@Override
public TaskId getTaskId() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -117,6 +117,8 @@ message TaskAttemptReportProto {
optional string diagnostic_info = 7;
optional string state_string = 8;
optional PhaseProto phase = 9;
optional int64 shuffle_finish_time = 10;
optional int64 sort_finish_time=11;
}
enum JobStateProto {

View File

@ -379,46 +379,46 @@ public void printAll() {
}
}
/** Get the job submit time */
/** @return the job submit time */
public long getSubmitTime() { return submitTime; }
/** Get the job finish time */
/** @return the job finish time */
public long getFinishTime() { return finishTime; }
/** Get the job id */
/** @return the job id */
public JobID getJobId() { return jobid; }
/** Get the user name */
/** @return the user name */
public String getUsername() { return username; }
/** Get the job name */
/** @return the job name */
public String getJobname() { return jobname; }
/** Get the job queue name */
/** @return the job queue name */
public String getJobQueueName() { return jobQueueName; }
/** Get the path for the job configuration file */
/** @return the path for the job configuration file */
public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */
/** @return the job launch time */
public long getLaunchTime() { return launchTime; }
/** Get the total number of maps */
/** @return the total number of maps */
public long getTotalMaps() { return totalMaps; }
/** Get the total number of reduces */
/** @return the total number of reduces */
public long getTotalReduces() { return totalReduces; }
/** Get the total number of failed maps */
/** @return the total number of failed maps */
public long getFailedMaps() { return failedMaps; }
/** Get the number of failed reduces */
/** @return the number of failed reduces */
public long getFailedReduces() { return failedReduces; }
/** Get the number of finished maps */
/** @return the number of finished maps */
public long getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reduces */
/** @return the number of finished reduces */
public long getFinishedReduces() { return finishedReduces; }
/** Get the job status */
/** @return the job status */
public String getJobStatus() { return jobStatus; }
public String getErrorInfo() { return errorInfo; }
/** Get the counters for the job */
/** @return the counters for the job */
public Counters getTotalCounters() { return totalCounters; }
/** Get the map counters for the job */
/** @return the map counters for the job */
public Counters getMapCounters() { return mapCounters; }
/** Get the reduce counters for the job */
/** @return the reduce counters for the job */
public Counters getReduceCounters() { return reduceCounters; }
/** Get the map of all tasks in this job */
/** @return the map of all tasks in this job */
public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
/** Get the priority of this job */
/** @return the priority of this job */
public String getPriority() { return priority.toString(); }
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
}
@ -458,27 +458,27 @@ public void printAll() {
}
}
/** Get the Task ID */
/** @return the Task ID */
public TaskID getTaskId() { return taskId; }
/** Get the start time of this task */
/** @return the start time of this task */
public long getStartTime() { return startTime; }
/** Get the finish time of this task */
/** @return the finish time of this task */
public long getFinishTime() { return finishTime; }
/** Get the task type */
/** @return the task type */
public TaskType getTaskType() { return taskType; }
/** Get the split locations */
/** @return the split locations */
public String getSplitLocations() { return splitLocations; }
/** Get the counters for this task */
/** @return the counters for this task */
public Counters getCounters() { return counters; }
/** Get the task status */
/** @return the task status */
public String getTaskStatus() { return status; }
/** Get the attempt Id that caused this task to fail */
/** @return the attempt Id that caused this task to fail */
public TaskAttemptID getFailedDueToAttemptId() {
return failedDueToAttemptId;
}
/** Get the error */
/** @return the error */
public String getError() { return error; }
/** Get the map of all attempts for this task */
/** @return the map of all attempts for this task */
public Map<TaskAttemptID, TaskAttemptInfo> getAllTaskAttempts() {
return attemptsMap;
}
@ -530,33 +530,33 @@ public void printAll() {
}
}
/** Get the attempt Id */
/** @return the attempt Id */
public TaskAttemptID getAttemptId() { return attemptId; }
/** Get the start time of the attempt */
/** @return the start time of the attempt */
public long getStartTime() { return startTime; }
/** Get the finish time of the attempt */
/** @return the finish time of the attempt */
public long getFinishTime() { return finishTime; }
/** Get the shuffle finish time. Applicable only for reduce attempts */
/** @return the shuffle finish time. Applicable only for reduce attempts */
public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the sort finish time. Applicable only for reduce attempts */
/** @return the sort finish time. Applicable only for reduce attempts */
public long getSortFinishTime() { return sortFinishTime; }
/** Get the map finish time. Applicable only for map attempts */
/** @return the map finish time. Applicable only for map attempts */
public long getMapFinishTime() { return mapFinishTime; }
/** Get the error string */
/** @return the error string */
public String getError() { return error; }
/** Get the state */
/** @return the state */
public String getState() { return state; }
/** Get the task status */
/** @return the task status */
public String getTaskStatus() { return status; }
/** Get the task type */
/** @return the task type */
public TaskType getTaskType() { return taskType; }
/** Get the tracker name where the attempt executed */
/** @return the tracker name where the attempt executed */
public String getTrackerName() { return trackerName; }
/** Get the host name */
/** @return the host name */
public String getHostname() { return hostname; }
/** Get the counters for the attempt */
/** @return the counters for the attempt */
public Counters getCounters() { return counters; }
/** Get the HTTP port for the tracker */
/** @return the HTTP port for the tracker */
public int getHttpPort() { return httpPort; }
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
@ -28,7 +26,6 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.avro.util.Utf8;

View File

@ -87,7 +87,9 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
user = userName;
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
diagnostics.add(jobInfo.getErrorInfo());
report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
report =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
JobReport.class);
report.setJobId(jobId);
report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setStartTime(jobInfo.getLaunchTime());
@ -194,11 +196,12 @@ public int compare(TaskAttempt o1, TaskAttempt o2) {
int attemptRunTime = -1;
if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) {
attemptRunTime = (int) (taskAttempt.getFinishTime() - taskAttempt
.getLaunchTime());
attemptRunTime =
(int) (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
}
// Default to KILLED
TaskAttemptCompletionEventStatus taceStatus = TaskAttemptCompletionEventStatus.KILLED;
TaskAttemptCompletionEventStatus taceStatus =
TaskAttemptCompletionEventStatus.KILLED;
String taStateString = taskAttempt.getState().toString();
try {
taceStatus = TaskAttemptCompletionEventStatus.valueOf(taStateString);
@ -224,7 +227,8 @@ public Map<TaskId, Task> getTasks() {
}
//History data is leisurely loaded when task level data is requested
private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) throws IOException {
private synchronized void loadFullHistoryData(boolean loadTasks,
Path historyFileAbsolute) throws IOException {
LOG.info("Loading history file: [" + historyFileAbsolute + "]");
if (jobInfo != null) {
return; //data already loaded
@ -232,11 +236,13 @@ private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFil
if (historyFileAbsolute != null) {
try {
JobHistoryParser parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute);
jobInfo = parser.parse();
JobHistoryParser parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
jobInfo = parser.parse();
} catch (IOException e) {
throw new YarnException("Could not load history file " + historyFileAbsolute,
e);
throw new YarnException("Could not load history file "
+ historyFileAbsolute, e);
}
} else {
throw new IOException("History file not found");
@ -295,7 +301,8 @@ public Map<TaskId, Task> getTasks(TaskType taskType) {
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
public
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
if (!UserGroupInformation.isSecurityEnabled()) {
return true;
}

View File

@ -71,6 +71,8 @@ public class CompletedTaskAttempt implements TaskAttempt {
report.setStartTime(attemptInfo.getStartTime());
report.setFinishTime(attemptInfo.getFinishTime());
report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
report.setSortFinishTime(attemptInfo.getSortFinishTime());
if (localDiagMessage != null) {
report.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
} else {
@ -158,10 +160,19 @@ public long getLaunchTime() {
public long getFinishTime() {
return report.getFinishTime();
}
@Override
public long getShuffleFinishTime() {
return report.getShuffleFinishTime();
}
@Override
public long getSortFinishTime() {
return report.getSortFinishTime();
}
@Override
public int getShufflePort() {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -55,6 +56,12 @@ public class HsJobBlock extends HtmlBlock {
int killedReduceAttempts = 0;
int failedReduceAttempts = 0;
int successfulReduceAttempts = 0;
long avgMapTime = 0;
long avgReduceTime = 0;
long avgShuffleTime = 0;
long avgSortTime = 0;
int numMaps;
int numReduces;
@Inject HsJobBlock(AppContext appctx) {
appContext = appctx;
@ -96,7 +103,7 @@ public class HsJobBlock extends HtmlBlock {
_("Started:", new Date(startTime)).
_("Finished:", new Date(finishTime)).
_("Elapsed:", StringUtils.formatTime(
Times.elapsed(startTime, finishTime)));
Times.elapsed(startTime, finishTime, false)));
List<String> diagnostics = job.getDiagnostics();
if(diagnostics != null && !diagnostics.isEmpty()) {
@ -106,7 +113,16 @@ public class HsJobBlock extends HtmlBlock {
}
infoBlock._("Diagnostics:", b.toString());
}
if(numMaps > 0) {
infoBlock._("Average Map Time", StringUtils.formatTime(avgMapTime));
}
if(numReduces > 0) {
infoBlock._("Average Reduce Time", StringUtils.formatTime(avgReduceTime));
infoBlock._("Average Shuffle Time", StringUtils.formatTime(avgShuffleTime));
infoBlock._("Average Merge Time", StringUtils.formatTime(avgSortTime));
}
for(Map.Entry<JobACL, AccessControlList> entry : acls.entrySet()) {
infoBlock._("ACL "+entry.getKey().getAclName()+":",
entry.getValue().getAclString());
@ -174,6 +190,8 @@ public class HsJobBlock extends HtmlBlock {
* @param job the job to get counts for.
*/
private void countTasksAndAttempts(Job job) {
numReduces = 0;
numMaps = 0;
Map<TaskId, Task> tasks = job.getTasks();
for (Task task : tasks.values()) {
// Attempts counts
@ -203,14 +221,38 @@ private void countTasksAndAttempts(Job job) {
successfulMapAttempts += successful;
failedMapAttempts += failed;
killedMapAttempts += killed;
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
numMaps++;
avgMapTime += (attempt.getFinishTime() -
attempt.getLaunchTime());
}
break;
case REDUCE:
successfulReduceAttempts += successful;
failedReduceAttempts += failed;
killedReduceAttempts += killed;
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
numReduces++;
avgShuffleTime += (attempt.getShuffleFinishTime() -
attempt.getLaunchTime());
avgSortTime += attempt.getSortFinishTime() -
attempt.getLaunchTime();
avgReduceTime += (attempt.getFinishTime() -
attempt.getShuffleFinishTime());
}
break;
}
}
}
if(numMaps > 0) {
avgMapTime = avgMapTime / numMaps;
}
if(numReduces > 0) {
avgReduceTime = avgReduceTime / numReduces;
avgShuffleTime = avgShuffleTime / numReduces;
avgSortTime = avgSortTime / numReduces;
}
}
}

View File

@ -18,27 +18,32 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.common.base.Joiner;
@ -67,47 +72,162 @@ protected void render(Block html) {
h2($(TITLE));
return;
}
TBODY<TABLE<Hamlet>> tbody = html.
TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
} else {
type = app.getTask().getType();
}
TR<THEAD<TABLE<Hamlet>>> headRow = html.
table("#attempts").
thead().
tr().
tr();
headRow.
th(".id", "Attempt").
th(".state", "State").
th(".node", "node").
th(".tsh", "Started").
th(".tsh", "Finished").
th(".tsh", "Elapsed").
th(".note", "Note")._()._().
tbody();
th(".tsh", "Start Time");
if(type == TaskType.REDUCE) {
headRow.th("Shuffle Finish Time");
headRow.th("Merge Finish Time");
}
headRow.th("Finish Time"); //Attempt
if(type == TaskType.REDUCE) {
headRow.th("Elapsed Time Shuffle"); //Attempt
headRow.th("Elapsed Time Merge"); //Attempt
headRow.th("Elapsed Time Reduce"); //Attempt
}
headRow.th("Elapsed Time").
th(".note", "Note");
TBODY<TABLE<Hamlet>> tbody = headRow._()._().tbody();
for (TaskAttempt ta : getTaskAttempts()) {
String taid = MRApps.toString(ta.getID());
ContainerId containerId = ta.getAssignedContainerID();
String nodeHttpAddr = ta.getNodeHttpAddress();
long startTime = ta.getLaunchTime();
long finishTime = ta.getFinishTime();
long elapsed = Times.elapsed(startTime, finishTime);
TD<TR<TBODY<TABLE<Hamlet>>>> nodeTd = tbody.
tr().
td(".id", taid).
td(".state", ta.getState().toString()).
td().
a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
if (containerId != null) {
String containerIdStr = ConverterUtils.toString(containerId);
nodeTd._(" ").
a(".logslink", url("http://", nodeHttpAddr, "yarn", "containerlogs",
containerIdStr), "logs");
long attemptStartTime = ta.getLaunchTime();
long shuffleFinishTime = -1;
long sortFinishTime = -1;
long attemptFinishTime = ta.getFinishTime();
long elapsedShuffleTime = -1;
long elapsedSortTime = -1;
long elapsedReduceTime = -1;
if(type == TaskType.REDUCE) {
shuffleFinishTime = ta.getShuffleFinishTime();
sortFinishTime = ta.getSortFinishTime();
elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime =
Times.elapsed(shuffleFinishTime, sortFinishTime, false);
elapsedReduceTime =
Times.elapsed(sortFinishTime, attemptFinishTime, false);
}
nodeTd._().
td(".ts", Times.format(startTime)).
td(".ts", Times.format(finishTime)).
td(".dt", StringUtils.formatTime(elapsed)).
td(".note", Joiner.on('\n').join(ta.getDiagnostics()))._();
long attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false);
int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.
td().
br().$title(String.valueOf(sortId))._(). // sorting
_(taid)._().
td(ta.getState().toString()).
td().a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr)._();
row.td().
br().$title(String.valueOf(attemptStartTime))._().
_(Times.format(attemptStartTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(shuffleFinishTime))._().
_(Times.format(shuffleFinishTime))._();
row.td().
br().$title(String.valueOf(sortFinishTime))._().
_(Times.format(sortFinishTime))._();
}
row.
td().
br().$title(String.valueOf(attemptFinishTime))._().
_(Times.format(attemptFinishTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(elapsedShuffleTime))._().
_(formatTime(elapsedShuffleTime))._();
row.td().
br().$title(String.valueOf(elapsedSortTime))._().
_(formatTime(elapsedSortTime))._();
row.td().
br().$title(String.valueOf(elapsedReduceTime))._().
_(formatTime(elapsedReduceTime))._();
}
row.
td().
br().$title(String.valueOf(attemptElapsed))._().
_(formatTime(attemptElapsed))._().
td(".note", Joiner.on('\n').join(ta.getDiagnostics()));
row._();
}
tbody._()._();
TR<TFOOT<TABLE<Hamlet>>> footRow = tbody._().tfoot().tr();
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_name").$value("Attempt")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_state").$value("State")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Node")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_start_time").$value("Start Time")._()._();
if(type == TaskType.REDUCE) {
footRow.
th().input("search_init").$type(InputType.text).
$name("shuffle_time").$value("Shuffle Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("merge_time").$value("Merge Time")._()._();
}
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_finish").$value("Finish Time")._()._();
if(type == TaskType.REDUCE) {
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_shuffle_time").$value("Elapsed Shuffle Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_merge_time").$value("Elapsed Merge Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_reduce_time").$value("Elapsed Reduce Time")._()._();
}
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_elapsed").$value("Elapsed Time")._()._().
th().input("search_init").$type(InputType.text).
$name("note").$value("Note")._()._();
footRow._()._()._();
}
private String formatTime(long elapsed) {
return elapsed < 0 ? "N/A" : StringUtils.formatTime(elapsed);
}
/**
* @return true if this is a valid request else false.
*/
@ -134,6 +254,7 @@ protected Collection<TaskAttempt> getTaskAttempts() {
//Set up the java script and CSS for the attempts table
set(DATATABLES_ID, "attempts");
set(initID(DATATABLES, "attempts"), attemptsTableInit());
set(postInitID(DATATABLES, "attempts"), attemptsPostTableInit());
setTableStyles(html, "attempts");
}
@ -150,6 +271,49 @@ protected Collection<TaskAttempt> getTaskAttempts() {
* attempts table.
*/
private String attemptsTableInit() {
return tableInit().append("}").toString();
TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
} else {
TaskId taskID = MRApps.toTaskID($(TASK_ID));
type = taskID.getTaskType();
}
StringBuilder b = tableInit().
append(",aoColumnDefs:[");
b.append("{'sType':'title-numeric', 'aTargets': [ 0");
if(type == TaskType.REDUCE) {
b.append(", 7, 8, 9, 10");
} else { //MAP
b.append(", 5");
}
b.append(" ] }");
b.append("]}");
return b.toString();
}
private String attemptsPostTableInit() {
return "var asInitVals = new Array();\n" +
"$('tfoot input').keyup( function () \n{"+
" attemptsDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
"} );\n"+
"$('tfoot input').each( function (i) {\n"+
" asInitVals[i] = this.value;\n"+
"} );\n"+
"$('tfoot input').focus( function () {\n"+
" if ( this.className == 'search_init' )\n"+
" {\n"+
" this.className = '';\n"+
" this.value = '';\n"+
" }\n"+
"} );\n"+
"$('tfoot input').blur( function (i) {\n"+
" if ( this.value == '' )\n"+
" {\n"+
" this.className = 'search_init';\n"+
" this.value = asInitVals[$('tfoot input').index(this)];\n"+
" }\n"+
"} );\n";
}
}

View File

@ -20,9 +20,11 @@
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
@ -30,6 +32,10 @@
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -59,27 +65,79 @@ public class HsTasksBlock extends HtmlBlock {
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
}
TBODY<TABLE<Hamlet>> tbody = html.
table("#tasks").
thead().
THEAD<TABLE<Hamlet>> thead = html.table("#tasks").thead();
//Create the spanning row
int attemptColSpan = type == TaskType.REDUCE ? 8 : 3;
thead.tr().
th().$colspan(5).$class("ui-state-default")._("Task")._().
th().$colspan(attemptColSpan).$class("ui-state-default").
_("Successful Attempt")._().
_();
TR<THEAD<TABLE<Hamlet>>> theadRow = thead.
tr().
th("Task").
th("Name").
th("State").
th("Start Time").
th("Finish Time").
th("Elapsed Time")._()._().
tbody();
th("Elapsed Time").
th("Start Time"); //Attempt
if(type == TaskType.REDUCE) {
theadRow.th("Shuffle Finish Time"); //Attempt
theadRow.th("Merge Finish Time"); //Attempt
}
theadRow.th("Finish Time"); //Attempt
if(type == TaskType.REDUCE) {
theadRow.th("Elapsed Time Shuffle"); //Attempt
theadRow.th("Elapsed Time Merge"); //Attempt
theadRow.th("Elapsed Time Reduce"); //Attempt
}
theadRow.th("Elapsed Time"); //Attempt
TBODY<TABLE<Hamlet>> tbody = theadRow._()._().tbody();
for (Task task : app.getJob().getTasks().values()) {
if (type != null && task.getType() != type) {
continue;
}
String tid = MRApps.toString(task.getID());
TaskReport report = task.getReport();
long startTime = report.getStartTime();
long finishTime = report.getFinishTime();
long elapsed = Times.elapsed(startTime, finishTime);
tbody.
tr().
long elapsed = Times.elapsed(startTime, finishTime, false);
long attemptStartTime = -1;
long shuffleFinishTime = -1;
long sortFinishTime = -1;
long attemptFinishTime = -1;
long elapsedShuffleTime = -1;
long elapsedSortTime = -1;;
long elapsedReduceTime = -1;
long attemptElapsed = -1;
TaskAttempt successful = getSuccessfulAttempt(task);
if(successful != null) {
attemptStartTime = successful.getLaunchTime();
attemptFinishTime = successful.getFinishTime();
if(type == TaskType.REDUCE) {
shuffleFinishTime = successful.getShuffleFinishTime();
sortFinishTime = successful.getSortFinishTime();
elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime =
Times.elapsed(shuffleFinishTime, sortFinishTime, false);
elapsedReduceTime =
Times.elapsed(sortFinishTime, attemptFinishTime, false);
}
attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false);
}
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.
td().
br().$title(String.valueOf(task.getID().getId()))._(). // sorting
a(url("task", tid), tid)._().
@ -92,8 +150,86 @@ public class HsTasksBlock extends HtmlBlock {
_(Times.format(finishTime))._().
td().
br().$title(String.valueOf(elapsed))._().
_(StringUtils.formatTime(elapsed))._()._();
_(formatTime(elapsed))._().
td().
br().$title(String.valueOf(attemptStartTime))._().
_(Times.format(attemptStartTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(shuffleFinishTime))._().
_(Times.format(shuffleFinishTime))._();
row.td().
br().$title(String.valueOf(sortFinishTime))._().
_(Times.format(sortFinishTime))._();
}
row.
td().
br().$title(String.valueOf(attemptFinishTime))._().
_(Times.format(attemptFinishTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(elapsedShuffleTime))._().
_(formatTime(elapsedShuffleTime))._();
row.td().
br().$title(String.valueOf(elapsedSortTime))._().
_(formatTime(elapsedSortTime))._();
row.td().
br().$title(String.valueOf(elapsedReduceTime))._().
_(formatTime(elapsedReduceTime))._();
}
row.td().
br().$title(String.valueOf(attemptElapsed))._().
_(formatTime(attemptElapsed))._();
row._();
}
tbody._()._();
TR<TFOOT<TABLE<Hamlet>>> footRow = tbody._().tfoot().tr();
footRow.th().input("search_init").$type(InputType.text).$name("task")
.$value("ID")._()._().th().input("search_init").$type(InputType.text)
.$name("state").$value("State")._()._().th().input("search_init")
.$type(InputType.text).$name("start_time").$value("Start Time")._()._()
.th().input("search_init").$type(InputType.text).$name("finish_time")
.$value("Finish Time")._()._().th().input("search_init")
.$type(InputType.text).$name("elapsed_time").$value("Elapsed Time")._()
._().th().input("search_init").$type(InputType.text)
.$name("attempt_start_time").$value("Start Time")._()._();
if(type == TaskType.REDUCE) {
footRow.th().input("search_init").$type(InputType.text)
.$name("shuffle_time").$value("Shuffle Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("merge_time").$value("Merge Time")._()._();
}
footRow.th().input("search_init").$type(InputType.text)
.$name("attempt_finish").$value("Finish Time")._()._();
if(type == TaskType.REDUCE) {
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_shuffle_time").$value("Elapsed Shuffle Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_merge_time").$value("Elapsed Merge Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_reduce_time").$value("Elapsed Reduce Time")._()._();
}
footRow.th().input("search_init").$type(InputType.text)
.$name("attempt_elapsed").$value("Elapsed Time")._()._();
footRow._()._()._();
}
private String formatTime(long elapsed) {
return elapsed < 0 ? "N/A" : StringUtils.formatTime(elapsed);
}
private TaskAttempt getSuccessfulAttempt(Task task) {
for(TaskAttempt attempt: task.getAttempts().values()) {
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
return attempt;
}
}
return null;
}
}

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.SubView;
/**
@ -40,9 +44,10 @@ public class HsTasksPage extends HsView {
set(DATATABLES_ID, "tasks");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(initID(DATATABLES, "tasks"), tasksTableInit());
set(postInitID(DATATABLES, "tasks"), jobsPostTableInit());
setTableStyles(html, "tasks");
}
/**
* The content of this page is the TasksBlock
* @return HsTasksBlock.class
@ -56,9 +61,45 @@ public class HsTasksPage extends HsView {
* for the tasks table.
*/
private String tasksTableInit() {
return tableInit().
append(",aoColumns:[{sType:'title-numeric'},{sType:'title-numeric',").
append("bSearchable:false},null,{sType:'title-numeric'},").
append("{sType:'title-numeric'},{sType:'title-numeric'}]}").toString();
TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
}
StringBuilder b = tableInit().
append(",aoColumnDefs:[");
b.append("{'sType':'title-numeric', 'aTargets': [ 0, 4");
if(type == TaskType.REDUCE) {
b.append(", 9, 10, 11, 12");
} else { //MAP
b.append(", 7");
}
b.append(" ] }");
b.append("]}");
return b.toString();
}
private String jobsPostTableInit() {
return "var asInitVals = new Array();\n" +
"$('tfoot input').keyup( function () \n{"+
" tasksDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
"} );\n"+
"$('tfoot input').each( function (i) {\n"+
" asInitVals[i] = this.value;\n"+
"} );\n"+
"$('tfoot input').focus( function () {\n"+
" if ( this.className == 'search_init' )\n"+
" {\n"+
" this.className = '';\n"+
" this.value = '';\n"+
" }\n"+
"} );\n"+
"$('tfoot input').blur( function (i) {\n"+
" if ( this.value == '' )\n"+
" {\n"+
" this.className = 'search_init';\n"+
" this.value = asInitVals[$('tfoot input').index(this)];\n"+
" }\n"+
"} );\n";
}
}

View File

@ -26,13 +26,18 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
import org.apache.hadoop.mapreduce.v2.app.webapp.TestAMWebApp;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -118,19 +123,27 @@ public long getStartTime() {
@Test public void testJobView() {
LOG.info("HsJobPage");
WebAppTests.testPage(HsJobPage.class, AppContext.class, new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getJobParams(appContext);
WebAppTests.testPage(HsJobPage.class, AppContext.class, appContext, params);
}
@Test public void testTasksView() {
@Test
public void testTasksView() {
LOG.info("HsTasksPage");
WebAppTests.testPage(HsTasksPage.class, AppContext.class,
new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests.testPage(HsTasksPage.class, AppContext.class, appContext,
params);
}
@Test public void testTaskView() {
@Test
public void testTaskView() {
LOG.info("HsTaskPage");
WebAppTests.testPage(HsTaskPage.class, AppContext.class,
new TestAppContext());
AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests
.testPage(HsTaskPage.class, AppContext.class, appContext, params);
}
@Test public void testAttemptsWithJobView() {
@ -147,8 +160,10 @@ public long getStartTime() {
@Test public void testAttemptsView() {
LOG.info("HsAttemptsPage");
AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
new TestAppContext());
appContext, params);
}
@Test public void testConfView() {

View File

@ -30,10 +30,18 @@ public class Times {
};
public static long elapsed(long started, long finished) {
return Times.elapsed(started, finished, true);
}
public static long elapsed(long started, long finished, boolean isRunning) {
if (finished > 0) {
return finished - started;
}
return started > 0 ? System.currentTimeMillis() - started : 0;
if (isRunning) {
return started > 0 ? System.currentTimeMillis() - started : 0;
} else {
return -1;
}
}
public static String format(long ts) {