MAPREDUCE-3035. Fixed MR JobHistory to ensure rack information is present. Contributed by chakravarthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195575 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-31 17:27:42 +00:00
parent 7e056015ad
commit 9db078212f
19 changed files with 121 additions and 37 deletions

View File

@ -1883,6 +1883,9 @@ Release 0.23.0 - Unreleased
a couple of events in failure states correctly. (Hitesh Shah and Siddharth
Seth via vinodkv)
MAPREDUCE-3035. Fixed MR JobHistory to ensure rack information is present.
(chakravarthy via acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -59,6 +59,11 @@ public interface TaskAttempt {
*/
String getNodeHttpAddress();
/**
* @return node's rack name if a container is assigned, otherwise null.
*/
String getNodeRackName();
/**
* @return time at which container is launched. If container is not launched
* yet, returns 0.

View File

@ -429,6 +429,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
private NodeId containerNodeId;
private String containerMgrAddress;
private String nodeHttpAddress;
private String nodeRackName;
private WrappedJvmID jvmID;
private ContainerToken containerToken;
private Resource assignedCapability;
@ -728,6 +729,19 @@ public String getNodeHttpAddress() {
}
}
/**
 * Returns the rack name recorded for the node running this attempt.
 *
 * @return the node's rack name once a container has been assigned,
 *         otherwise null
 */
@Override
public String getNodeRackName() {
  readLock.lock();
  try {
    // Read the field while the read lock is held, then hand back the snapshot.
    final String rack = nodeRackName;
    return rack;
  } finally {
    readLock.unlock();
  }
}
protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
@Override
@ -1014,6 +1028,8 @@ public void transition(final TaskAttemptImpl taskAttempt,
taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
.toString();
taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
taskAttempt.nodeRackName = RackResolver.resolve(
taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor):
@ -1254,8 +1270,10 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
finishTime,
this.containerNodeId == null ? "UNKNOWN"
: this.containerNodeId.getHost(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());
@ -1268,8 +1286,10 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
finishTime,
this.containerNodeId == null ? "UNKNOWN"
: this.containerNodeId.getHost(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());

View File

@ -330,6 +330,7 @@ class MockContainerLauncher implements ContainerLauncher {
//We are running locally so set the shuffle port to -1
int shufflePort = -1;
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {

View File

@ -279,6 +279,11 @@ public long getShuffleFinishTime() {
public long getSortFinishTime() {
return 0;
}
/** Returns the fixed rack name "/default-rack" for this mock attempt. */
@Override
public String getNodeRackName() {
return "/default-rack";
}
};
}

View File

@ -689,6 +689,11 @@ public String getNodeHttpAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
/**
 * Rack lookup is not implemented here.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public String getNodeRackName() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getLaunchTime() {
return startMockTime;

View File

@ -136,6 +136,7 @@
{"name": "mapFinishTime", "type": "long"},
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "rackname", "type": "string"},
{"name": "state", "type": "string"},
{"name": "counters", "type": "JhCounters"},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},
@ -155,6 +156,7 @@
{"name": "sortFinishTime", "type": "long"},
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "rackname", "type": "string"},
{"name": "state", "type": "string"},
{"name": "counters", "type": "JhCounters"},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},

View File

@ -209,6 +209,7 @@ private void handleTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
attemptInfo.sortFinishTime = event.getSortFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.rackname = event.getRackName();
}
private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@ -221,6 +222,7 @@ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
attemptInfo.mapFinishTime = event.getMapFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.rackname = event.getRackname();
}
private void handleTaskAttemptFailedEvent(
@ -540,6 +542,7 @@ public static class TaskAttemptInfo {
int httpPort;
int shufflePort;
String hostname;
String rackname;
ContainerId containerId;
/** Create a Task Attempt Info which will store attempt level information
@ -548,7 +551,7 @@ public static class TaskAttemptInfo {
public TaskAttemptInfo() {
startTime = finishTime = shuffleFinishTime = sortFinishTime =
mapFinishTime = -1;
error = state = trackerName = hostname = "";
error = state = trackerName = hostname = rackname = "";
httpPort = -1;
shufflePort = -1;
}
@ -596,6 +599,8 @@ public void printAll() {
public String getTrackerName() { return trackerName; }
/** @return the host name */
public String getHostname() { return hostname; }
/** @return the rack name */
public String getRackname() { return rackname; }
/** @return the counters for the attempt */
public Counters getCounters() { return counters; }
/** @return the HTTP port for the tracker */

View File

@ -18,17 +18,14 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.avro.util.Utf8;
/**
* Event to record successful completion of a map attempt
@ -47,6 +44,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param mapFinishTime Finish time of the map phase
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the map executed
* @param rackName Name of the rack where the map executed
* @param state State string for the attempt
* @param counters Counters for the attempt
* @param allSplits the "splits", or a pixelated graph of various
@ -59,7 +57,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
*/
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname,
long mapFinishTime, long finishTime, String hostname, String rackName,
String state, Counters counters,
int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
@ -69,6 +67,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
datum.mapFinishTime = mapFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.rackname = new Utf8(rackName);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -107,7 +106,8 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, null,
state, counters, null);
}
@ -136,6 +136,8 @@ public TaskType getTaskType() {
public long getFinishTime() { return datum.finishTime; }
/** Get the host name */
public String getHostname() { return datum.hostname.toString(); }
/**
 * Get the rack name.
 *
 * @return the rack name stored in the event datum, or null when no rack
 *         name is present
 */
public String getRackname() {
  // The rack-less constructor overload forwards a null rack name, so the
  // value cannot be assumed present; avoid dereferencing a missing one.
  return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
/** Get the counters */

View File

@ -46,6 +46,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param sortFinishTime Finish time of the sort phase
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the attempt executed
* @param rackName Name of the rack where the attempt executed
* @param state State of the attempt
* @param counters Counters for the attempt
* @param allSplits the "splits", or a pixelated graph of various
@ -56,7 +57,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, String state, Counters counters,
String hostname, String rackName, String state, Counters counters,
int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
@ -66,6 +67,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
datum.sortFinishTime = sortFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.rackname = new Utf8(rackName);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -106,7 +108,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
shuffleFinishTime, sortFinishTime, finishTime,
hostname, state, counters, null);
hostname, null, state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@ -136,6 +138,8 @@ public TaskType getTaskType() {
public long getFinishTime() { return datum.finishTime; }
/** Get the name of the host where the attempt ran */
public String getHostname() { return datum.hostname.toString(); }
/**
 * Get the rack name of the node where the attempt ran.
 *
 * @return the rack name stored in the event datum, or null when no rack
 *         name is present
 */
public String getRackName() {
  // The rack-less constructor overload forwards a null rack name, so the
  // value cannot be assumed present; avoid dereferencing a missing one.
  return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
/** Get the counters for the attempt */

View File

@ -105,6 +105,11 @@ public String getNodeHttpAddress() {
return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort();
}
/** Returns the rack name held by {@code attemptInfo}. */
@Override
public String getNodeRackName() {
return attemptInfo.getRackname();
}
@Override
public Counters getCounters() {
return counters;

View File

@ -114,6 +114,7 @@ protected void render(Block html) {
String nodeHttpAddr = ta.getNodeHttpAddress();
String containerIdString = ta.getAssignedContainerID().toString();
String nodeIdString = ta.getAssignedContainerMgrAddress();
String nodeRackName = ta.getNodeRackName();
long attemptStartTime = ta.getLaunchTime();
long shuffleFinishTime = -1;
@ -139,10 +140,10 @@ protected void render(Block html) {
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
TD<TR<TBODY<TABLE<Hamlet>>>> td = row.td();
td.br().$title(String.valueOf(sortId))._()
. // sorting
_(taid)._().td(ta.getState().toString()).td()
.a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
td.br().$title(String.valueOf(sortId))._(). // sorting
_(taid)._().td(ta.getState().toString()).td().a(".nodelink",
url("http://", nodeHttpAddr),
nodeRackName + "/" + nodeHttpAddr);
td._(" ").a(".logslink",
url("logs", nodeIdString, containerIdString, taid, app.getJob()
.getUserName()), "logs");

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@ -28,13 +30,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -47,18 +51,31 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import org.junit.Test;
public class TestJobHistoryParsing {
private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
/**
 * Test-only topology mapping that places every host on the rack
 * "MyRackName".
 */
public static class MyResolver implements DNSToSwitchMapping {
  /**
   * Resolves each requested name to the same fixed rack.
   *
   * @param names host names or addresses to resolve; null is treated as empty
   * @return a list holding one "MyRackName" entry per element of names
   */
  @Override
  public List<String> resolve(List<String> names) {
    // Keep the result the same length as the request so each answer lines
    // up with the name it belongs to.
    int count = (names == null) ? 0 : names.size();
    String[] racks = new String[count];
    Arrays.fill(racks, "MyRackName");
    return Arrays.asList(racks);
  }
}
@Test
public void testHistoryParsing() throws Exception {
Configuration conf = new Configuration();
long amStartTimeEst = System.currentTimeMillis();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
true);
app.submit(conf);
@ -107,7 +124,8 @@ public void testHistoryParsing() throws Exception {
jobInfo.getFinishedReduces());
Assert.assertEquals("incorrect uberized ", job.isUber(),
jobInfo.getUberized());
int totalTasks = jobInfo.getAllTasks().size();
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
int totalTasks = allTasks.size();
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
// Verify aminfo
@ -125,7 +143,7 @@ public void testHistoryParsing() throws Exception {
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
// Assert at taskAttempt level
for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
for (TaskInfo taskInfo : allTasks.values()) {
int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
Assert
.assertEquals("total number of task attempts ", 1, taskAttemptCount);
@ -138,7 +156,7 @@ public void testHistoryParsing() throws Exception {
// Deep compare Job and JobInfo
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = jobInfo.getAllTasks().get(
TaskInfo taskInfo = allTasks.get(
TypeConverter.fromYarn(task.getID()));
Assert.assertNotNull("TaskInfo not found", taskInfo);
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
@ -147,6 +165,10 @@ public void testHistoryParsing() throws Exception {
Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
Assert.assertEquals("Incorrect shuffle port for task attempt",
taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
// Verify rack-name
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
.getRackname(), "MyRackName");
}
}

View File

@ -2685,7 +2685,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getMapFinishTime(),
status.getFinishTime(), trackerHostname,
status.getFinishTime(), trackerHostname, null,
status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
@ -2698,7 +2698,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getShuffleFinishTime(),
status.getSortFinishTime(), status.getFinishTime(),
trackerHostname, status.getStateString(),
trackerHostname, null, status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
);

View File

@ -1323,13 +1323,11 @@ private void processMapAttemptLine(ParsedLine line) {
int distance = Integer.MAX_VALUE;
if (hostName != null) {
attempt.setHostName(hostName);
ParsedHost host = null;
host = getAndRecordParsedHost(hostName);
ParsedHost host = getAndRecordParsedHost(hostName);
if (host != null) {
attempt.setHostName(host.getNodeName(), host.getRackName());
attempt.setLocation(host.makeLoggedLocation());
}
@ -1492,8 +1490,10 @@ private void processReduceAttemptLine(ParsedLine line) {
failedReduceAttemptTimes.enter(runtime);
}
}
if (hostName != null) {
attempt.setHostName(hostName);
ParsedHost host = getAndRecordParsedHost(hostName);
if (host != null) {
attempt.setHostName(host.getNodeName(), host.getRackName());
}
if (attemptID != null) {

View File

@ -522,7 +522,8 @@ private void processReduceAttemptFinishedEvent(
return;
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
attempt.setHostName(event.getHostname());
attempt.setHostName(event.getHostname(), event.getRackName());
// XXX There may be redundant location info available in the event.
// We might consider extracting it from this event. Currently this
// is redundant, but making this will add future-proofing.
@ -545,7 +546,8 @@ private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
return;
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
attempt.setHostName(event.getHostname());
attempt.setHostName(event.getHostname(), event.getRackname());
// XXX There may be redundant location info available in the event.
// We might consider extracting it from this event. Currently this
// is redundant, but making this will add future-proofing.

View File

@ -49,6 +49,7 @@ public class LoggedTaskAttempt implements DeepCompare {
long startTime = -1L;
long finishTime = -1L;
String hostName;
String rackName;
long hdfsBytesRead = -1L;
long hdfsBytesWritten = -1L;
@ -328,8 +329,9 @@ public String getHostName() {
return hostName;
}
void setHostName(String hostName) {
this.hostName = hostName == null ? null : hostName.intern();
/**
 * Records where the attempt ran as a single string of the form
 * {@code hostName + "/" + rackName}, built from the interned arguments.
 *
 * @param hostName name of the node; if null the stored value is cleared
 * @param rackName name of the node's rack; if null the stored value is cleared
 */
void setHostName(String hostName, String rackName) {
  // Check the rackName argument, not the rackName field: the field is not
  // assigned by this method, so testing it would discard every host name
  // and would let a null argument reach rackName.intern().
  if (hostName == null || rackName == null) {
    this.hostName = null;
    return;
  }
  this.hostName = hostName.intern() + "/" + rackName.intern();
}
public long getHdfsBytesRead() {

View File

@ -73,7 +73,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
that.originalTaskType, status,
Long.parseLong(finishTime),
Long.parseLong(finishTime),
hostName, state, maybeParseCounters(counters),
hostName, null, state, maybeParseCounters(counters),
null);
}
}

View File

@ -77,7 +77,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
Long.parseLong(shuffleFinish),
Long.parseLong(sortFinish),
Long.parseLong(finishTime),
hostName,
hostName, null,
state, maybeParseCounters(counters),
null);
}