From 3f282762d1afc916de9207d3adeda852ca344853 Mon Sep 17 00:00:00 2001 From: subru Date: Wed, 24 Sep 2014 18:01:38 -0700 Subject: [PATCH] MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a) --- YARN-1051-CHANGES.txt | 3 +++ .../java/org/apache/hadoop/mapreduce/Job.java | 21 ++++++++++++++++ .../apache/hadoop/mapreduce/JobSubmitter.java | 8 +++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 2 ++ .../hadoop/mapred/ResourceMgrDelegate.java | 24 +++++++++++++++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 22 +++++++++++++++++ .../hadoop/mapred/TestClientRedirect.java | 24 +++++++++++++++++++ 7 files changed, 104 insertions(+) diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index c4106b277e..6a271972e6 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru) YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. (Subru Krishnan and Carlo Curino via subru) + +MAPREDUCE-6103. Adding reservation APIs to MR resource manager +delegate. (Subru Krishnan and Carlo Curino via subru) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 3f8d13928c..cfc34377d4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ReservationId; /** * The job submitter's view of the Job. @@ -112,6 +113,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private JobStatus status; private long statustime; private Cluster cluster; + private ReservationId reservationId; /** * @deprecated Use {@link #getInstance()} @@ -1523,5 +1525,24 @@ public boolean isUber() throws IOException, InterruptedException { updateStatus(); return status.isUber(); } + + /** + * Get the reservation to which the job is submitted to, if any + * + * @return the reservationId the identifier of the job's reservation, null if + * the job does not have any reservation associated with it + */ + public ReservationId getReservationId() { + return reservationId; + } + + /** + * Set the reservation to which the job is submitted to + * + * @param reservationId the reservationId to set + */ + public void setReservationId(ReservationId reservationId) { + this.reservationId = reservationId; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 6cd569a65c..d80521c594 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -47,6 +47,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.QueueACL; + import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; @@ -60,6 +61,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; @@ -427,6 +429,12 @@ JobStatus submitJobInternal(Job job, Cluster cluster) trackingIds.toArray(new String[trackingIds.size()])); } + // Set reservation info if it exists + ReservationId reservationId = job.getReservationId(); + if (reservationId != null) { + conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); + } + // Write job file to submit dir writeConf(conf, submitJobFile); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 562120f6fe..5b623b59df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -63,6 +63,8 @@ public interface MRJobConfig { public static final String QUEUE_NAME = "mapreduce.job.queuename"; + public static final String RESERVATION_ID = "mapreduce.job.reservation.id"; + public static final String JOB_TAGS = "mapreduce.job.tags"; public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index b76d0f39e1..803390fa05 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -43,6 +43,12 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -406,4 +412,22 @@ public void moveApplicationAcrossQueues(ApplicationId appId, String queue) throws YarnException, IOException { client.moveApplicationAcrossQueues(appId, queue); } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return client.submitReservation(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return client.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return client.deleteReservation(request); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 5120c85f41..9419d039dd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -489,6 +490,26 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( appContext.setQueue( // Queue name jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME)); + // add reservationID if present + ReservationId reservationID = null; + try { + reservationID = + ReservationId.parseReservationId(jobConf + .get(JobContext.RESERVATION_ID)); + } catch (NumberFormatException e) { + // throw exception as reservationid as is invalid + String errMsg = + "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID) + + " specified for the app: " + applicationId; + LOG.warn(errMsg); + throw new IOException(errMsg); + } + if (reservationID != null) { + appContext.setReservationID(reservationID); + LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId + + " to queue:" + appContext.getQueue() + " with reservationId:" + + appContext.getReservationID()); + } appContext.setApplicationName( // Job name jobConf.get(JobContext.JOB_NAME, YarnConfiguration.DEFAULT_APPLICATION_NAME)); @@ -503,6 +524,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( if (tagsFromConf != null && !tagsFromConf.isEmpty()) { appContext.setApplicationTags(new HashSet(tagsFromConf)); } + return appContext; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 69ede3a4d0..5663a81cd1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -102,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -397,6 +403,24 @@ public GetContainersResponse getContainers(GetContainersRequest request) throws YarnException, IOException { return null; } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol {