MAPREDUCE-2996. Add uber-ness information to JobHistory. Contributed by Jonathan Eagles.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0b3c654d83
commit
964d4a1666
@ -1481,6 +1481,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-2791. Added missing info on 'job -status' output. (Devaraj K via
|
MAPREDUCE-2791. Added missing info on 'job -status' output. (Devaraj K via
|
||||||
acmurthy)
|
acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-2996. Add uber-ness information to JobHistory. (Jonathan Eagles
|
||||||
|
via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1135,7 +1135,8 @@ public void transition(JobImpl job, JobEvent event) {
|
|||||||
new JobInitedEvent(job.oldJobId,
|
new JobInitedEvent(job.oldJobId,
|
||||||
job.startTime,
|
job.startTime,
|
||||||
job.numMapTasks, job.numReduceTasks,
|
job.numMapTasks, job.numReduceTasks,
|
||||||
job.getState().toString()); //Will transition to state running. Currently in INITED
|
job.getState().toString(),
|
||||||
|
job.isUber()); //Will transition to state running. Currently in INITED
|
||||||
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
|
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
|
||||||
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
|
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
|
||||||
job.submitTime, job.startTime);
|
job.submitTime, job.startTime);
|
||||||
|
@ -64,7 +64,8 @@
|
|||||||
{"name": "launchTime", "type": "long"},
|
{"name": "launchTime", "type": "long"},
|
||||||
{"name": "totalMaps", "type": "int"},
|
{"name": "totalMaps", "type": "int"},
|
||||||
{"name": "totalReduces", "type": "int"},
|
{"name": "totalReduces", "type": "int"},
|
||||||
{"name": "jobStatus", "type": "string"}
|
{"name": "jobStatus", "type": "string"},
|
||||||
|
{"name": "uberized", "type": "boolean"}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -302,6 +302,7 @@ private void handleJobInitedEvent(JobInitedEvent event) {
|
|||||||
info.launchTime = event.getLaunchTime();
|
info.launchTime = event.getLaunchTime();
|
||||||
info.totalMaps = event.getTotalMaps();
|
info.totalMaps = event.getTotalMaps();
|
||||||
info.totalReduces = event.getTotalReduces();
|
info.totalReduces = event.getTotalReduces();
|
||||||
|
info.uberized = event.getUberized();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
|
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
|
||||||
@ -346,6 +347,7 @@ public static class JobInfo {
|
|||||||
Map<JobACL, AccessControlList> jobACLs;
|
Map<JobACL, AccessControlList> jobACLs;
|
||||||
|
|
||||||
Map<TaskID, TaskInfo> tasksMap;
|
Map<TaskID, TaskInfo> tasksMap;
|
||||||
|
boolean uberized;
|
||||||
|
|
||||||
/** Create a job info object where job information will be stored
|
/** Create a job info object where job information will be stored
|
||||||
* after a parse
|
* after a parse
|
||||||
@ -373,7 +375,8 @@ public void printAll() {
|
|||||||
System.out.println("MAP_COUNTERS:" + mapCounters.toString());
|
System.out.println("MAP_COUNTERS:" + mapCounters.toString());
|
||||||
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
|
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
|
||||||
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
|
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
|
||||||
|
System.out.println("UBERIZED: " + uberized);
|
||||||
|
|
||||||
for (TaskInfo ti: tasksMap.values()) {
|
for (TaskInfo ti: tasksMap.values()) {
|
||||||
ti.printAll();
|
ti.printAll();
|
||||||
}
|
}
|
||||||
@ -421,6 +424,8 @@ public void printAll() {
|
|||||||
/** @return the priority of this job */
|
/** @return the priority of this job */
|
||||||
public String getPriority() { return priority.toString(); }
|
public String getPriority() { return priority.toString(); }
|
||||||
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
|
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
|
||||||
|
/** @return the uberized status of this job */
|
||||||
|
public boolean getUberized() { return uberized; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,14 +42,16 @@ public class JobInitedEvent implements HistoryEvent {
|
|||||||
* @param totalMaps
|
* @param totalMaps
|
||||||
* @param totalReduces
|
* @param totalReduces
|
||||||
* @param jobStatus
|
* @param jobStatus
|
||||||
|
* @param uberized True if the job's map and reduce stages were combined
|
||||||
*/
|
*/
|
||||||
public JobInitedEvent(JobID id, long launchTime, int totalMaps,
|
public JobInitedEvent(JobID id, long launchTime, int totalMaps,
|
||||||
int totalReduces, String jobStatus) {
|
int totalReduces, String jobStatus, boolean uberized) {
|
||||||
datum.jobid = new Utf8(id.toString());
|
datum.jobid = new Utf8(id.toString());
|
||||||
datum.launchTime = launchTime;
|
datum.launchTime = launchTime;
|
||||||
datum.totalMaps = totalMaps;
|
datum.totalMaps = totalMaps;
|
||||||
datum.totalReduces = totalReduces;
|
datum.totalReduces = totalReduces;
|
||||||
datum.jobStatus = new Utf8(jobStatus);
|
datum.jobStatus = new Utf8(jobStatus);
|
||||||
|
datum.uberized = uberized;
|
||||||
}
|
}
|
||||||
|
|
||||||
JobInitedEvent() { }
|
JobInitedEvent() { }
|
||||||
@ -67,9 +69,10 @@ public JobInitedEvent(JobID id, long launchTime, int totalMaps,
|
|||||||
public int getTotalReduces() { return datum.totalReduces; }
|
public int getTotalReduces() { return datum.totalReduces; }
|
||||||
/** Get the status */
|
/** Get the status */
|
||||||
public String getStatus() { return datum.jobStatus.toString(); }
|
public String getStatus() { return datum.jobStatus.toString(); }
|
||||||
/** Get the event type */
|
/** Get the event type */
|
||||||
public EventType getEventType() {
|
public EventType getEventType() {
|
||||||
return EventType.JOB_INITED;
|
return EventType.JOB_INITED;
|
||||||
}
|
}
|
||||||
|
/** Get whether the job's map and reduce stages were combined */
|
||||||
|
public boolean getUberized() { return datum.uberized; }
|
||||||
}
|
}
|
||||||
|
@ -290,8 +290,7 @@ public int getTotalReduces() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isUber() {
|
public boolean isUber() {
|
||||||
LOG.warn("isUber is not yet implemented");
|
return jobInfo.getUberized();
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,6 +95,8 @@ public void testHistoryParsing() throws Exception {
|
|||||||
2, jobInfo.getFinishedMaps());
|
2, jobInfo.getFinishedMaps());
|
||||||
Assert.assertEquals("incorrect finishedReduces ",
|
Assert.assertEquals("incorrect finishedReduces ",
|
||||||
1, jobInfo.getFinishedReduces());
|
1, jobInfo.getFinishedReduces());
|
||||||
|
Assert.assertEquals("incorrect uberized ",
|
||||||
|
job.isUber(), jobInfo.getUberized());
|
||||||
int totalTasks = jobInfo.getAllTasks().size();
|
int totalTasks = jobInfo.getAllTasks().size();
|
||||||
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
|
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
|
||||||
|
|
||||||
|
@ -697,7 +697,8 @@ public synchronized void initTasks()
|
|||||||
JobInitedEvent jie = new JobInitedEvent(
|
JobInitedEvent jie = new JobInitedEvent(
|
||||||
profile.getJobID(), this.launchTime,
|
profile.getJobID(), this.launchTime,
|
||||||
numMapTasks, numReduceTasks,
|
numMapTasks, numReduceTasks,
|
||||||
JobStatus.getJobRunState(JobStatus.PREP));
|
JobStatus.getJobRunState(JobStatus.PREP),
|
||||||
|
false);
|
||||||
|
|
||||||
jobHistory.logEvent(jie, jobId);
|
jobHistory.logEvent(jie, jobId);
|
||||||
|
|
||||||
|
@ -125,10 +125,12 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
|
|||||||
String status = line.get("JOB_STATUS");
|
String status = line.get("JOB_STATUS");
|
||||||
String totalMaps = line.get("TOTAL_MAPS");
|
String totalMaps = line.get("TOTAL_MAPS");
|
||||||
String totalReduces = line.get("TOTAL_REDUCES");
|
String totalReduces = line.get("TOTAL_REDUCES");
|
||||||
|
String uberized = line.get("UBERIZED");
|
||||||
|
|
||||||
if (launchTime != null && totalMaps != null && totalReduces != null) {
|
if (launchTime != null && totalMaps != null && totalReduces != null) {
|
||||||
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
|
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
|
||||||
.parseInt(totalMaps), Integer.parseInt(totalReduces), status);
|
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
|
||||||
|
Boolean.parseBoolean(uberized));
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
Loading…
Reference in New Issue
Block a user