From f634505d48d97e4d461980d68a0cbdf87223646d Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 24 Nov 2015 22:07:26 +0000 Subject: [PATCH] MAPREDUCE-5870. Support for passing Job priority through Application Submission Context in Mapreduce Side. Contributed by Sunil G --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/local/LocalContainerAllocator.java | 6 ++ .../v2/app/job/impl/TestJobImpl.java | 2 +- .../local/TestLocalContainerAllocator.java | 5 +- .../hadoop/mapreduce/TypeConverter.java | 48 +++++++++- .../hadoop/mapreduce/TestTypeConverter.java | 4 + .../org/apache/hadoop/mapred/JobConf.java | 94 +++++++++++++++++-- .../org/apache/hadoop/mapred/JobPriority.java | 6 ++ .../org/apache/hadoop/mapred/JobStatus.java | 2 +- .../java/org/apache/hadoop/mapreduce/Job.java | 67 +++++++++++-- .../apache/hadoop/mapreduce/JobPriority.java | 7 +- .../apache/hadoop/mapreduce/tools/CLI.java | 29 ++++-- .../org/apache/hadoop/mapred/TestJobConf.java | 47 +++++++++- .../org/apache/hadoop/mapreduce/TestJob.java | 2 +- .../org/apache/hadoop/mapred/YARNRunner.java | 19 +++- .../apache/hadoop/mapred/TestYARNRunner.java | 25 +++++ .../hadoop/mapreduce/TestMRJobClient.java | 5 +- .../hadoop/mapreduce/v2/TestMRJobs.java | 63 +++++++++++++ 18 files changed, 402 insertions(+), 32 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3cbacc4027..ef4bb18cb2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -434,6 +434,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6499. Add elapsed time for retired job in JobHistoryServer WebUI. (Lin Yiqun via aajisaka) + MAPREDUCE-5870. Support for passing Job priority through Application + Submission Context in Mapreduce Side (Sunil G via jlowe) + OPTIMIZATIONS MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index aed1023314..74373570bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; @@ -146,6 +147,11 @@ protected synchronized void heartbeat() throws Exception { if (token != null) { updateAMRMToken(token); } + Priority priorityFromResponse = Priority.newInstance(allocateResponse + .getApplicationPriority().getPriority()); + + // Update the job priority to Job directly. + getJob().setJobPriority(priorityFromResponse); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 9e1892046b..33c5c48ecf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -914,7 +914,7 @@ public void testJobPriorityUpdate() throws Exception { assertJobState(job, JobStateInternal.RUNNING); // Update priority of job to 8, and see whether its updated - Priority updatedPriority = Priority.newInstance(5); + Priority updatedPriority = Priority.newInstance(8); job.setJobPriority(updatedPriority); assertJobState(job, JobStateInternal.RUNNING); Priority jobPriority = job.getReport().getJobPriority(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index 167d804dee..38df8f03c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; @@ -245,7 +246,7 @@ public AllocateResponse allocate(AllocateRequest request) amToken.getIdentifier(), amToken.getKind().toString(), amToken.getPassword(), amToken.getService().toString()); } - return AllocateResponse.newInstance(responseId, + AllocateResponse response = AllocateResponse.newInstance(responseId, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -254,6 +255,8 @@ public AllocateResponse allocate(AllocateRequest request) yarnToken, Collections.emptyList(), Collections.emptyList()); + response.setApplicationPriority(Priority.newInstance(0)); + return response; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 88f61b0e4f..4af5b8929f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -84,6 +84,25 @@ public static JobId toYarn(org.apache.hadoop.mapreduce.JobID id) { return jobId; } + public static int toYarnApplicationPriority(String priority) { + JobPriority jobPriority = JobPriority.valueOf(priority); + switch (jobPriority) { + case VERY_HIGH : + return 5; + case HIGH : + return 4; + case NORMAL : + return 3; + case LOW : + return 2; + case VERY_LOW : + return 1; + case DEFAULT : + return 0; + } + throw new IllegalArgumentException("Unrecognized priority: " + priority); + } + private static String fromClusterTimeStamp(long clusterTimeStamp) { return Long.toString(clusterTimeStamp); } @@ -165,6 +184,8 @@ public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) { return Phase.REDUCE; case CLEANUP: return Phase.CLEANUP; + default: + break; } throw new YarnRuntimeException("Unrecognized Phase: " + phase); } @@ -327,10 +348,33 @@ private static JobPriority fromYarnPriority(int priority) { return JobPriority.VERY_LOW; case 0 : return JobPriority.DEFAULT; + default : + break; } return JobPriority.UNDEFINED_PRIORITY; } + public static org.apache.hadoop.mapreduce.JobPriority + fromYarnApplicationPriority(int priority) { + switch (priority) { + case 5 : + return org.apache.hadoop.mapreduce.JobPriority.VERY_HIGH; + case 4 : + return org.apache.hadoop.mapreduce.JobPriority.HIGH; + case 3 : + return org.apache.hadoop.mapreduce.JobPriority.NORMAL; + case 2 : + return org.apache.hadoop.mapreduce.JobPriority.LOW; + case 1 : + return org.apache.hadoop.mapreduce.JobPriority.VERY_LOW; + case 0 : + return org.apache.hadoop.mapreduce.JobPriority.DEFAULT; + default : + break; + } + return org.apache.hadoop.mapreduce.JobPriority.UNDEFINED_PRIORITY; + } + public static org.apache.hadoop.mapreduce.QueueState fromYarn( QueueState state) { org.apache.hadoop.mapreduce.QueueState qState = @@ -462,7 +506,9 @@ public static JobStatus fromYarn(ApplicationReport application, TypeConverter.fromYarn(application.getApplicationId()), 0.0f, 0.0f, 0.0f, 0.0f, TypeConverter.fromYarn(application.getYarnApplicationState(), application.getFinalApplicationStatus()), - org.apache.hadoop.mapreduce.JobPriority.NORMAL, + fromYarnApplicationPriority( + (application.getPriority() == null) ? 0 : + application.getPriority().getPriority()), application.getUser(), application.getName(), application.getQueue(), jobFile, trackingUrl, false ); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java index 60ce170177..2027d01b70 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java @@ -85,6 +85,7 @@ public void testFromYarn() throws Exception { applicationReport.setStartTime(appStartTime); applicationReport.setFinishTime(appFinishTime); applicationReport.setUser("TestTypeConverter-user"); + applicationReport.setPriority(Priority.newInstance(3)); ApplicationResourceUsageReport appUsageRpt = Records .newRecord(ApplicationResourceUsageReport.class); Resource r = Records.newRecord(Resource.class); @@ -99,6 +100,7 @@ public void testFromYarn() throws Exception { Assert.assertEquals(appStartTime, jobStatus.getStartTime()); Assert.assertEquals(appFinishTime, jobStatus.getFinishTime()); Assert.assertEquals(state.toString(), jobStatus.getState().toString()); + Assert.assertEquals(JobPriority.NORMAL, jobStatus.getPriority()); } @Test @@ -113,6 +115,7 @@ public void testFromYarnApplicationReport() { when(mockReport.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED); when(mockReport.getUser()).thenReturn("dummy-user"); when(mockReport.getQueue()).thenReturn("dummy-queue"); + when(mockReport.getPriority()).thenReturn(Priority.newInstance(4)); String jobFile = "dummy-path/job.xml"; try { @@ -146,6 +149,7 @@ public void testFromYarnApplicationReport() { Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots()); Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem()); Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem()); + Assert.assertEquals("priority set incorrectly", JobPriority.HIGH, status.getPriority()); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index 059593ae2a..2cfce1fb51 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -1559,25 +1559,105 @@ public void setMaxReduceTaskFailuresPercent(int percent) { /** * Set {@link JobPriority} for this job. - * + * * @param prio the {@link JobPriority} for this job. */ public void setJobPriority(JobPriority prio) { set(JobContext.PRIORITY, prio.toString()); } - + + /** + * Set {@link JobPriority} for this job. + * + * @param prio the {@link JobPriority} for this job. + */ + public void setJobPriorityAsInteger(int prio) { + set(JobContext.PRIORITY, Integer.toString(prio)); + } + /** * Get the {@link JobPriority} for this job. - * + * * @return the {@link JobPriority} for this job. */ public JobPriority getJobPriority() { String prio = get(JobContext.PRIORITY); - if(prio == null) { - return JobPriority.NORMAL; + if (prio == null) { + return JobPriority.DEFAULT; } - - return JobPriority.valueOf(prio); + + JobPriority priority = JobPriority.DEFAULT; + try { + priority = JobPriority.valueOf(prio); + } catch (IllegalArgumentException e) { + return convertToJobPriority(Integer.parseInt(prio)); + } + return priority; + } + + /** + * Get the priority for this job. + * + * @return the priority for this job. + */ + public int getJobPriorityAsInteger() { + String priority = get(JobContext.PRIORITY); + if (priority == null) { + return 0; + } + + int jobPriority = 0; + try { + jobPriority = convertPriorityToInteger(priority); + } catch (IllegalArgumentException e) { + return Integer.parseInt(priority); + } + return jobPriority; + } + + private int convertPriorityToInteger(String priority) { + JobPriority jobPriority = JobPriority.valueOf(priority); + switch (jobPriority) { + case VERY_HIGH : + return 5; + case HIGH : + return 4; + case NORMAL : + return 3; + case LOW : + return 2; + case VERY_LOW : + return 1; + case DEFAULT : + return 0; + default: + break; + } + + // If a user sets the priority as "UNDEFINED_PRIORITY", we can return + // 0 which is also default value. + return 0; + } + + private JobPriority convertToJobPriority(int priority) { + switch (priority) { + case 5 : + return JobPriority.VERY_HIGH; + case 4 : + return JobPriority.HIGH; + case 3 : + return JobPriority.NORMAL; + case 2 : + return JobPriority.LOW; + case 1 : + return JobPriority.VERY_LOW; + case 0 : + return JobPriority.DEFAULT; + default: + break; + } + + return JobPriority.UNDEFINED_PRIORITY; } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java index b76d46d517..cdc1855c6a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java @@ -22,6 +22,12 @@ /** * Used to describe the priority of the running job. + * DEFAULT : While submitting a job, if the user is not specifying priority, + * YARN has the capability to pick the default priority as per its config. + * Hence MapReduce can indicate such cases with this new enum. + * UNDEFINED_PRIORITY : YARN supports priority as an integer. Hence other than + * the five defined enums, YARN can consider other integers also. To generalize + * such cases, this specific enum is used. */ @InterfaceAudience.Public @InterfaceStability.Stable diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java index bf2577d4a9..5eebf35c79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java @@ -148,7 +148,7 @@ public JobStatus(JobID jobid, float mapProgress, float reduceProgress, String user, String jobName, String jobFile, String trackingUrl) { this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, - JobPriority.NORMAL, user, jobName, jobFile, trackingUrl); + JobPriority.DEFAULT, user, jobName, jobFile, trackingUrl); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 9eea4cc76b..ded9d651bb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -407,7 +407,7 @@ public String getSchedulingInfo() { /** * Get scheduling info of the job. * - * @return the scheduling info of the job + * @return the priority info of the job */ public JobPriority getPriority() throws IOException, InterruptedException { ensureState(JobState.RUNNING); @@ -635,27 +635,78 @@ public void killJob() throws IOException { /** * Set the priority of a running job. - * @param priority the new priority for the job. + * @param jobPriority the new priority for the job. * @throws IOException */ - public void setPriority(JobPriority priority) - throws IOException, InterruptedException { + public void setPriority(JobPriority jobPriority) throws IOException, + InterruptedException { if (state == JobState.DEFINE) { - conf.setJobPriority( - org.apache.hadoop.mapred.JobPriority.valueOf(priority.name())); + if (jobPriority == JobPriority.UNDEFINED_PRIORITY) { + conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority)); + } else { + conf.setJobPriority(org.apache.hadoop.mapred.JobPriority + .valueOf(jobPriority.name())); + } } else { ensureState(JobState.RUNNING); - final JobPriority tmpPriority = priority; + final int tmpPriority = convertPriorityToInteger(jobPriority); ugi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws IOException, InterruptedException { - cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString()); + cluster.getClient() + .setJobPriority(getJobID(), Integer.toString(tmpPriority)); return null; } }); } } + /** + * Set the priority of a running job. + * + * @param jobPriority + * the new priority for the job. + * @throws IOException + */ + public void setPriorityAsInteger(int jobPriority) throws IOException, + InterruptedException { + if (state == JobState.DEFINE) { + conf.setJobPriorityAsInteger(jobPriority); + } else { + ensureState(JobState.RUNNING); + final int tmpPriority = jobPriority; + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws IOException, InterruptedException { + cluster.getClient() + .setJobPriority(getJobID(), Integer.toString(tmpPriority)); + return null; + } + }); + } + } + + private int convertPriorityToInteger(JobPriority jobPriority) { + switch (jobPriority) { + case VERY_HIGH : + return 5; + case HIGH : + return 4; + case NORMAL : + return 3; + case LOW : + return 2; + case VERY_LOW : + return 1; + case DEFAULT : + return 0; + default: + break; + } + // For UNDEFINED_PRIORITY, we can set it to default for better handling + return 0; + } + /** * Get events indicating completion (success/failure) of component tasks. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java index 71785686e4..4069f1f30a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java @@ -22,7 +22,12 @@ /** * Used to describe the priority of the running job. - * + * DEFAULT : While submitting a job, if the user is not specifying priority, + * YARN has the capability to pick the default priority as per its config. + * Hence MapReduce can indicate such cases with this new enum. + * UNDEFINED_PRIORITY : YARN supports priority as an integer. Hence other than + * the five defined enums, YARN can consider other integers also. To generalize + * such cases, this specific enum is used. */ @InterfaceAudience.Public @InterfaceStability.Evolving diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index 2feb0d18e2..59f5adacd4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -97,6 +97,7 @@ public int run(String[] argv) throws Exception { String taskState = null; int fromEvent = 0; int nEvents = 0; + int jpvalue = 0; boolean getStatus = false; boolean getCounter = false; boolean killJob = false; @@ -149,11 +150,15 @@ public int run(String[] argv) throws Exception { } jobid = argv[1]; try { - jp = JobPriority.valueOf(argv[2]); + jp = JobPriority.valueOf(argv[2]); } catch (IllegalArgumentException iae) { - LOG.info(iae); - displayUsage(cmd); - return exitCode; + try { + jpvalue = Integer.parseInt(argv[2]); + } catch (NumberFormatException ne) { + LOG.info(ne); + displayUsage(cmd); + return exitCode; + } } setJobPriority = true; } else if ("-events".equals(cmd)) { @@ -322,7 +327,11 @@ public int run(String[] argv) throws Exception { if (job == null) { System.out.println("Could not find job " + jobid); } else { - job.setPriority(jp); + if (jp != null) { + job.setPriority(jp); + } else { + job.setPriorityAsInteger(jpvalue); + } System.out.println("Changed job priority."); exitCode = 0; } @@ -408,6 +417,10 @@ Cluster createCluster() throws IOException { private String getJobPriorityNames() { StringBuffer sb = new StringBuffer(); for (JobPriority p : JobPriority.values()) { + // UNDEFINED_PRIORITY need not to be displayed in usage + if (JobPriority.UNDEFINED_PRIORITY == p) { + continue; + } sb.append(p.name()).append(" "); } return sb.substring(0, sb.length()-1); @@ -444,7 +457,8 @@ private void displayUsage(String cmd) { } else if ("-set-priority".equals(cmd)) { System.err.println(prefix + "[" + cmd + " ]. " + "Valid values for priorities are: " - + jobPriorityValues); + + jobPriorityValues + + ". In addition to this, integers also can be used."); } else if ("-list-active-trackers".equals(cmd)) { System.err.println(prefix + "[" + cmd + "]"); } else if ("-list-blacklisted-trackers".equals(cmd)) { @@ -465,7 +479,8 @@ private void displayUsage(String cmd) { System.err.printf("\t[-counter ]%n"); System.err.printf("\t[-kill ]%n"); System.err.printf("\t[-set-priority ]. " + - "Valid values for priorities are: " + jobPriorityValues + "%n"); + "Valid values for priorities are: " + jobPriorityValues + + ". In addition to this, integers also can be used." + "%n"); System.err.printf("\t[-events <#-of-events>]%n"); System.err.printf("\t[-history ]%n"); System.err.printf("\t[-list [all]]%n"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java index 57fa4e69d8..58092f2f60 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java @@ -99,7 +99,7 @@ public void testJobConf() { assertEquals(70, conf.getMaxReduceTaskFailuresPercent()); // by default - assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name()); + assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name()); conf.setJobPriority(JobPriority.HIGH); assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name()); @@ -377,4 +377,49 @@ public void testParseMaximumHeapSizeMB() { Assert.assertEquals(-1, JobConf.parseMaximumHeapSizeMB("-Xmx4?")); Assert.assertEquals(-1, JobConf.parseMaximumHeapSizeMB("")); } + + /** + * Test various Job Priority + */ + @Test + public void testJobPriorityConf() { + JobConf conf = new JobConf(); + + // by default + assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name()); + assertEquals(0, conf.getJobPriorityAsInteger()); + // Set JobPriority.LOW using old API, and verify output from both getter + conf.setJobPriority(JobPriority.LOW); + assertEquals(JobPriority.LOW.name(), conf.getJobPriority().name()); + assertEquals(2, conf.getJobPriorityAsInteger()); + + // Set JobPriority.VERY_HIGH using old API, and verify output + conf.setJobPriority(JobPriority.VERY_HIGH); + assertEquals(JobPriority.VERY_HIGH.name(), conf.getJobPriority().name()); + assertEquals(5, conf.getJobPriorityAsInteger()); + + // Set 3 as priority using new API, and verify output from both getter + conf.setJobPriorityAsInteger(3); + assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name()); + assertEquals(3, conf.getJobPriorityAsInteger()); + + // Set 4 as priority using new API, and verify output + conf.setJobPriorityAsInteger(4); + assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name()); + assertEquals(4, conf.getJobPriorityAsInteger()); + // Now set some high integer values and verify output from old api + conf.setJobPriorityAsInteger(57); + assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority() + .name()); + assertEquals(57, conf.getJobPriorityAsInteger()); + + // Error case where UNDEFINED_PRIORITY is set explicitly + conf.setJobPriority(JobPriority.UNDEFINED_PRIORITY); + assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority() + .name()); + + // As UNDEFINED_PRIORITY cannot be mapped to any integer value, resetting + // to default as 0. + assertEquals(0, conf.getJobPriorityAsInteger()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java index 94f49acf97..71bacf7472 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java @@ -41,7 +41,7 @@ public void testJobToString() throws IOException, InterruptedException { when(cluster.getClient()).thenReturn(client); JobID jobid = new JobID("1014873536921", 6); JobStatus status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, - State.FAILED, JobPriority.NORMAL, "root", "TestJobToString", + State.FAILED, JobPriority.DEFAULT, "root", "TestJobToString", "job file", "tracking url"); when(client.getJobStatus(jobid)).thenReturn(status); when(client.getTaskReports(jobid, TaskType.MAP)).thenReturn( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 84a27d996c..f15b5c1979 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -562,13 +562,30 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( appContext.setApplicationTags(new HashSet(tagsFromConf)); } + String jobPriority = jobConf.get(MRJobConfig.PRIORITY); + if (jobPriority != null) { + int iPriority; + try { + iPriority = TypeConverter.toYarnApplicationPriority(jobPriority); + } catch (IllegalArgumentException e) { + iPriority = Integer.parseInt(jobPriority); + } + appContext.setPriority(Priority.newInstance(iPriority)); + } + return appContext; } @Override public void setJobPriority(JobID arg0, String arg1) throws IOException, InterruptedException { - resMgrDelegate.setJobPriority(arg0, arg1); + ApplicationId appId = TypeConverter.toYarn(arg0).getAppId(); + try { + resMgrDelegate.updateApplicationPriority(appId, + Priority.newInstance(Integer.parseInt(arg1))); + } catch (YarnException e) { + throw new IOException(e); + } } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 010446f8b8..3293a9d515 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -604,6 +605,30 @@ public void testAMStandardEnv() throws Exception { assertEquals("Bad SHELL setting", USER_SHELL, shell); } + @Test + public void testJobPriority() throws Exception { + JobConf jobConf = new JobConf(); + + jobConf.set(MRJobConfig.PRIORITY, "LOW"); + + YARNRunner yarnRunner = new YARNRunner(jobConf); + ApplicationSubmissionContext appSubCtx = buildSubmitContext(yarnRunner, + jobConf); + + // 2 corresponds to LOW + assertEquals(appSubCtx.getPriority(), Priority.newInstance(2)); + + // Set an integer explicitly + jobConf.set(MRJobConfig.PRIORITY, "12"); + + yarnRunner = new YARNRunner(jobConf); + appSubCtx = buildSubmitContext(yarnRunner, + jobConf); + + // Verify whether 12 is set to submission context + assertEquals(appSubCtx.getPriority(), Priority.newInstance(12)); + } + private ApplicationSubmissionContext buildSubmitContext( YARNRunner yarnRunner, JobConf jobConf) throws IOException { File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java index 191195bd3b..2382239035 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java @@ -527,8 +527,9 @@ public void testChangingJobPriority(String jobId, Configuration conf) exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority", jobId, "VERY_LOW" }, new ByteArrayOutputStream()); assertEquals("Exit code", 0, exitCode); - // because this method does not implemented still. - verifyJobPriority(jobId, "NORMAL", conf, createJobClient()); + // set-priority is fired after job is completed in YARN, hence need not + // have to update the priority. + verifyJobPriority(jobId, "DEFAULT", conf, createJobClient()); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 2973c39897..e685c65342 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -159,6 +160,7 @@ public static void setup() throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); + conf.setInt("yarn.cluster.max-application-priority", 10); mrCluster.init(conf); mrCluster.start(); } @@ -242,6 +244,67 @@ private void testSleepJobInternal(boolean useRemoteJar) throws Exception { // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value } + @Test(timeout = 3000000) + public void testJobWithChangePriority() throws Exception { + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set master address to local to test that local mode applied if framework + // equals local + sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); + sleepConf + .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5); + + SleepJob sleepJob = new SleepJob(); + sleepJob.setConf(sleepConf); + Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1); + + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.setJarByClass(SleepJob.class); + job.setMaxMapAttempts(1); // speed up failures + job.submit(); + + // Set the priority to HIGH + job.setPriority(JobPriority.HIGH); + waitForPriorityToUpdate(job, JobPriority.HIGH); + // Verify the priority from job itself + Assert.assertEquals(job.getPriority(), JobPriority.HIGH); + + // Change priority to NORMAL (3) with new api + job.setPriorityAsInteger(3); // Verify the priority from job itself + waitForPriorityToUpdate(job, JobPriority.NORMAL); + Assert.assertEquals(job.getPriority(), JobPriority.NORMAL); + + // Change priority to a high integer value with new api + job.setPriorityAsInteger(89); // Verify the priority from job itself + waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY); + Assert.assertEquals(job.getPriority(), JobPriority.UNDEFINED_PRIORITY); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + } + + private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus) + throws IOException, InterruptedException { + // Max wait time to get the priority update can be kept as 20sec (100 * + // 100ms) + int waitCnt = 200; + while (waitCnt-- > 0) { + if (job.getPriority().equals(expectedStatus)) { + // Stop waiting as priority is updated. + break; + } else { + Thread.sleep(100); + } + } + } + @Test(timeout = 300000) public void testConfVerificationWithClassloader() throws Exception { testConfVerification(true, false, false, false);