MAPREDUCE-3450. NM port info no longer available in JobHistory. (Siddharth Seth via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1208327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-11-30 08:43:00 +00:00
parent 68ff73c802
commit 94bf0dacba
16 changed files with 63 additions and 28 deletions

View File

@ -197,6 +197,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3433. Finding counters by legacy group name returns empty
counters. (tomwhite)
MAPREDUCE-3450. NM port info no longer available in JobHistory.
(Siddharth Seth via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -922,8 +922,11 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
taskAttempt.containerMgrAddress == null ? "UNKNOWN"
: taskAttempt.containerMgrAddress, StringUtils.join(
taskAttempt.containerNodeId == null ? "UNKNOWN"
: taskAttempt.containerNodeId.getHost(),
taskAttempt.containerNodeId == null ? -1
: taskAttempt.containerNodeId.getPort(),
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
.getProgressSplitBlock().burst());
return tauce;
@ -1273,6 +1276,7 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
finishTime,
this.containerNodeId == null ? "UNKNOWN"
: this.containerNodeId.getHost(),
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
@ -1288,7 +1292,8 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
this.reportedStatus.sortFinishTime,
finishTime,
this.containerNodeId == null ? "UNKNOWN"
: this.containerNodeId.getHost(),
: this.containerNodeId.getHost(),
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),

View File

@ -136,6 +136,7 @@
{"name": "mapFinishTime", "type": "long"},
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "port", "type": "int"},
{"name": "rackname", "type": "string"},
{"name": "state", "type": "string"},
{"name": "counters", "type": "JhCounters"},
@ -156,6 +157,7 @@
{"name": "sortFinishTime", "type": "long"},
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "port", "type": "int"},
{"name": "rackname", "type": "string"},
{"name": "state", "type": "string"},
{"name": "counters", "type": "JhCounters"},
@ -199,6 +201,7 @@
{"name": "attemptId", "type": "string"},
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "port", "type": "int"},
{"name": "status", "type": "string"},
{"name": "error", "type": "string"},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},

View File

@ -209,6 +209,7 @@ private void handleTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
attemptInfo.sortFinishTime = event.getSortFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackName();
}
@ -222,6 +223,7 @@ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
attemptInfo.mapFinishTime = event.getMapFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackname();
}
@ -234,6 +236,7 @@ private void handleTaskAttemptFailedEvent(
attemptInfo.error = event.getError();
attemptInfo.status = event.getTaskStatus();
attemptInfo.hostname = event.getHostname();
attemptInfo.port = event.getPort();
attemptInfo.shuffleFinishTime = event.getFinishTime();
attemptInfo.sortFinishTime = event.getFinishTime();
attemptInfo.mapFinishTime = event.getFinishTime();
@ -542,6 +545,7 @@ public static class TaskAttemptInfo {
int httpPort;
int shufflePort;
String hostname;
int port;
String rackname;
ContainerId containerId;
@ -552,6 +556,7 @@ public TaskAttemptInfo() {
startTime = finishTime = shuffleFinishTime = sortFinishTime =
mapFinishTime = -1;
error = state = trackerName = hostname = rackname = "";
port = -1;
httpPort = -1;
shufflePort = -1;
}
@ -599,6 +604,8 @@ public void printAll() {
public String getTrackerName() { return trackerName; }
/** @return the host name */
public String getHostname() { return hostname; }
/** @return the port */
public int getPort() { return port; }
/** @return the rack name */
public String getRackname() { return rackname; }
/** @return the counters for the attempt */

View File

@ -44,6 +44,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param mapFinishTime Finish time of the map phase
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the map executed
* @param port RPC port for the tracker host.
* @param rackName Name of the rack where the map executed
* @param state State string for the attempt
* @param counters Counters for the attempt
@ -57,9 +58,8 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
*/
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname, String rackName,
String state, Counters counters,
int[][] allSplits) {
long mapFinishTime, long finishTime, String hostname, int port,
String rackName, String state, Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
@ -67,6 +67,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
datum.mapFinishTime = mapFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
datum.rackname = new Utf8(rackName);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -106,7 +107,7 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, "",
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
state, counters, null);
}
@ -136,6 +137,8 @@ public TaskType getTaskType() {
public long getFinishTime() { return datum.finishTime; }
/** Get the host name */
public String getHostname() { return datum.hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
/** Get the rack name */
public String getRackname() { return datum.rackname.toString(); }
/** Get the state string */

View File

@ -46,6 +46,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param sortFinishTime Finish time of the sort phase
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the attempt executed
* @param port RPC port for the tracker host.
* @param rackName Name of the rack where the attempt executed
* @param state State of the attempt
* @param counters Counters for the attempt
@ -57,8 +58,8 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, String rackName, String state, Counters counters,
int[][] allSplits) {
String hostname, int port, String rackName, String state,
Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
@ -67,6 +68,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
datum.sortFinishTime = sortFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
datum.rackname = new Utf8(rackName);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -108,7 +110,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
shuffleFinishTime, sortFinishTime, finishTime,
hostname, "", state, counters, null);
hostname, -1, "", state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@ -138,6 +140,8 @@ public TaskType getTaskType() {
public long getFinishTime() { return datum.finishTime; }
/** Get the name of the host where the attempt ran */
public String getHostname() { return datum.hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
/** Get the rack name of the node where the attempt ran */
public String getRackName() { return datum.rackname.toString(); }
/** Get the state string */

View File

@ -46,6 +46,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* @param status Status of the attempt
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the attempt executed
* @param port rpc port for for the tracker
* @param error Error string
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
@ -55,13 +56,14 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, String error,
String hostname, int port, String error,
int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.taskType = new Utf8(taskType.name());
datum.attemptId = new Utf8(id.toString());
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
datum.error = new Utf8(error);
datum.status = new Utf8(status);
@ -97,7 +99,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, String error) {
this(id, taskType, status, finishTime, hostname, error, null);
this(id, taskType, status, finishTime, hostname, -1, error, null);
}
TaskAttemptUnsuccessfulCompletionEvent() {}
@ -121,6 +123,8 @@ public TaskAttemptID getTaskAttemptId() {
public long getFinishTime() { return datum.finishTime; }
/** Get the name of the host where the attempt executed */
public String getHostname() { return datum.hostname.toString(); }
/** Get the rpc port for the host where the attempt executed */
public int getPort() { return datum.port; }
/** Get the error string */
public String getError() { return datum.error.toString(); }
/** Get the task status */

View File

@ -80,12 +80,11 @@ public class CompletedTaskAttempt implements TaskAttempt {
report.setStateString(attemptInfo.getState());
report.setCounters(getCounters());
report.setContainerId(attemptInfo.getContainerId());
String []hostSplits = attemptInfo.getHostname().split(":");
if (hostSplits.length != 2) {
if (attemptInfo.getHostname() == null) {
report.setNodeManagerHost("UNKNOWN");
} else {
report.setNodeManagerHost(hostSplits[0]);
report.setNodeManagerPort(Integer.parseInt(hostSplits[1]));
report.setNodeManagerHost(attemptInfo.getHostname());
report.setNodeManagerPort(attemptInfo.getPort());
}
report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
@ -97,7 +96,7 @@ public ContainerId getAssignedContainerID() {
@Override
public String getAssignedContainerMgrAddress() {
return attemptInfo.getHostname();
return attemptInfo.getHostname() + ":" + attemptInfo.getPort();
}
@Override

View File

@ -165,6 +165,9 @@ private void verifyAttempt(TaskAttempt attempt) {
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid));
//Verify complete contianerManagerAddress
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT,
attempt.getAssignedContainerMgrAddress());
}
static class MRAppWithHistory extends MRApp {

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@ -64,13 +65,14 @@ public class TestJobHistoryParsing {
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{"MyRackName"});
return Arrays.asList(new String[]{"/MyRackName"});
}
}
@Test
public void testHistoryParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@ -165,10 +167,12 @@ public void testHistoryParsing() throws Exception {
Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
Assert.assertEquals("Incorrect shuffle port for task attempt",
taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
// Verify rack-name
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
.getRackname(), "MyRackName");
.getRackname(), "/MyRackName");
}
}

View File

@ -2685,7 +2685,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getMapFinishTime(),
status.getFinishTime(), trackerHostname, "",
status.getFinishTime(), trackerHostname, -1, "",
status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
@ -2698,7 +2698,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getShuffleFinishTime(),
status.getSortFinishTime(), status.getFinishTime(),
trackerHostname, "", status.getStateString(),
trackerHostname, -1, "", status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
);
@ -3208,7 +3208,7 @@ private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
(taskid,
taskType, taskStatus.getRunState().toString(),
finishTime,
taskTrackerHostName, diagInfo,
taskTrackerHostName, -1, diagInfo,
splits.burst());
jobHistory.logEvent(tue, taskid.getJobID());

View File

@ -83,7 +83,7 @@ private static void testFailedKilledEventsForTypes(EventType expected,
for (TaskType t : types) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent
(id, t, state, 0L, "", "", NULL_SPLITS_ARRAY);
(id, t, state, 0L, "", -1, "", NULL_SPLITS_ARRAY);
assertEquals(expected, tauce.getEventType());
}
}

View File

@ -938,12 +938,12 @@ public void testTopologyBuilder() throws Exception {
(TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
TaskType.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
"MACHINE_EXPLODED", splits));
-1, "MACHINE_EXPLODED", splits));
subject.process(new TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
TaskType.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
"MACHINE_EXPLODED", splits));
-1, "MACHINE_EXPLODED", splits));
subject.process(new TaskStartedEvent(TaskID
.forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
.valueOf("MAP"),

View File

@ -73,7 +73,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
that.originalTaskType, status,
Long.parseLong(finishTime),
Long.parseLong(finishTime),
hostName, null, state, maybeParseCounters(counters),
hostName, -1, null, state, maybeParseCounters(counters),
null);
}
}

View File

@ -77,7 +77,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
Long.parseLong(shuffleFinish),
Long.parseLong(sortFinish),
Long.parseLong(finishTime),
hostName, null,
hostName, -1, null,
state, maybeParseCounters(counters),
null);
}

View File

@ -141,7 +141,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
return new TaskAttemptUnsuccessfulCompletionEvent
(taskAttemptID,
that.originalTaskType, status, Long.parseLong(finishTime),
hostName, error, null);
hostName, -1, error, null);
}
return null;