MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks. (Devaraj K and Amar Kamat via amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1221578 13f79535-47bb-0310-9956-ffa450edef68
Amar Kamat committed 2011-12-21 02:58:00 +00:00
parent 45620eee68
commit 264d3b7dd0
20 changed files with 348 additions and 63 deletions

View File

@@ -47,6 +47,9 @@ Trunk (unreleased changes)
     PB and Avro can all use it (Sanjay)

   BUG FIXES

+    MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks.
+    (Devaraj K and Amar Kamat via amarrk)
+
     MAPREDUCE-3412. Fix 'ant docs'. (amarrk)

     MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null.

View File

@@ -926,6 +926,8 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
             : taskAttempt.containerNodeId.getHost(),
         taskAttempt.containerNodeId == null ? -1
             : taskAttempt.containerNodeId.getPort(),
+        taskAttempt.nodeRackName == null ? "UNKNOWN"
+            : taskAttempt.nodeRackName,
         StringUtils.join(
             LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
             .getProgressSplitBlock().burst());

View File

@@ -175,6 +175,7 @@
           {"name": "taskType", "type": "string"},
           {"name": "taskStatus", "type": "string"},
           {"name": "finishTime", "type": "long"},
+          {"name": "rackname", "type": "string"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
           {"name": "counters", "type": "JhCounters"}

@@ -202,6 +203,7 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "port", "type": "int"},
+          {"name": "rackname", "type": "string"},
           {"name": "status", "type": "string"},
           {"name": "error", "type": "string"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},

View File

@@ -224,7 +224,7 @@ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = event.getHostname();
     attemptInfo.port = event.getPort();
-    attemptInfo.rackname = event.getRackname();
+    attemptInfo.rackname = event.getRackName();
   }

@@ -237,6 +237,7 @@ private void handleTaskAttemptFailedEvent(
     attemptInfo.status = event.getTaskStatus();
     attemptInfo.hostname = event.getHostname();
     attemptInfo.port = event.getPort();
+    attemptInfo.rackname = event.getRackName();
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();

View File

@@ -68,7 +68,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
     datum.port = port;
-    datum.rackname = new Utf8(rackName);
+    // This is needed for reading old jh files
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);

@@ -139,8 +142,12 @@ public TaskType getTaskType() {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the tracker rpc port */
   public int getPort() { return datum.port; }
+
   /** Get the rack name */
-  public String getRackname() { return datum.rackname.toString(); }
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
+
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters */
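
The same two-sided guard repeats in every event class touched by this patch: the constructor skips the Avro field when rackName is null (new Utf8(null) would throw a NullPointerException), and the getter reports null instead of dereferencing an unset datum. A minimal standalone sketch of the pattern (class name hypothetical, not part of the patch):

import org.apache.avro.util.Utf8;

public class RackNameGuard {
  private Utf8 rackname; // stays null for events read from old history files

  public void setRackName(String rackName) {
    // Only wrap when present; new Utf8(null) throws.
    if (rackName != null) {
      rackname = new Utf8(rackName);
    }
  }

  public String getRackName() {
    // Mirrors the getters added in this commit: null means "not recorded".
    return rackname == null ? null : rackname.toString();
  }

  public static void main(String[] args) {
    RackNameGuard g = new RackNameGuard();
    System.out.println(g.getRackName()); // null: pre-change history file
    g.setRackName("/rack1");
    System.out.println(g.getRackName()); // /rack1
  }
}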

View File

@@ -69,7 +69,9 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
     datum.port = port;
-    datum.rackname = new Utf8(rackName);
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);

@@ -142,8 +144,12 @@ public TaskType getTaskType() {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the tracker rpc port */
   public int getPort() { return datum.port; }
+
   /** Get the rack name of the node where the attempt ran */
-  public String getRackName() { return datum.rackname.toString(); }
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
+
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */

View File

@@ -51,13 +51,16 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
    */
   public TaskAttemptFinishedEvent(TaskAttemptID id,
       TaskType taskType, String taskStatus,
-      long finishTime,
+      long finishTime, String rackName,
       String hostname, String state, Counters counters) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
     datum.taskStatus = new Utf8(taskStatus);
     datum.finishTime = finishTime;
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);

@@ -86,6 +89,12 @@ public TaskType getTaskType() {
   public long getFinishTime() { return datum.finishTime; }
   /** Get the host where the attempt executed */
   public String getHostname() { return datum.hostname.toString(); }
+
+  /** Get the rackname where the attempt executed */
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
+
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */

View File

@@ -47,6 +47,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
    * @param port rpc port for the tracker
+   * @param rackName Name of the rack where the attempt executed
    * @param error Error string
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.

@@ -56,13 +57,16 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   public TaskAttemptUnsuccessfulCompletionEvent
       (TaskAttemptID id, TaskType taskType,
        String status, long finishTime,
-       String hostname, int port, String error,
-       int[][] allSplits) {
+       String hostname, int port, String rackName,
+       String error, int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.taskType = new Utf8(taskType.name());
     datum.attemptId = new Utf8(id.toString());
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.port = port;
     datum.error = new Utf8(error);
     datum.status = new Utf8(status);

@@ -99,7 +103,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
       (TaskAttemptID id, TaskType taskType,
        String status, long finishTime,
        String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, error, null);
+    this(id, taskType, status, finishTime, hostname, -1, null, error, null);
   }

   TaskAttemptUnsuccessfulCompletionEvent() {}

@@ -125,6 +129,12 @@ public TaskAttemptID getTaskAttemptId() {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the rpc port for the host where the attempt executed */
   public int getPort() { return datum.port; }
+
+  /** Get the rack name of the node where the attempt ran */
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
+
   /** Get the error string */
   public String getError() { return datum.error.toString(); }
   /** Get the task status */
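
For reference, a hedged usage sketch of the widened constructor; the attempt id is borrowed from the tests below, the host, rack, and error strings are made up, and passing null for allSplits mirrors the deprecated delegating constructor in the hunk above:

import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;

public class UnsuccessfulEventDemo {
  public static void main(String[] args) {
    TaskAttemptID id =
        TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1");
    // New argument order: hostname, port, rackName, error, allSplits.
    TaskAttemptUnsuccessfulCompletionEvent event =
        new TaskAttemptUnsuccessfulCompletionEvent(
            id, TaskType.MAP, "FAILED", 1234567890L,
            "host1.example.com", -1, "/rack1", "disk failure", null);
    System.out.println(event.getRackName()); // /rack1
  }
}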

View File

@@ -44,10 +44,13 @@
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;

@@ -62,10 +65,12 @@
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);

+  private static final String RACK_NAME = "/MyRackName";
+
   public static class MyResolver implements DNSToSwitchMapping {
     @Override
     public List<String> resolve(List<String> names) {
-      return Arrays.asList(new String[]{"/MyRackName"});
+      return Arrays.asList(new String[]{RACK_NAME});
     }
   }

@@ -172,7 +177,7 @@ public void testHistoryParsing() throws Exception {
       // Verify rack-name
       Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-          .getRackname(), "/MyRackName");
+          .getRackname(), RACK_NAME);
     }
   }

@@ -218,8 +223,88 @@ public void testHistoryParsing() throws Exception {
         jobSummaryElements.get("status"));
   }

+  @Test
+  public void testHistoryParsingForFailedAttempts() throws Exception {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
+        true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    FileContext fc = null;
+    try {
+      fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    int noOffailedAttempts = 0;
+    Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
+    for (Task task : job.getTasks().values()) {
+      TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
+      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+            TypeConverter.fromYarn((taskAttempt.getID())));
+        // Verify rack-name for all task attempts
+        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+            .getRackname(), RACK_NAME);
+        if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
+          noOffailedAttempts++;
+        }
+      }
+    }
+    Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+  }
+
+  static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();
+    t.testHistoryParsingForFailedAttempts();
   }
 }

View File

@@ -2671,7 +2671,9 @@ public synchronized boolean completedTask(TaskInProgress tip,
     // Update jobhistory
     TaskTrackerStatus ttStatus =
       this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
-    String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+    Node node = jobtracker.getNode(ttStatus.getHost());
+    String trackerHostname = node.getName();
+    String trackerRackName = node.getParent().getName();
     TaskType taskType = getTaskType(tip);

     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(

@@ -2685,7 +2687,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
       MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
           statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getMapFinishTime(),
-          status.getFinishTime(), trackerHostname, -1, "",
+          status.getFinishTime(), trackerHostname, -1, trackerRackName,
           status.getStateString(),
           new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
           tip.getSplits(statusAttemptID).burst()

@@ -2698,7 +2700,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
           statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getShuffleFinishTime(),
           status.getSortFinishTime(), status.getFinishTime(),
-          trackerHostname, -1, "", status.getStateString(),
+          trackerHostname, -1, trackerRackName, status.getStateString(),
           new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
           tip.getSplits(statusAttemptID).burst()
           );

@@ -3208,7 +3210,7 @@ private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
         (taskid,
          taskType, taskStatus.getRunState().toString(),
          finishTime,
-         taskTrackerHostName, -1, diagInfo,
+         taskTrackerHostName, -1, null, diagInfo,
          splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());
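
Here the JobTracker takes the rack from the resolved topology node's parent instead of logging the node's full path as the hostname. A standalone sketch of the same host/rack split using NodeBase (path hypothetical; in the patch the getParent() link works because the node lives inside the JobTracker's NetworkTopology):

import org.apache.hadoop.net.NodeBase;

public class RackFromNode {
  public static void main(String[] args) {
    // A topology path "/rack/host" splits into a leaf name and its rack.
    NodeBase node = new NodeBase("/rack1/host1.example.com");
    System.out.println(node.getName());            // host1.example.com
    System.out.println(node.getNetworkLocation()); // /rack1
  }
}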

View File

@@ -83,7 +83,7 @@ private static void testFailedKilledEventsForTypes(EventType expected,
     for (TaskType t : types) {
       TaskAttemptUnsuccessfulCompletionEvent tauce =
           new TaskAttemptUnsuccessfulCompletionEvent
-              (id, t, state, 0L, "", -1, "", NULL_SPLITS_ARRAY);
+              (id, t, state, 0L, "", -1, "", "", NULL_SPLITS_ARRAY);
       assertEquals(expected, tauce.getEventType());
     }
   }

@@ -132,7 +132,8 @@ private static void testFinishedEventsForTypes(EventType expected,
     for (TaskType t : types) {
       TaskAttemptFinishedEvent tafe =
           new TaskAttemptFinishedEvent(id, t,
-              TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", new Counters());
+              TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", "",
+              new Counters());
       assertEquals(expected, tafe.getEventType());
     }
   }

View File

@@ -42,6 +42,7 @@
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;

@@ -49,6 +50,9 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.tools.rumen.TraceBuilder.MyOptions;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

@@ -246,8 +250,10 @@ public void testHadoop20JHParser() throws Exception {
   }

   /**
-   * Validate the parsing of given history file name. Also validate the history
-   * file name suffixed with old/stale file suffix.
+   * Validate the parsing of given history file name.
+   *
+   * TODO: Also validate the history file name suffixed with old/stale file
+   * suffix.
    * @param jhFileName job history file path
    * @param jid JobID
    */

@@ -257,13 +263,7 @@ private void validateHistoryFileNameParsing(Path jhFileName,
         JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
     assertEquals("TraceBuilder failed to parse the current JH filename"
         + jhFileName, jid, extractedJID);
-    // test jobhistory filename with old/stale file suffix
-    jhFileName = jhFileName.suffix(JobHistory.getOldFileSuffix("123"));
-    extractedJID =
-        JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
-    assertEquals("TraceBuilder failed to parse the current JH filename"
-        + "(old-suffix):" + jhFileName,
-        jid, extractedJID);
+    //TODO test jobhistory filename with old/stale file suffix
   }

   /**

@@ -318,8 +318,9 @@ public void testJobHistoryFilenameParsing() throws IOException {
         .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());

     // Check if current jobhistory filenames are detected properly
-    Path jhFilename = org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
-        .getStagingJobHistoryFile(rootInputDir, jid.toString(), 1);
+    JobId jobId = TypeConverter.toYarn(jid);
+    JobIndexInfo info = new JobIndexInfo(0L, 0L, "", "", jobId, 0, 0, "");
+    Path jhFilename = new Path(FileNameIndexUtils.getDoneFileName(info));
     validateHistoryFileNameParsing(jhFilename, jid);

     // Check if Pre21 V1 jobhistory file names are detected properly

@@ -932,18 +933,18 @@ public void testTopologyBuilder() throws Exception {
     subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
         .forName("attempt_200904211745_0003_m_000004_0"), TaskType
         .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+        "/194\\.6\\.134\\.64", "cluster50261\\.secondleveldomain\\.com",
         "SUCCESS", null));
     subject.process(new TaskAttemptUnsuccessfulCompletionEvent
         (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
         TaskType.valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
-        -1, "MACHINE_EXPLODED", splits));
+        "cluster50262\\.secondleveldomain\\.com",
+        -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
     subject.process(new TaskAttemptUnsuccessfulCompletionEvent
         (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
         TaskType.valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
-        -1, "MACHINE_EXPLODED", splits));
+        "cluster50263\\.secondleveldomain\\.com",
+        -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
     subject.process(new TaskStartedEvent(TaskID
         .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
         .valueOf("MAP"),

View File

@@ -5,6 +5,9 @@
   "children" : [ {
     "name" : "cluster50213\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50235\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50226\\.secondleveldomain\\.com",
     "children" : null

@@ -20,6 +23,9 @@
   }, {
     "name" : "cluster50231\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50223\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50232\\.secondleveldomain\\.com",
     "children" : null

@@ -98,12 +104,18 @@
   }, {
     "name" : "cluster1236\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1232\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.134\\.64",
   "children" : [ {
     "name" : "cluster50317\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50283\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50292\\.secondleveldomain\\.com",
     "children" : null

@@ -146,6 +158,9 @@
   }, {
     "name" : "cluster50316\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50303\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.129\\.128",

@@ -431,6 +446,9 @@
   }, {
     "name" : "cluster50120\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50132\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50130\\.secondleveldomain\\.com",
     "children" : null

@@ -566,9 +584,15 @@
   }, {
     "name" : "cluster50166\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50173\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50170\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50189\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50179\\.secondleveldomain\\.com",
     "children" : null

@@ -578,6 +602,21 @@
   "children" : [ {
     "name" : "cluster1283\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1295\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1302\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1294\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1310\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1305\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1299\\.secondleveldomain\\.com",
     "children" : null

@@ -587,20 +626,14 @@
   }, {
     "name" : "cluster1288\\.secondleveldomain\\.com",
     "children" : null
-  }, {
-    "name" : "cluster1302\\.secondleveldomain\\.com",
-    "children" : null
-  }, {
-    "name" : "cluster1294\\.secondleveldomain\\.com",
-    "children" : null
   }, {
     "name" : "cluster1289\\.secondleveldomain\\.com",
     "children" : null
   }, {
-    "name" : "cluster1315\\.secondleveldomain\\.com",
+    "name" : "cluster1314\\.secondleveldomain\\.com",
     "children" : null
   }, {
-    "name" : "cluster1305\\.secondleveldomain\\.com",
+    "name" : "cluster1315\\.secondleveldomain\\.com",
     "children" : null
   }, {
     "name" : "cluster1316\\.secondleveldomain\\.com",

@@ -662,6 +695,9 @@
   }, {
     "name" : "cluster3054\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster3064\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster3077\\.secondleveldomain\\.com",
     "children" : null

@@ -695,6 +731,9 @@
   "children" : [ {
     "name" : "cluster50468\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50445\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50476\\.secondleveldomain\\.com",
     "children" : null

@@ -785,6 +824,9 @@
   }, {
     "name" : "cluster50493\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50511\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50510\\.secondleveldomain\\.com",
     "children" : null

@@ -1100,6 +1142,9 @@
   }, {
     "name" : "cluster1907\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1917\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "192\\.30\\.63\\.192",

@@ -1223,6 +1268,9 @@
   }, {
     "name" : "cluster1446\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1440\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.132\\.128",

@@ -1238,6 +1286,9 @@
   }, {
     "name" : "cluster50025\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50024\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50021\\.secondleveldomain\\.com",
     "children" : null

@@ -1292,6 +1343,9 @@
   }, {
     "name" : "cluster50348\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50346\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50325\\.secondleveldomain\\.com",
     "children" : null

@@ -1379,6 +1433,9 @@
   }, {
     "name" : "cluster1662\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1647\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1649\\.secondleveldomain\\.com",
     "children" : null

@@ -1430,6 +1487,9 @@
   }, {
     "name" : "cluster1503\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1514\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.129\\.0",

@@ -1439,6 +1499,9 @@
   }, {
     "name" : "cluster50539\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50533\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50530\\.secondleveldomain\\.com",
     "children" : null

@@ -1475,6 +1538,9 @@
   }, {
     "name" : "cluster50418\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50406\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50411\\.secondleveldomain\\.com",
     "children" : null

@@ -1527,6 +1593,9 @@
   }, {
     "name" : "194\\.6\\.128\\.64",
     "children" : [ {
+      "name" : "cluster1613\\.secondleveldomain\\.com",
+      "children" : null
+    }, {
       "name" : "cluster1639\\.secondleveldomain\\.com",
       "children" : null
     }, {

@@ -1574,6 +1643,9 @@
   }, {
     "name" : "cluster1602\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1627\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.132\\.192",

@@ -1661,6 +1733,9 @@
   }, {
     "name" : "cluster1736\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1735\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1722\\.secondleveldomain\\.com",
     "children" : null

View File

@@ -1308,6 +1308,8 @@ private void processMapAttemptLine(ParsedLine line) {
       if (host != null) {
         attempt.setHostName(host.getNodeName(), host.getRackName());
         attempt.setLocation(host.makeLoggedLocation());
+      } else {
+        attempt.setHostName(hostName, null);
       }

       List<LoggedLocation> locs = task.getPreferredLocations();

@@ -1470,9 +1472,13 @@ private void processReduceAttemptLine(ParsedLine line) {
         }
       }

-      ParsedHost host = getAndRecordParsedHost(hostName);
-      if (host != null) {
-        attempt.setHostName(host.getNodeName(), host.getRackName());
+      if (hostName != null) {
+        ParsedHost host = getAndRecordParsedHost(hostName);
+        if (host != null) {
+          attempt.setHostName(host.getNodeName(), host.getRackName());
+        } else {
+          attempt.setHostName(hostName, null);
+        }
       }

       if (attemptID != null) {

View File

@@ -463,10 +463,11 @@ private void processTaskAttemptUnsuccessfulCompletionEvent(
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));

-    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
-
-    if (parsedHost != null) {
-      attempt.setLocation(parsedHost.makeLoggedLocation());
+    attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
     }

     attempt.setFinishTime(event.getFinishTime());

@@ -495,8 +496,10 @@ private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
       return;
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
-    attempt.setLocation(getAndRecordParsedHost(event.getHostname())
-        .makeLoggedLocation());
+    ParsedHost pHost = getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }

     attempt.setFinishTime(event.getFinishTime());
     attempt
         .incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters);

@@ -512,6 +515,11 @@ private void processReduceAttemptFinishedEvent(
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
     attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }

     // XXX There may be redundant location info available in the event.
     // We might consider extracting it from this event. Currently this

@@ -535,7 +543,13 @@ private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
       return;
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
-    attempt.setHostName(event.getHostname(), event.getRackname());
+    attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }

     // XXX There may be redundant location info available in the event.
     // We might consider extracting it from this event. Currently this

@@ -665,7 +679,19 @@ private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
   }

   private ParsedHost getAndRecordParsedHost(String hostName) {
-    ParsedHost result = ParsedHost.parse(hostName);
+    return getAndRecordParsedHost(null, hostName);
+  }
+
+  private ParsedHost getAndRecordParsedHost(String rackName, String hostName) {
+    ParsedHost result = null;
+    if (rackName == null) {
+      // for old (pre-23) job history files where hostname was represented as
+      // /rackname/hostname
+      result = ParsedHost.parse(hostName);
+    } else {
+      // for new (post-23) job history files
+      result = new ParsedHost(rackName, hostName);
+    }

     if (result != null) {
       ParsedHost canonicalResult = allHosts.get(result);
View File

@@ -71,11 +71,17 @@ public static ParsedHost parse(String name) {
     return new ParsedHost(matcher.group(1), matcher.group(2));
   }

+  private String process(String name) {
+    return name == null
+        ? null
+        : name.startsWith("/") ? name.substring(1) : name;
+  }
+
   public ParsedHost(LoggedLocation loc) {
     List<NodeName> coordinates = loc.getLayers();

-    rackName = coordinates.get(0).getRackName();
-    nodeName = coordinates.get(1).getHostName();
+    rackName = process(coordinates.get(0).getRackName());
+    nodeName = process(coordinates.get(1).getHostName());
   }

   LoggedLocation makeLoggedLocation() {

@@ -101,8 +107,8 @@ public String getRackName() {
   // expects the broadest name first
   ParsedHost(String rackName, String nodeName) {
-    this.rackName = rackName;
-    this.nodeName = nodeName;
+    this.rackName = process(rackName);
+    this.nodeName = process(nodeName);
   }

   @Override
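
process() strips a single leading slash so that ParsedHost instances built from the old packed "/rackname/hostname" strings and from the new separate rack/host event fields normalize to the same form and compare equal. A standalone sketch of that normalization (class and host names hypothetical):

public class HostFormats {
  static String stripLeadingSlash(String name) {
    return name == null
        ? null
        : name.startsWith("/") ? name.substring(1) : name;
  }

  public static void main(String[] args) {
    // Old format: rack and host packed into the hostname field.
    String packed = "/rack1/host1.example.com";
    int sep = packed.indexOf('/', 1);
    String rack = packed.substring(0, sep);   // "/rack1"
    String host = packed.substring(sep + 1);  // "host1.example.com"

    // New format: rack arrives separately, possibly slash-prefixed; after
    // normalization both paths yield identical rack and host names.
    System.out.println(stripLeadingSlash(rack));      // rack1
    System.out.println(stripLeadingSlash("/rack1"));  // rack1
    System.out.println(stripLeadingSlash(host));      // host1.example.com
  }
}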

View File

@@ -108,9 +108,12 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
       TaskAttempt20LineEventEmitter that =
           (TaskAttempt20LineEventEmitter) thatg;

+      ParsedHost pHost = ParsedHost.parse(hostName);
+
       return new TaskAttemptFinishedEvent(taskAttemptID,
           that.originalTaskType, status, Long.parseLong(finishTime),
-          hostName, state, maybeParseCounters(counters));
+          pHost.getRackName(), pHost.getNodeName(), state,
+          maybeParseCounters(counters));
     }

     return null;

@@ -138,10 +141,19 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
       TaskAttempt20LineEventEmitter that =
           (TaskAttempt20LineEventEmitter) thatg;

+      ParsedHost pHost = ParsedHost.parse(hostName);
+
+      String rackName = null;
+      // Earlier versions of MR logged only hostnames (without rackname) for
+      // unsuccessful attempts
+      if (pHost != null) {
+        rackName = pHost.getRackName();
+        hostName = pHost.getNodeName();
+      }
+
       return new TaskAttemptUnsuccessfulCompletionEvent
           (taskAttemptID,
            that.originalTaskType, status, Long.parseLong(finishTime),
-           hostName, -1, error, null);
+           hostName, -1, rackName, error, null);
     }

     return null;
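
Pre-21 logs packed "/rack/host" into the hostname column for successful attempts but often recorded only a bare hostname for unsuccessful ones, which is why the emitter above swaps in rack and node names only when ParsedHost.parse() succeeds. A standalone sketch of that fallback (regex and names hypothetical, not ParsedHost's actual pattern):

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Pre21HostNames {
  private static final Pattern PACKED = Pattern.compile("(/[^/]+)(/[^/]+)");

  // Returns {rackName, hostName}; rackName is null for bare hostnames.
  static String[] rackAndHost(String logged) {
    Matcher m = PACKED.matcher(logged);
    if (m.matches()) {
      return new String[] { m.group(1), m.group(2).substring(1) };
    }
    return new String[] { null, logged }; // bare hostname: rack unknown
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(
        rackAndHost("/rack1/host1.example.com"))); // [/rack1, host1.example.com]
    System.out.println(Arrays.toString(
        rackAndHost("host2.example.com")));        // [null, host2.example.com]
  }
}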

View File

@@ -25,6 +25,8 @@
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;

 /**

@@ -46,6 +48,10 @@ public void process(HistoryEvent event) {
       processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
     } else if (event instanceof TaskStartedEvent) {
       processTaskStartedEvent((TaskStartedEvent) event);
+    } else if (event instanceof MapAttemptFinishedEvent) {
+      processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+    } else if (event instanceof ReduceAttemptFinishedEvent) {
+      processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
     }

     // I do NOT expect these if statements to be exhaustive.

@@ -78,15 +84,40 @@ private void processTaskStartedEvent(TaskStartedEvent event) {
   private void processTaskAttemptUnsuccessfulCompletionEvent(
       TaskAttemptUnsuccessfulCompletionEvent event) {
-    recordParsedHost(event.getHostname());
+    recordParsedHost(event.getHostname(), event.getRackName());
   }

   private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
-    recordParsedHost(event.getHostname());
+    recordParsedHost(event.getHostname(), event.getRackName());
   }

-  private void recordParsedHost(String hostName) {
-    ParsedHost result = ParsedHost.parse(hostName);
+  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname(), event.getRackName());
+  }
+
+  private void processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname(), event.getRackName());
+  }
+
+  private void recordParsedHost(String hostName, String rackName) {
+    if (hostName == null) {
+      return;
+    }
+    ParsedHost result = null;
+    if (rackName == null) {
+      result = ParsedHost.parse(hostName);
+    } else {
+      result = new ParsedHost(rackName, hostName);
+    }
+    if (result != null && !allHosts.contains(result)) {
+      allHosts.add(result);
+    }
+  }
+
+  private void recordParsedHost(String nodeName) {
+    ParsedHost result = ParsedHost.parse(nodeName);

     if (result != null && !allHosts.contains(result)) {
       allHosts.add(result);