MAPREDUCE-4838. Add additional fields like Locality, Avataar to the JobHistory logs. Contributed by Zhijie Shen
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1439714 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df10249669
commit
c163dc2fce
@ -212,6 +212,9 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
|
MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
|
||||||
(masokan via tucu)
|
(masokan via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-4838. Add additional fields like Locality, Avataar to the
|
||||||
|
JobHistory logs. (Zhijie Shen via sseth)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
@ -1192,6 +1193,39 @@ private int getBlockSize() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
/**
|
||||||
|
* Get the workflow adjacencies from the job conf
|
||||||
|
* The string returned is of the form "key"="value" "key"="value" ...
|
||||||
|
*/
|
||||||
|
private static String getWorkflowAdjacencies(Configuration conf) {
|
||||||
|
int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
|
||||||
|
Map<String,String> adjacencies =
|
||||||
|
conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
|
||||||
|
if (adjacencies.isEmpty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
int size = 0;
|
||||||
|
for (Entry<String,String> entry : adjacencies.entrySet()) {
|
||||||
|
int keyLen = entry.getKey().length();
|
||||||
|
size += keyLen - prefixLen;
|
||||||
|
size += entry.getValue().length() + 6;
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder(size);
|
||||||
|
for (Entry<String,String> entry : adjacencies.entrySet()) {
|
||||||
|
int keyLen = entry.getKey().length();
|
||||||
|
sb.append("\"");
|
||||||
|
sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
|
||||||
|
sb.append("\"=\"");
|
||||||
|
sb.append(escapeString(entry.getValue()));
|
||||||
|
sb.append("\" ");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String escapeString(String data) {
|
||||||
|
return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
|
||||||
|
new char[] {'"', '=', '.'});
|
||||||
|
}
|
||||||
|
|
||||||
public static class InitTransition
|
public static class InitTransition
|
||||||
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
|
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
|
||||||
@ -1217,7 +1251,11 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
|
|||||||
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
|
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
|
||||||
job.appSubmitTime,
|
job.appSubmitTime,
|
||||||
job.remoteJobConfFile.toString(),
|
job.remoteJobConfFile.toString(),
|
||||||
job.jobACLs, job.queueName);
|
job.jobACLs, job.queueName,
|
||||||
|
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
|
||||||
|
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
|
||||||
|
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
|
||||||
|
getWorkflowAdjacencies(job.conf));
|
||||||
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
|
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
|
||||||
//TODO JH Verify jobACLs, UserName via UGI?
|
//TODO JH Verify jobACLs, UserName via UGI?
|
||||||
|
|
||||||
|
@ -66,6 +66,8 @@
|
|||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Locality;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
||||||
@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl implements
|
|||||||
private final org.apache.hadoop.mapred.JobID oldJobId;
|
private final org.apache.hadoop.mapred.JobID oldJobId;
|
||||||
private final TaskAttemptListener taskAttemptListener;
|
private final TaskAttemptListener taskAttemptListener;
|
||||||
private final Resource resourceCapability;
|
private final Resource resourceCapability;
|
||||||
private final String[] dataLocalHosts;
|
protected Set<String> dataLocalHosts;
|
||||||
|
protected Set<String> dataLocalRacks;
|
||||||
private final List<String> diagnostics = new ArrayList<String>();
|
private final List<String> diagnostics = new ArrayList<String>();
|
||||||
private final Lock readLock;
|
private final Lock readLock;
|
||||||
private final Lock writeLock;
|
private final Lock writeLock;
|
||||||
@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl implements
|
|||||||
private int shufflePort = -1;
|
private int shufflePort = -1;
|
||||||
private String trackerName;
|
private String trackerName;
|
||||||
private int httpPort;
|
private int httpPort;
|
||||||
|
private Locality locality;
|
||||||
|
private Avataar avataar;
|
||||||
|
|
||||||
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
|
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
|
||||||
new CleanupContainerTransition();
|
new CleanupContainerTransition();
|
||||||
@ -532,8 +537,16 @@ public TaskAttemptImpl(TaskId taskId, int i,
|
|||||||
getMemoryRequired(conf, taskId.getTaskType()));
|
getMemoryRequired(conf, taskId.getTaskType()));
|
||||||
this.resourceCapability.setVirtualCores(
|
this.resourceCapability.setVirtualCores(
|
||||||
getCpuRequired(conf, taskId.getTaskType()));
|
getCpuRequired(conf, taskId.getTaskType()));
|
||||||
this.dataLocalHosts = dataLocalHosts;
|
|
||||||
|
this.dataLocalHosts = resolveHosts(dataLocalHosts);
|
||||||
RackResolver.init(conf);
|
RackResolver.init(conf);
|
||||||
|
this.dataLocalRacks = new HashSet<String>();
|
||||||
|
for (String host : this.dataLocalHosts) {
|
||||||
|
this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
|
||||||
|
}
|
||||||
|
|
||||||
|
locality = Locality.OFF_SWITCH;
|
||||||
|
avataar = Avataar.VIRGIN;
|
||||||
|
|
||||||
// This "this leak" is okay because the retained pointer is in an
|
// This "this leak" is okay because the retained pointer is in an
|
||||||
// instance variable.
|
// instance variable.
|
||||||
@ -1032,6 +1045,23 @@ public TaskAttemptStateInternal getInternalState() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Locality getLocality() {
|
||||||
|
return locality;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLocality(Locality locality) {
|
||||||
|
this.locality = locality;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Avataar getAvataar()
|
||||||
|
{
|
||||||
|
return avataar;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAvataar(Avataar avataar) {
|
||||||
|
this.avataar = avataar;
|
||||||
|
}
|
||||||
|
|
||||||
private static TaskAttemptState getExternalState(
|
private static TaskAttemptState getExternalState(
|
||||||
TaskAttemptStateInternal smState) {
|
TaskAttemptStateInternal smState) {
|
||||||
switch (smState) {
|
switch (smState) {
|
||||||
@ -1232,25 +1262,27 @@ public void transition(TaskAttemptImpl taskAttempt,
|
|||||||
taskAttempt.attemptId,
|
taskAttempt.attemptId,
|
||||||
taskAttempt.resourceCapability));
|
taskAttempt.resourceCapability));
|
||||||
} else {
|
} else {
|
||||||
Set<String> racks = new HashSet<String>();
|
|
||||||
for (String host : taskAttempt.dataLocalHosts) {
|
|
||||||
racks.add(RackResolver.resolve(host).getNetworkLocation());
|
|
||||||
}
|
|
||||||
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
|
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
|
||||||
taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
|
taskAttempt.attemptId, taskAttempt.resourceCapability,
|
||||||
.resolveHosts(taskAttempt.dataLocalHosts), racks
|
taskAttempt.dataLocalHosts.toArray(
|
||||||
.toArray(new String[racks.size()])));
|
new String[taskAttempt.dataLocalHosts.size()]),
|
||||||
|
taskAttempt.dataLocalRacks.toArray(
|
||||||
|
new String[taskAttempt.dataLocalRacks.size()])));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String[] resolveHosts(String[] src) {
|
protected Set<String> resolveHosts(String[] src) {
|
||||||
String[] result = new String[src.length];
|
Set<String> result = new HashSet<String>();
|
||||||
for (int i = 0; i < src.length; i++) {
|
if (src != null) {
|
||||||
if (isIP(src[i])) {
|
for (int i = 0; i < src.length; i++) {
|
||||||
result[i] = resolveHost(src[i]);
|
if (src[i] == null) {
|
||||||
} else {
|
continue;
|
||||||
result[i] = src[i];
|
} else if (isIP(src[i])) {
|
||||||
|
result.add(resolveHost(src[i]));
|
||||||
|
} else {
|
||||||
|
result.add(src[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
@ -1300,6 +1332,20 @@ public void transition(final TaskAttemptImpl taskAttempt,
|
|||||||
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
|
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
|
||||||
taskAttempt.taskAttemptListener.registerPendingTask(
|
taskAttempt.taskAttemptListener.registerPendingTask(
|
||||||
taskAttempt.remoteTask, taskAttempt.jvmID);
|
taskAttempt.remoteTask, taskAttempt.jvmID);
|
||||||
|
|
||||||
|
taskAttempt.locality = Locality.OFF_SWITCH;
|
||||||
|
if (taskAttempt.dataLocalHosts.size() > 0) {
|
||||||
|
String cHost = taskAttempt.resolveHost(
|
||||||
|
taskAttempt.containerNodeId.getHost());
|
||||||
|
if (taskAttempt.dataLocalHosts.contains(cHost)) {
|
||||||
|
taskAttempt.locality = Locality.NODE_LOCAL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (taskAttempt.locality == Locality.OFF_SWITCH) {
|
||||||
|
if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
|
||||||
|
taskAttempt.locality = Locality.RACK_LOCAL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//launch the container
|
//launch the container
|
||||||
//create the container object to be launched for a given Task attempt
|
//create the container object to be launched for a given Task attempt
|
||||||
@ -1376,7 +1422,7 @@ public void transition(TaskAttemptImpl taskAttempt,
|
|||||||
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Not generating HistoryFinish event since start event not " +
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
||||||
"generated for taskAttempt: " + taskAttempt.getID());
|
"generated for taskAttempt: " + taskAttempt.getID());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1421,7 +1467,8 @@ public void transition(TaskAttemptImpl taskAttempt,
|
|||||||
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
|
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
|
||||||
taskAttempt.launchTime,
|
taskAttempt.launchTime,
|
||||||
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
|
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
|
||||||
taskAttempt.shufflePort, taskAttempt.containerID);
|
taskAttempt.shufflePort, taskAttempt.containerID,
|
||||||
|
taskAttempt.locality.toString(), taskAttempt.avataar.toString());
|
||||||
taskAttempt.eventHandler.handle
|
taskAttempt.eventHandler.handle
|
||||||
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
|
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
|
||||||
taskAttempt.eventHandler.handle
|
taskAttempt.eventHandler.handle
|
||||||
@ -1510,7 +1557,7 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|||||||
// handling failed map/reduce events.
|
// handling failed map/reduce events.
|
||||||
}else {
|
}else {
|
||||||
LOG.debug("Not generating HistoryFinish event since start event not " +
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
||||||
"generated for taskAttempt: " + taskAttempt.getID());
|
"generated for taskAttempt: " + taskAttempt.getID());
|
||||||
}
|
}
|
||||||
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
||||||
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
||||||
@ -1580,7 +1627,7 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|||||||
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
||||||
}else {
|
}else {
|
||||||
LOG.debug("Not generating HistoryFinish event since start event not " +
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
||||||
"generated for taskAttempt: " + taskAttempt.getID());
|
"generated for taskAttempt: " + taskAttempt.getID());
|
||||||
}
|
}
|
||||||
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
||||||
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
||||||
@ -1648,7 +1695,7 @@ public void transition(TaskAttemptImpl taskAttempt,
|
|||||||
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
||||||
}else {
|
}else {
|
||||||
LOG.debug("Not generating HistoryFinish event since start event not " +
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
||||||
"generated for taskAttempt: " + taskAttempt.getID());
|
"generated for taskAttempt: " + taskAttempt.getID());
|
||||||
}
|
}
|
||||||
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
|
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
|
||||||
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
||||||
|
@ -46,6 +46,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
||||||
@ -594,8 +595,9 @@ protected TaskAttempt getSuccessfulAttempt() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This is always called in the Write Lock
|
// This is always called in the Write Lock
|
||||||
private void addAndScheduleAttempt() {
|
private void addAndScheduleAttempt(Avataar avataar) {
|
||||||
TaskAttempt attempt = createAttempt();
|
TaskAttempt attempt = createAttempt();
|
||||||
|
((TaskAttemptImpl) attempt).setAvataar(avataar);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Created attempt " + attempt.getID());
|
LOG.debug("Created attempt " + attempt.getID());
|
||||||
}
|
}
|
||||||
@ -749,7 +751,7 @@ private static class InitialScheduleTransition
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(TaskImpl task, TaskEvent event) {
|
public void transition(TaskImpl task, TaskEvent event) {
|
||||||
task.addAndScheduleAttempt();
|
task.addAndScheduleAttempt(Avataar.VIRGIN);
|
||||||
task.scheduledTime = task.clock.getTime();
|
task.scheduledTime = task.clock.getTime();
|
||||||
TaskStartedEvent tse = new TaskStartedEvent(
|
TaskStartedEvent tse = new TaskStartedEvent(
|
||||||
TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
|
TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
|
||||||
@ -772,7 +774,7 @@ private static class RedundantScheduleTransition
|
|||||||
@Override
|
@Override
|
||||||
public void transition(TaskImpl task, TaskEvent event) {
|
public void transition(TaskImpl task, TaskEvent event) {
|
||||||
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
|
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
|
||||||
task.addAndScheduleAttempt();
|
task.addAndScheduleAttempt(Avataar.SPECULATIVE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -849,7 +851,7 @@ public void transition(TaskImpl task, TaskEvent event) {
|
|||||||
task.finishedAttempts.add(taskAttemptId);
|
task.finishedAttempts.add(taskAttemptId);
|
||||||
task.inProgressAttempts.remove(taskAttemptId);
|
task.inProgressAttempts.remove(taskAttemptId);
|
||||||
if (task.successfulAttempt == null) {
|
if (task.successfulAttempt == null) {
|
||||||
task.addAndScheduleAttempt();
|
task.addAndScheduleAttempt(Avataar.VIRGIN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -937,7 +939,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
|||||||
task.inProgressAttempts.remove(taskAttemptId);
|
task.inProgressAttempts.remove(taskAttemptId);
|
||||||
if (task.inProgressAttempts.size() == 0
|
if (task.inProgressAttempts.size() == 0
|
||||||
&& task.successfulAttempt == null) {
|
&& task.successfulAttempt == null) {
|
||||||
task.addAndScheduleAttempt();
|
task.addAndScheduleAttempt(Avataar.VIRGIN);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
task.handleTaskAttemptCompletion(
|
task.handleTaskAttemptCompletion(
|
||||||
@ -1053,7 +1055,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
|||||||
// from the map splitInfo. So the bad node might be sent as a location
|
// from the map splitInfo. So the bad node might be sent as a location
|
||||||
// to the RM. But the RM would ignore that just like it would ignore
|
// to the RM. But the RM would ignore that just like it would ignore
|
||||||
// currently pending container requests affinitized to bad nodes.
|
// currently pending container requests affinitized to bad nodes.
|
||||||
task.addAndScheduleAttempt();
|
task.addAndScheduleAttempt(Avataar.VIRGIN);
|
||||||
return TaskStateInternal.SCHEDULED;
|
return TaskStateInternal.SCHEDULED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,6 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.util.RackResolver;
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
|
|
||||||
|
@ -33,6 +33,9 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.JobACL;
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus.State;
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
@ -66,6 +69,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
@ -105,6 +109,13 @@ public void testJobNoTasks() {
|
|||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
||||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_ID, "testId");
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
|
||||||
|
|
||||||
|
|
||||||
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
||||||
dispatcher.init(conf);
|
dispatcher.init(conf);
|
||||||
dispatcher.start();
|
dispatcher.start();
|
||||||
@ -114,6 +125,9 @@ public void testJobNoTasks() {
|
|||||||
commitHandler.init(conf);
|
commitHandler.init(conf);
|
||||||
commitHandler.start();
|
commitHandler.start();
|
||||||
|
|
||||||
|
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
|
||||||
|
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
|
||||||
|
dispatcher.register(EventType.class, jseHandler);
|
||||||
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
||||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||||
assertJobState(job, JobStateInternal.INITED);
|
assertJobState(job, JobStateInternal.INITED);
|
||||||
@ -121,6 +135,11 @@ public void testJobNoTasks() {
|
|||||||
assertJobState(job, JobStateInternal.SUCCEEDED);
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
||||||
dispatcher.stop();
|
dispatcher.stop();
|
||||||
commitHandler.stop();
|
commitHandler.stop();
|
||||||
|
try {
|
||||||
|
Assert.assertTrue(jseHandler.getAssertValue());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Assert.fail("Workflow related attributes are not tested properly");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
@ -614,6 +633,67 @@ private static void assertJobState(JobImpl job, JobStateInternal state) {
|
|||||||
Assert.assertEquals(state, job.getInternalState());
|
Assert.assertEquals(state, job.getInternalState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class JobSubmittedEventHandler implements
|
||||||
|
EventHandler<JobHistoryEvent> {
|
||||||
|
|
||||||
|
private String workflowId;
|
||||||
|
|
||||||
|
private String workflowName;
|
||||||
|
|
||||||
|
private String workflowNodeName;
|
||||||
|
|
||||||
|
private String workflowAdjacencies;
|
||||||
|
|
||||||
|
private Boolean assertBoolean;
|
||||||
|
|
||||||
|
public JobSubmittedEventHandler(String workflowId, String workflowName,
|
||||||
|
String workflowNodeName, String workflowAdjacencies) {
|
||||||
|
this.workflowId = workflowId;
|
||||||
|
this.workflowName = workflowName;
|
||||||
|
this.workflowNodeName = workflowNodeName;
|
||||||
|
this.workflowAdjacencies = workflowAdjacencies;
|
||||||
|
assertBoolean = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(JobHistoryEvent jhEvent) {
|
||||||
|
if (jhEvent.getType() != EventType.JOB_SUBMITTED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent();
|
||||||
|
if (!workflowId.equals(jsEvent.getWorkflowId())) {
|
||||||
|
setAssertValue(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!workflowName.equals(jsEvent.getWorkflowName())) {
|
||||||
|
setAssertValue(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) {
|
||||||
|
setAssertValue(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
|
||||||
|
setAssertValue(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
setAssertValue(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void setAssertValue(Boolean bool) {
|
||||||
|
assertBoolean = bool;
|
||||||
|
notify();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean getAssertValue() throws InterruptedException {
|
||||||
|
while (assertBoolean == null) {
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
return assertBoolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private static class StubbedJob extends JobImpl {
|
private static class StubbedJob extends JobImpl {
|
||||||
//override the init transition
|
//override the init transition
|
||||||
private final InitTransition initTransition;
|
private final InitTransition initTransition;
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Locality;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
@ -157,6 +158,7 @@ public void testHostResolveAttempt() throws Exception {
|
|||||||
createMapTaskAttemptImplForTest(eventHandler, splitInfo);
|
createMapTaskAttemptImplForTest(eventHandler, splitInfo);
|
||||||
TaskAttemptImpl spyTa = spy(mockTaskAttempt);
|
TaskAttemptImpl spyTa = spy(mockTaskAttempt);
|
||||||
when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
|
when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
|
||||||
|
spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations());
|
||||||
|
|
||||||
TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
|
TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
|
||||||
rct.transition(spyTa, mockTAEvent);
|
rct.transition(spyTa, mockTAEvent);
|
||||||
@ -360,6 +362,8 @@ public void testLaunchFailedWhileKilling() throws Exception {
|
|||||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
||||||
assertFalse(eventHandler.internalError);
|
assertFalse(eventHandler.internalError);
|
||||||
|
assertEquals("Task attempt is not assigned on the local node",
|
||||||
|
Locality.NODE_LOCAL, taImpl.getLocality());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -398,7 +402,7 @@ public void testContainerCleanedWhileRunning() throws Exception {
|
|||||||
mock(Token.class), new Credentials(),
|
mock(Token.class), new Credentials(),
|
||||||
new SystemClock(), appCtx);
|
new SystemClock(), appCtx);
|
||||||
|
|
||||||
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
|
NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0);
|
||||||
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
|
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
|
||||||
Container container = mock(Container.class);
|
Container container = mock(Container.class);
|
||||||
when(container.getId()).thenReturn(contId);
|
when(container.getId()).thenReturn(contId);
|
||||||
@ -416,6 +420,8 @@ public void testContainerCleanedWhileRunning() throws Exception {
|
|||||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||||
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
||||||
eventHandler.internalError);
|
eventHandler.internalError);
|
||||||
|
assertEquals("Task attempt is not assigned on the local rack",
|
||||||
|
Locality.RACK_LOCAL, taImpl.getLocality());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -439,7 +445,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
|
|||||||
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||||
|
|
||||||
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||||
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
|
when(splits.getLocations()).thenReturn(new String[] {});
|
||||||
|
|
||||||
AppContext appCtx = mock(AppContext.class);
|
AppContext appCtx = mock(AppContext.class);
|
||||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||||
@ -475,6 +481,8 @@ public void testContainerCleanedWhileCommitting() throws Exception {
|
|||||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||||
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
||||||
eventHandler.internalError);
|
eventHandler.internalError);
|
||||||
|
assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
|
||||||
|
taImpl.getLocality());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
@ -46,10 +47,12 @@
|
|||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
@ -254,6 +257,7 @@ private void scheduleTaskAttempt(TaskId taskId) {
|
|||||||
mockTask.handle(new TaskEvent(taskId,
|
mockTask.handle(new TaskEvent(taskId,
|
||||||
TaskEventType.T_SCHEDULE));
|
TaskEventType.T_SCHEDULE));
|
||||||
assertTaskScheduledState();
|
assertTaskScheduledState();
|
||||||
|
assertTaskAttemptAvataar(Avataar.VIRGIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void killTask(TaskId taskId) {
|
private void killTask(TaskId taskId) {
|
||||||
@ -338,6 +342,19 @@ private void assertTaskKillWaitState() {
|
|||||||
private void assertTaskSucceededState() {
|
private void assertTaskSucceededState() {
|
||||||
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
|
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Avataar}
|
||||||
|
*/
|
||||||
|
private void assertTaskAttemptAvataar(Avataar avataar) {
|
||||||
|
for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
|
||||||
|
if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
|
||||||
|
+ "task attempt");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInit() {
|
public void testInit() {
|
||||||
@ -516,6 +533,9 @@ private void runSpeculativeTaskAttemptSucceeds(
|
|||||||
|
|
||||||
// The task should still be in the succeeded state
|
// The task should still be in the succeeded state
|
||||||
assertTaskSucceededState();
|
assertTaskSucceededState();
|
||||||
|
|
||||||
|
// The task should contain speculative a task attempt
|
||||||
|
assertTaskAttemptAvataar(Avataar.SPECULATIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.api.records;
|
||||||
|
|
||||||
|
public enum Avataar {
|
||||||
|
VIRGIN,
|
||||||
|
SPECULATIVE
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.api.records;
|
||||||
|
|
||||||
|
public enum Locality {
|
||||||
|
NODE_LOCAL,
|
||||||
|
RACK_LOCAL,
|
||||||
|
OFF_SWITCH
|
||||||
|
}
|
@ -91,7 +91,11 @@
|
|||||||
"values": "string"
|
"values": "string"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{"name": "jobQueueName", "type": "string"}
|
{"name": "jobQueueName", "type": "string"},
|
||||||
|
{"name": "workflowId", "type": "string"},
|
||||||
|
{"name": "workflowName", "type": "string"},
|
||||||
|
{"name": "workflowNodeName", "type": "string"},
|
||||||
|
{"name": "workflowAdjacencies", "type": "string"}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
@ -191,7 +195,9 @@
|
|||||||
{"name": "trackerName", "type": "string"},
|
{"name": "trackerName", "type": "string"},
|
||||||
{"name": "httpPort", "type": "int"},
|
{"name": "httpPort", "type": "int"},
|
||||||
{"name": "shufflePort", "type": "int"},
|
{"name": "shufflePort", "type": "int"},
|
||||||
{"name": "containerId", "type": "string"}
|
{"name": "containerId", "type": "string"},
|
||||||
|
{"name": "locality", "type": "string"},
|
||||||
|
{"name": "avataar", "type": "string"}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -647,5 +647,18 @@ public interface MRJobConfig {
|
|||||||
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
|
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
|
||||||
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
|
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public static final String WORKFLOW_ID = "mapreduce.workflow.id";
|
||||||
|
|
||||||
|
public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
|
||||||
|
|
||||||
|
public static final String WORKFLOW_NODE_NAME =
|
||||||
|
"mapreduce.workflow.node.name";
|
||||||
|
|
||||||
|
public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
|
||||||
|
"mapreduce.workflow.adjacency.";
|
||||||
|
|
||||||
|
public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
|
||||||
|
"^mapreduce\\.workflow\\.adjacency\\..+";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,29 @@ public class JobSubmittedEvent implements HistoryEvent {
|
|||||||
public JobSubmittedEvent(JobID id, String jobName, String userName,
|
public JobSubmittedEvent(JobID id, String jobName, String userName,
|
||||||
long submitTime, String jobConfPath,
|
long submitTime, String jobConfPath,
|
||||||
Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
|
Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
|
||||||
|
this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
|
||||||
|
jobQueueName, "", "", "", "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an event to record job submission
|
||||||
|
* @param id The job Id of the job
|
||||||
|
* @param jobName Name of the job
|
||||||
|
* @param userName Name of the user who submitted the job
|
||||||
|
* @param submitTime Time of submission
|
||||||
|
* @param jobConfPath Path of the Job Configuration file
|
||||||
|
* @param jobACLs The configured acls for the job.
|
||||||
|
* @param jobQueueName The job-queue to which this job was submitted to
|
||||||
|
* @param workflowId The Id of the workflow
|
||||||
|
* @param workflowName The name of the workflow
|
||||||
|
* @param workflowNodeName The node name of the workflow
|
||||||
|
* @param workflowAdjacencies The adjacencies of the workflow
|
||||||
|
*/
|
||||||
|
public JobSubmittedEvent(JobID id, String jobName, String userName,
|
||||||
|
long submitTime, String jobConfPath,
|
||||||
|
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
|
||||||
|
String workflowId, String workflowName, String workflowNodeName,
|
||||||
|
String workflowAdjacencies) {
|
||||||
datum.jobid = new Utf8(id.toString());
|
datum.jobid = new Utf8(id.toString());
|
||||||
datum.jobName = new Utf8(jobName);
|
datum.jobName = new Utf8(jobName);
|
||||||
datum.userName = new Utf8(userName);
|
datum.userName = new Utf8(userName);
|
||||||
@ -66,6 +89,18 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
|
|||||||
if (jobQueueName != null) {
|
if (jobQueueName != null) {
|
||||||
datum.jobQueueName = new Utf8(jobQueueName);
|
datum.jobQueueName = new Utf8(jobQueueName);
|
||||||
}
|
}
|
||||||
|
if (workflowId != null) {
|
||||||
|
datum.workflowId = new Utf8(workflowId);
|
||||||
|
}
|
||||||
|
if (workflowName != null) {
|
||||||
|
datum.workflowName = new Utf8(workflowName);
|
||||||
|
}
|
||||||
|
if (workflowNodeName != null) {
|
||||||
|
datum.workflowNodeName = new Utf8(workflowNodeName);
|
||||||
|
}
|
||||||
|
if (workflowAdjacencies != null) {
|
||||||
|
datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
JobSubmittedEvent() {}
|
JobSubmittedEvent() {}
|
||||||
@ -105,6 +140,34 @@ public Map<JobACL, AccessControlList> getJobAcls() {
|
|||||||
}
|
}
|
||||||
return jobAcls;
|
return jobAcls;
|
||||||
}
|
}
|
||||||
|
/** Get the id of the workflow */
|
||||||
|
public String getWorkflowId() {
|
||||||
|
if (datum.workflowId != null) {
|
||||||
|
return datum.workflowId.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
/** Get the name of the workflow */
|
||||||
|
public String getWorkflowName() {
|
||||||
|
if (datum.workflowName != null) {
|
||||||
|
return datum.workflowName.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
/** Get the node name of the workflow */
|
||||||
|
public String getWorkflowNodeName() {
|
||||||
|
if (datum.workflowNodeName != null) {
|
||||||
|
return datum.workflowNodeName.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
/** Get the adjacencies of the workflow */
|
||||||
|
public String getWorkflowAdjacencies() {
|
||||||
|
if (datum.workflowAdjacencies != null) {
|
||||||
|
return datum.workflowAdjacencies.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
/** Get the event type */
|
/** Get the event type */
|
||||||
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
|
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
|
||||||
|
|
||||||
|
@ -46,10 +46,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
|
|||||||
* @param httpPort The port number of the tracker
|
* @param httpPort The port number of the tracker
|
||||||
* @param shufflePort The shuffle port number of the container
|
* @param shufflePort The shuffle port number of the container
|
||||||
* @param containerId The containerId for the task attempt.
|
* @param containerId The containerId for the task attempt.
|
||||||
|
* @param locality The locality of the task attempt
|
||||||
|
* @param avataar The avataar of the task attempt
|
||||||
*/
|
*/
|
||||||
public TaskAttemptStartedEvent( TaskAttemptID attemptId,
|
public TaskAttemptStartedEvent( TaskAttemptID attemptId,
|
||||||
TaskType taskType, long startTime, String trackerName,
|
TaskType taskType, long startTime, String trackerName,
|
||||||
int httpPort, int shufflePort, ContainerId containerId) {
|
int httpPort, int shufflePort, ContainerId containerId,
|
||||||
|
String locality, String avataar) {
|
||||||
datum.attemptId = new Utf8(attemptId.toString());
|
datum.attemptId = new Utf8(attemptId.toString());
|
||||||
datum.taskid = new Utf8(attemptId.getTaskID().toString());
|
datum.taskid = new Utf8(attemptId.getTaskID().toString());
|
||||||
datum.startTime = startTime;
|
datum.startTime = startTime;
|
||||||
@ -58,14 +61,21 @@ public TaskAttemptStartedEvent( TaskAttemptID attemptId,
|
|||||||
datum.httpPort = httpPort;
|
datum.httpPort = httpPort;
|
||||||
datum.shufflePort = shufflePort;
|
datum.shufflePort = shufflePort;
|
||||||
datum.containerId = new Utf8(containerId.toString());
|
datum.containerId = new Utf8(containerId.toString());
|
||||||
|
if (locality != null) {
|
||||||
|
datum.locality = new Utf8(locality);
|
||||||
|
}
|
||||||
|
if (avataar != null) {
|
||||||
|
datum.avataar = new Utf8(avataar);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Remove after MrV1 is removed.
|
// TODO Remove after MrV1 is removed.
|
||||||
// Using a dummy containerId to prevent jobHistory parse failures.
|
// Using a dummy containerId to prevent jobHistory parse failures.
|
||||||
public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
|
public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
|
||||||
long startTime, String trackerName, int httpPort, int shufflePort) {
|
long startTime, String trackerName, int httpPort, int shufflePort,
|
||||||
|
String locality, String avataar) {
|
||||||
this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
|
this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
|
||||||
ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
|
ConverterUtils.toContainerId("container_-1_-1_-1_-1"), locality, avataar);
|
||||||
}
|
}
|
||||||
|
|
||||||
TaskAttemptStartedEvent() {}
|
TaskAttemptStartedEvent() {}
|
||||||
@ -105,4 +115,19 @@ public EventType getEventType() {
|
|||||||
public ContainerId getContainerId() {
|
public ContainerId getContainerId() {
|
||||||
return ConverterUtils.toContainerId(datum.containerId.toString());
|
return ConverterUtils.toContainerId(datum.containerId.toString());
|
||||||
}
|
}
|
||||||
|
/** Get the locality */
|
||||||
|
public String getLocality() {
|
||||||
|
if (datum.locality != null) {
|
||||||
|
return datum.locality.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
/** Get the avataar */
|
||||||
|
public String getAvataar() {
|
||||||
|
if (datum.avataar != null) {
|
||||||
|
return datum.avataar.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,11 @@ public Hadoop20JHParser(InputStream input) throws IOException {
|
|||||||
reader = new LineReader(input);
|
reader = new LineReader(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Hadoop20JHParser(LineReader reader) throws IOException {
|
||||||
|
super();
|
||||||
|
this.reader = reader;
|
||||||
|
}
|
||||||
|
|
||||||
Map<String, HistoryEventEmitter> liveEmitters =
|
Map<String, HistoryEventEmitter> liveEmitters =
|
||||||
new HashMap<String, HistoryEventEmitter>();
|
new HashMap<String, HistoryEventEmitter>();
|
||||||
Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
|
Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
|
||||||
|
@ -76,6 +76,23 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
|
|||||||
}
|
}
|
||||||
String jobName = line.get("JOBNAME");
|
String jobName = line.get("JOBNAME");
|
||||||
String jobQueueName = line.get("JOB_QUEUE");// could be null
|
String jobQueueName = line.get("JOB_QUEUE");// could be null
|
||||||
|
String workflowId = line.get("WORKFLOW_ID");
|
||||||
|
if (workflowId == null) {
|
||||||
|
workflowId = "";
|
||||||
|
}
|
||||||
|
String workflowName = line.get("WORKFLOW_NAME");
|
||||||
|
if (workflowName == null) {
|
||||||
|
workflowName = "";
|
||||||
|
}
|
||||||
|
String workflowNodeName = line.get("WORKFLOW_NODE_NAME");
|
||||||
|
if (workflowNodeName == null) {
|
||||||
|
workflowNodeName = "";
|
||||||
|
}
|
||||||
|
String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES");
|
||||||
|
if (workflowAdjacencies == null) {
|
||||||
|
workflowAdjacencies = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (submitTime != null) {
|
if (submitTime != null) {
|
||||||
Job20LineHistoryEventEmitter that =
|
Job20LineHistoryEventEmitter that =
|
||||||
@ -86,7 +103,8 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
|
|||||||
Map<JobACL, AccessControlList> jobACLs =
|
Map<JobACL, AccessControlList> jobACLs =
|
||||||
new HashMap<JobACL, AccessControlList>();
|
new HashMap<JobACL, AccessControlList>();
|
||||||
return new JobSubmittedEvent(jobID, jobName, user,
|
return new JobSubmittedEvent(jobID, jobName, user,
|
||||||
that.originalSubmitTime, jobConf, jobACLs, jobQueueName);
|
that.originalSubmitTime, jobConf, jobACLs, jobQueueName,
|
||||||
|
workflowId, workflowName, workflowNodeName, workflowAdjacencies);
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
@ -65,6 +65,14 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
|
|||||||
String taskType = line.get("TASK_TYPE");
|
String taskType = line.get("TASK_TYPE");
|
||||||
String trackerName = line.get("TRACKER_NAME");
|
String trackerName = line.get("TRACKER_NAME");
|
||||||
String httpPort = line.get("HTTP_PORT");
|
String httpPort = line.get("HTTP_PORT");
|
||||||
|
String locality = line.get("LOCALITY");
|
||||||
|
if (locality == null) {
|
||||||
|
locality = "";
|
||||||
|
}
|
||||||
|
String avataar = line.get("AVATAAR");
|
||||||
|
if (avataar == null) {
|
||||||
|
avataar = "";
|
||||||
|
}
|
||||||
|
|
||||||
if (startTime != null && taskType != null) {
|
if (startTime != null && taskType != null) {
|
||||||
TaskAttempt20LineEventEmitter that =
|
TaskAttempt20LineEventEmitter that =
|
||||||
@ -79,7 +87,8 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
|
|||||||
.parseInt(httpPort);
|
.parseInt(httpPort);
|
||||||
|
|
||||||
return new TaskAttemptStartedEvent(taskAttemptID,
|
return new TaskAttemptStartedEvent(taskAttemptID,
|
||||||
that.originalTaskType, that.originalStartTime, trackerName, port, -1);
|
that.originalTaskType, that.originalStartTime, trackerName, port, -1,
|
||||||
|
locality, avataar);
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
Loading…
Reference in New Issue
Block a user