MAPREDUCE-5732. Report proper queue when job has been automatically placed (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1562641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-29 23:51:40 +00:00
parent ed551ff3c6
commit db80705719
17 changed files with 140 additions and 12 deletions

View File

@ -209,6 +209,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza) YARN resource model (Sandy Ryza)
MAPREDUCE-5732. Report proper queue when job has been automatically placed
(Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@ -525,6 +525,12 @@ protected void handleEvent(JobHistoryEvent event) {
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent(); JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime()); mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
} }
if (event.getHistoryEvent().getEventType() == EventType.JOB_QUEUE_CHANGED) {
JobQueueChangeEvent jQueueEvent =
(JobQueueChangeEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setQueueName(jQueueEvent.getJobQueueName());
}
// If this is JobFinishedEvent, close the writer and setup the job-index // If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {

View File

@ -39,7 +39,7 @@
/** /**
* Main interface to interact with the job. Provides only getters. * Main interface to interact with the job.
*/ */
public interface Job { public interface Job {
@ -98,4 +98,6 @@ public interface Job {
List<AMInfo> getAMInfos(); List<AMInfo> getAMInfos();
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation); boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
public void setQueueName(String queueName);
} }

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
@ -181,7 +182,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final MRAppMetrics metrics; private final MRAppMetrics metrics;
private final String userName; private final String userName;
private final String queueName; private String queueName;
private final long appSubmitTime; private final long appSubmitTime;
private final AppContext appContext; private final AppContext appContext;
@ -1123,6 +1124,13 @@ public String getQueueName() {
return queueName; return queueName;
} }
@Override
public void setQueueName(String queueName) {
this.queueName = queueName;
JobQueueChangeEvent jqce = new JobQueueChangeEvent(oldJobId, queueName);
eventHandler.handle(new JobHistoryEvent(jobId, jqce));
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile() * @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()

View File

@ -109,11 +109,11 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
scheduler= createSchedulerProxy(); scheduler= createSchedulerProxy();
register();
startAllocatorThread();
JobID id = TypeConverter.fromYarn(this.applicationId); JobID id = TypeConverter.fromYarn(this.applicationId);
JobId jobId = TypeConverter.toYarn(id); JobId jobId = TypeConverter.toYarn(id);
job = context.getJob(jobId); job = context.getJob(jobId);
register();
startAllocatorThread();
super.serviceStart(); super.serviceStart();
} }
@ -161,6 +161,9 @@ protected void register() {
} }
this.applicationACLs = response.getApplicationACLs(); this.applicationACLs = response.getApplicationACLs();
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory()); LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
String queue = response.getQueue();
LOG.info("queue: " + queue);
job.setQueueName(queue);
} catch (Exception are) { } catch (Exception are) {
LOG.error("Exception while registering", are); LOG.error("Exception while registering", are);
throw new YarnRuntimeException(are); throw new YarnRuntimeException(are);

View File

@ -81,6 +81,15 @@ public void testJobPriorityChange() throws Exception {
assertEquals(test.getPriority(), JobPriority.LOW); assertEquals(test.getPriority(), JobPriority.LOW);
} }
@Test(timeout = 10000)
public void testJobQueueChange() throws Exception {
org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1);
JobQueueChangeEvent test = new JobQueueChangeEvent(jid,
"newqueue");
assertEquals(test.getJobId().toString(), jid.toString());
assertEquals(test.getJobQueueName(), "newqueue");
}
/** /**
* simple test TaskUpdatedEvent and TaskUpdated * simple test TaskUpdatedEvent and TaskUpdated

View File

@ -117,6 +117,9 @@ public class MRApp extends MRAppMaster {
private File testWorkDir; private File testWorkDir;
private Path testAbsPath; private Path testAbsPath;
private ClusterInfo clusterInfo; private ClusterInfo clusterInfo;
// Queue to pretend the RM assigned us
private String assignedQueue;
public static String NM_HOST = "localhost"; public static String NM_HOST = "localhost";
public static int NM_PORT = 1234; public static int NM_PORT = 1234;
@ -133,7 +136,7 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, Clock clock) { boolean cleanOnStart, Clock clock) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock); this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock, null);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -146,6 +149,12 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) { boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1); this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, String assignedQueue) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1,
new SystemClock(), assignedQueue);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, boolean unregistered) { boolean cleanOnStart, boolean unregistered) {
@ -178,7 +187,7 @@ private static ContainerId getContainerId(ApplicationId applicationId,
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock()); new SystemClock(), null);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -191,33 +200,34 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) { boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId( this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName, applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, unregistered); cleanOnStart, startCount, clock, unregistered, null);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) { boolean cleanOnStart, int startCount, Clock clock, String assignedQueue) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId( this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName, applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, true); cleanOnStart, startCount, clock, true, assignedQueue);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) { boolean cleanOnStart, int startCount, boolean unregistered) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), unregistered); cleanOnStart, startCount, new SystemClock(), unregistered, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), true); cleanOnStart, startCount, new SystemClock(), true, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) { boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
String assignedQueue) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
this.testWorkDir = new File("target", testName); this.testWorkDir = new File("target", testName);
@ -239,6 +249,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
// If safeToReportTerminationToUser is set to true, we can verify whether // If safeToReportTerminationToUser is set to true, we can verify whether
// the job can reaches the final state when MRAppMaster shuts down. // the job can reaches the final state when MRAppMaster shuts down.
this.successfullyUnregistered.set(unregistered); this.successfullyUnregistered.set(unregistered);
this.assignedQueue = assignedQueue;
} }
@Override @Override
@ -285,6 +296,9 @@ public Job submit(Configuration conf, boolean mapSpeculative,
start(); start();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
Job job = getContext().getAllJobs().values().iterator().next(); Job job = getContext().getAllJobs().values().iterator().next();
if (assignedQueue != null) {
job.setQueueName(assignedQueue);
}
// Write job.xml // Write job.xml
String jobFile = MRApps.getJobFile(conf, user, String jobFile = MRApps.getJobFile(conf, user,

View File

@ -39,6 +39,7 @@ public class MockAppContext implements AppContext {
final Map<JobId, Job> jobs; final Map<JobId, Job> jobs;
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
Set<String> blacklistedNodes; Set<String> blacklistedNodes;
String queue;
public MockAppContext(int appid) { public MockAppContext(int appid) {
appID = MockJobs.newAppID(appid); appID = MockJobs.newAppID(appid);

View File

@ -629,6 +629,11 @@ public Configuration loadConfFile() throws IOException {
jobConf.addResource(fc.open(configFile), configFile.toString()); jobConf.addResource(fc.open(configFile), configFile.toString());
return jobConf; return jobConf;
} }
@Override
public void setQueueName(String queueName) {
// do nothing
}
}; };
} }

View File

@ -505,6 +505,11 @@ public List<AMInfo> getAMInfos() {
public Configuration loadConfFile() { public Configuration loadConfFile() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void setQueueName(String queueName) {
// do nothing
}
} }
/* /*

View File

@ -122,6 +122,13 @@
] ]
}, },
{"type": "record", "name": "JobQueueChange",
"fields": [
{"name": "jobid", "type": "string"},
{"name": "jobQueueName", "type": "string"}
]
},
{"type": "record", "name": "JobUnsuccessfulCompletion", {"type": "record", "name": "JobUnsuccessfulCompletion",
"fields": [ "fields": [
{"name": "jobid", "type": "string"}, {"name": "jobid", "type": "string"},
@ -267,6 +274,7 @@
"JOB_FINISHED", "JOB_FINISHED",
"JOB_PRIORITY_CHANGED", "JOB_PRIORITY_CHANGED",
"JOB_STATUS_CHANGED", "JOB_STATUS_CHANGED",
"JOB_QUEUE_CHANGED",
"JOB_FAILED", "JOB_FAILED",
"JOB_KILLED", "JOB_KILLED",
"JOB_ERROR", "JOB_ERROR",
@ -306,6 +314,7 @@
"JobInited", "JobInited",
"AMStarted", "AMStarted",
"JobPriorityChange", "JobPriorityChange",
"JobQueueChange",
"JobStatusChanged", "JobStatusChanged",
"JobSubmitted", "JobSubmitted",
"JobUnsuccessfulCompletion", "JobUnsuccessfulCompletion",

View File

@ -98,6 +98,8 @@ public HistoryEvent getNextEvent() throws IOException {
result = new JobFinishedEvent(); break; result = new JobFinishedEvent(); break;
case JOB_PRIORITY_CHANGED: case JOB_PRIORITY_CHANGED:
result = new JobPriorityChangeEvent(); break; result = new JobPriorityChangeEvent(); break;
case JOB_QUEUE_CHANGED:
result = new JobQueueChangeEvent(); break;
case JOB_STATUS_CHANGED: case JOB_STATUS_CHANGED:
result = new JobStatusChangedEvent(); break; result = new JobStatusChangedEvent(); break;
case JOB_FAILED: case JOB_FAILED:

View File

@ -183,6 +183,9 @@ public void handleEvent(HistoryEvent event) {
case JOB_PRIORITY_CHANGED: case JOB_PRIORITY_CHANGED:
handleJobPriorityChangeEvent((JobPriorityChangeEvent) event); handleJobPriorityChangeEvent((JobPriorityChangeEvent) event);
break; break;
case JOB_QUEUE_CHANGED:
handleJobQueueChangeEvent((JobQueueChangeEvent) event);
break;
case JOB_FAILED: case JOB_FAILED:
case JOB_KILLED: case JOB_KILLED:
case JOB_ERROR: case JOB_ERROR:
@ -385,6 +388,10 @@ private void handleJobFinishedEvent(JobFinishedEvent event) {
private void handleJobPriorityChangeEvent(JobPriorityChangeEvent event) { private void handleJobPriorityChangeEvent(JobPriorityChangeEvent event) {
info.priority = event.getPriority(); info.priority = event.getPriority();
} }
private void handleJobQueueChangeEvent(JobQueueChangeEvent event) {
info.jobQueueName = event.getJobQueueName();
}
private void handleJobInitedEvent(JobInitedEvent event) { private void handleJobInitedEvent(JobInitedEvent event) {
info.launchTime = event.getLaunchTime(); info.launchTime = event.getLaunchTime();

View File

@ -453,4 +453,9 @@ public List<AMInfo> getAMInfos() {
} }
return amInfos; return amInfos;
} }
@Override
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
} }

View File

@ -190,5 +190,10 @@ public Map<JobACL, AccessControlList> getJobACLs() {
public List<AMInfo> getAMInfos() { public List<AMInfo> getAMInfos() {
return null; return null;
} }
@Override
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
} }

View File

@ -155,6 +155,41 @@ public void testJobHistoryEventHandlerIsFirstServiceToStop() {
Assert.assertEquals("JobHistoryEventHandler", Assert.assertEquals("JobHistoryEventHandler",
services[services.length - 1].getName()); services[services.length - 1].getName());
} }
@Test
public void testAssignedQueue() throws Exception {
Configuration conf = new Configuration();
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
true, "assignedQueue");
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
//make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
/*
* Use HistoryContext to read logged events and verify the number of
* completed maps
*/
HistoryContext context = new JobHistory();
// test start and stop states
((JobHistory)context).init(conf);
((JobHistory)context).start();
Assert.assertTrue( context.getStartTime()>0);
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
// get job before stopping JobHistory
Job parsedJob = context.getJob(jobId);
// stop JobHistory
((JobHistory)context).stop();
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
Assert.assertEquals("QueueName not correct", "assignedQueue",
parsedJob.getQueueName());
}
private void verifyTask(Task task) { private void verifyTask(Task task) {
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED, Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
@ -184,6 +219,11 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
super(maps, reduces, autoComplete, testName, cleanOnStart); super(maps, reduces, autoComplete, testName, cleanOnStart);
} }
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, String assignedQueue) {
super(maps, reduces, autoComplete, testName, cleanOnStart, assignedQueue);
}
@Override @Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler( protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) { AppContext context) {

View File

@ -415,5 +415,9 @@ public boolean checkAccess(UserGroupInformation callerUGI,
return aclsMgr.checkAccess(callerUGI, jobOperation, return aclsMgr.checkAccess(callerUGI, jobOperation,
this.getUserName(), jobAcls.get(jobOperation)); this.getUserName(), jobAcls.get(jobOperation));
} }
@Override
public void setQueueName(String queueName) {
}
} }
} }