diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 1188ee677a..cd185e19e5 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -37,6 +37,8 @@ Trunk (unreleased changes) IMPROVEMENTS + MAPREDUCE-2680. Display queue name in job client CLI. (acmurthy) + MAPREDUCE-2679. Minor changes to sync trunk with MR-279 branch. (acmurthy) MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobClient.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobClient.java index 245e362866..4a96877873 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/JobClient.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobClient.java @@ -755,10 +755,10 @@ public class JobClient extends CLI { */ public JobStatus[] getAllJobs() throws IOException { try { - Job jobs[] = cluster.getAllJobs(); + org.apache.hadoop.mapreduce.JobStatus[] jobs = cluster.getAllJobStatuses(); JobStatus[] stats = new JobStatus[jobs.length]; for (int i = 0; i < jobs.length; i++) { - stats[i] = JobStatus.downgrade(jobs[i].getStatus()); + stats[i] = JobStatus.downgrade(jobs[i]); } return stats; } catch (InterruptedException ie) { @@ -870,10 +870,10 @@ public class JobClient extends CLI { } void displayJobList(JobStatus[] jobs) { - System.out.printf("JobId\tState\tStartTime\tUserName\tPriority\tSchedulingInfo\n"); + System.out.printf("JobId\tState\tStartTime\tUserName\tQueue\tPriority\tSchedulingInfo\n"); for (JobStatus job : jobs) { - System.out.printf("%s\t%d\t%d\t%s\t%s\t%s\n", job.getJobID(), job.getRunState(), - job.getStartTime(), job.getUsername(), + System.out.printf("%s\t%d\t%d\t%s\t%s\t%s\t%s\n", job.getJobID(), job.getRunState(), + job.getStartTime(), job.getUsername(), job.getQueue(), job.getJobPriority().name(), job.getSchedulingInfo()); } } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobStatus.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobStatus.java index aea51706aa..90b68872ff 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/JobStatus.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobStatus.java @@ -156,11 +156,37 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { float reduceProgress, float cleanupProgress, int runState, JobPriority jp, String user, String jobName, String jobFile, String trackingUrl) { - super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, - getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()), - user, jobName, jobFile, trackingUrl); + this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + runState, jp, + user, jobName, "default", jobFile, trackingUrl); } + /** + * Create a job status object for a given jobid. + * @param jobid The jobid of the job + * @param setupProgress The progress made on the setup + * @param mapProgress The progress made on the maps + * @param reduceProgress The progress made on the reduces + * @param cleanupProgress The progress made on the cleanup + * @param runState The current state of the job + * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param queue job queue name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. + */ + public JobStatus(JobID jobid, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, + int runState, JobPriority jp, + String user, String jobName, String queue, + String jobFile, String trackingUrl) { + super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()), + user, jobName, queue, jobFile, trackingUrl); + } + + public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){ JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()), stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(), diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java index fcc0ee1c63..23b350546e 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java @@ -224,11 +224,23 @@ public class Cluster { * @return array of {@link Job} * @throws IOException * @throws InterruptedException + * @deprecated Use {@link #getAllJobStatuses()} instead. */ + @Deprecated public Job[] getAllJobs() throws IOException, InterruptedException { return getJobs(client.getAllJobs()); } - + + /** + * Get job status for all jobs in the cluster. + * @return job status for all jobs in cluster + * @throws IOException + * @throws InterruptedException + */ + public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException { + return client.getAllJobs(); + } + /** * Grab the jobtracker system directory path where * job-specific files will be placed. diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/JobStatus.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/JobStatus.java index c02b3c10e4..9e438989cf 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/JobStatus.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/JobStatus.java @@ -78,6 +78,7 @@ public class JobStatus implements Writable, Cloneable { private State runState; private long startTime; private String user; + private String queue; private JobPriority priority; private String schedulingInfo="NA"; @@ -115,22 +116,48 @@ public class JobStatus implements Writable, Cloneable { float reduceProgress, float cleanupProgress, State runState, JobPriority jp, String user, String jobName, String jobFile, String trackingUrl) { - this.jobid = jobid; - this.setupProgress = setupProgress; - this.mapProgress = mapProgress; - this.reduceProgress = reduceProgress; - this.cleanupProgress = cleanupProgress; - this.runState = runState; - this.user = user; - if (jp == null) { - throw new IllegalArgumentException("Job Priority cannot be null."); - } - priority = jp; - this.jobName = jobName; - this.jobFile = jobFile; - this.trackingUrl = trackingUrl; + this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + runState, jp, user, jobName, "default", jobFile, trackingUrl); } - + + /** + * Create a job status object for a given jobid. + * @param jobid The jobid of the job + * @param setupProgress The progress made on the setup + * @param mapProgress The progress made on the maps + * @param reduceProgress The progress made on the reduces + * @param cleanupProgress The progress made on the cleanup + * @param runState The current state of the job + * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param queue queue name + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. + */ + public JobStatus(JobID jobid, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, + State runState, JobPriority jp, + String user, String jobName, String queue, + String jobFile, String trackingUrl) { + this.jobid = jobid; + this.setupProgress = setupProgress; + this.mapProgress = mapProgress; + this.reduceProgress = reduceProgress; + this.cleanupProgress = cleanupProgress; + this.runState = runState; + this.user = user; + this.queue = queue; + if (jp == null) { + throw new IllegalArgumentException("Job Priority cannot be null."); + } + priority = jp; + this.jobName = jobName; + this.jobFile = jobFile; + this.trackingUrl = trackingUrl; + } + + /** * Sets the map progress of this job * @param p The value of map progress to set to @@ -243,6 +270,22 @@ public class JobStatus implements Writable, Cloneable { this.jobACLs = acls; } + /** + * Set queue name + * @param queue queue name + */ + protected synchronized void setQueue(String queue) { + this.queue = queue; + } + + /** + * Get queue name + * @return queue name + */ + public synchronized String getQueue() { + return queue; + } + /** * @return Percentage of progress in maps */ diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/tools/CLI.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/tools/CLI.java index 76c477b486..0ac7053cba 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -444,13 +444,13 @@ public class CLI extends Configured implements Tool { */ private void listJobs(Cluster cluster) throws IOException, InterruptedException { - List runningJobs = new ArrayList(); - for (Job job : cluster.getAllJobs()) { - if (!job.isComplete()) { + List runningJobs = new ArrayList(); + for (JobStatus job : cluster.getAllJobStatuses()) { + if (!job.isJobComplete()) { runningJobs.add(job); } } - displayJobList(runningJobs.toArray(new Job[0])); + displayJobList(runningJobs.toArray(new JobStatus[0])); } /** @@ -459,7 +459,7 @@ public class CLI extends Configured implements Tool { */ private void listAllJobs(Cluster cluster) throws IOException, InterruptedException { - displayJobList(cluster.getAllJobs()); + displayJobList(cluster.getAllJobStatuses()); } /** @@ -523,15 +523,16 @@ public class CLI extends Configured implements Tool { } } - protected void displayJobList(Job[] jobs) + protected void displayJobList(JobStatus[] jobs) throws IOException, InterruptedException { System.out.println("Total jobs:" + jobs.length); System.out.println("JobId\tState\tStartTime\t" + - "UserName\tPriority\tSchedulingInfo"); - for (Job job : jobs) { - System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\n", job.getJobID().toString(), - job.getJobState(), job.getStartTime(), - job.getUser(), job.getPriority().name(), job.getSchedulingInfo()); + "UserName\tQueue\tPriority\tSchedulingInfo"); + for (JobStatus job : jobs) { + System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%s\n", job.getJobID().toString(), + job.getState(), job.getStartTime(), + job.getUsername(), job.getQueue(), + job.getPriority().name(), job.getSchedulingInfo()); } }