MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. Contributed by Billie Rinaldi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-04-06 05:30:56 +00:00
parent a9d515aed8
commit 7f13207ed1
7 changed files with 62 additions and 5 deletions

View File

@ -166,6 +166,9 @@ Release 2.0.5-alpha - UNRELEASED
IMPROVEMENTS
MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
acmurthy)
OPTIMIZATIONS
BUG FIXES

View File

@ -1305,7 +1305,8 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf));
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?

View File

@ -114,6 +114,7 @@ public void testJobNoTasks() {
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
AsyncDispatcher dispatcher = new AsyncDispatcher();
@ -126,7 +127,8 @@ public void testJobNoTasks() {
commitHandler.start();
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
"tag1,tag2");
dispatcher.register(EventType.class, jseHandler);
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
@ -706,14 +708,18 @@ private static class JobSubmittedEventHandler implements
private String workflowAdjacencies;
private String workflowTags;
private Boolean assertBoolean;
public JobSubmittedEventHandler(String workflowId, String workflowName,
String workflowNodeName, String workflowAdjacencies) {
String workflowNodeName, String workflowAdjacencies,
String workflowTags) {
this.workflowId = workflowId;
this.workflowName = workflowName;
this.workflowNodeName = workflowNodeName;
this.workflowAdjacencies = workflowAdjacencies;
this.workflowTags = workflowTags;
assertBoolean = null;
}
@ -739,6 +745,10 @@ public void handle(JobHistoryEvent jhEvent) {
setAssertValue(false);
return;
}
if (!workflowTags.equals(jsEvent.getWorkflowTags())) {
setAssertValue(false);
return;
}
setAssertValue(true);
}

View File

@ -95,7 +95,8 @@
{"name": "workflowId", "type": "string"},
{"name": "workflowName", "type": "string"},
{"name": "workflowNodeName", "type": "string"},
{"name": "workflowAdjacencies", "type": "string"}
{"name": "workflowAdjacencies", "type": "string"},
{"name": "workflowTags", "type": "string"}
]
},

View File

@ -664,6 +664,8 @@ public interface MRJobConfig {
public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
"^mapreduce\\.workflow\\.adjacency\\..+";
public static final String WORKFLOW_TAGS = "mapreduce.workflow.tags";
/**
* The maximum number of application attempts.
* It is an application-specific setting.

View File

@ -75,6 +75,31 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
String workflowId, String workflowName, String workflowNodeName,
String workflowAdjacencies) {
this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
jobQueueName, workflowId, workflowName, workflowNodeName,
workflowAdjacencies, "");
}
/**
* Create an event to record job submission
* @param id The job Id of the job
* @param jobName Name of the job
* @param userName Name of the user who submitted the job
* @param submitTime Time of submission
* @param jobConfPath Path of the Job Configuration file
* @param jobACLs The configured acls for the job.
* @param jobQueueName The job-queue to which this job was submitted
* @param workflowId The Id of the workflow
* @param workflowName The name of the workflow
* @param workflowNodeName The node name of the workflow
* @param workflowAdjacencies The adjacencies of the workflow
* @param workflowTags Comma-separated tags for the workflow
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
String workflowId, String workflowName, String workflowNodeName,
String workflowAdjacencies, String workflowTags) {
datum.jobid = new Utf8(id.toString());
datum.jobName = new Utf8(jobName);
datum.userName = new Utf8(userName);
@ -101,6 +126,9 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
if (workflowAdjacencies != null) {
datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
}
if (workflowTags != null) {
datum.workflowTags = new Utf8(workflowTags);
}
}
/** Package-private no-argument constructor; its body sets no fields. */
JobSubmittedEvent() {
}
@ -168,6 +196,13 @@ public String getWorkflowAdjacencies() {
}
return null;
}
/**
 * Get the workflow tags.
 *
 * @return the string form of {@code datum.workflowTags}, or {@code null}
 *         when that field has not been set
 */
public String getWorkflowTags() {
  return datum.workflowTags == null ? null : datum.workflowTags.toString();
}
/**
 * Get the event type.
 *
 * @return always {@code EventType.JOB_SUBMITTED}
 */
public EventType getEventType() {
  return EventType.JOB_SUBMITTED;
}

View File

@ -92,6 +92,10 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
if (workflowAdjacencies == null) {
workflowAdjacencies = "";
}
String workflowTags = line.get("WORKFLOW_TAGS");
if (workflowTags == null) {
workflowTags = "";
}
if (submitTime != null) {
@ -104,7 +108,8 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
new HashMap<JobACL, AccessControlList>();
return new JobSubmittedEvent(jobID, jobName, user,
that.originalSubmitTime, jobConf, jobACLs, jobQueueName,
workflowId, workflowName, workflowNodeName, workflowAdjacencies);
workflowId, workflowName, workflowNodeName, workflowAdjacencies,
workflowTags);
}
return null;