MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a)
This commit is contained in:
parent
6261f7cc69
commit
3f282762d1
@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
|
|||||||
|
|
||||||
YARN-2080. Integrating reservation system with ResourceManager and
|
YARN-2080. Integrating reservation system with ResourceManager and
|
||||||
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
|
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
|
||||||
|
|
||||||
|
MAPREDUCE-6103. Adding reservation APIs to MR resource manager
|
||||||
|
delegate. (Subru Krishnan and Carlo Curino via subru)
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The job submitter's view of the Job.
|
* The job submitter's view of the Job.
|
||||||
@ -112,6 +113,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
|
|||||||
private JobStatus status;
|
private JobStatus status;
|
||||||
private long statustime;
|
private long statustime;
|
||||||
private Cluster cluster;
|
private Cluster cluster;
|
||||||
|
private ReservationId reservationId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link #getInstance()}
|
* @deprecated Use {@link #getInstance()}
|
||||||
@ -1524,4 +1526,23 @@ public boolean isUber() throws IOException, InterruptedException {
|
|||||||
return status.isUber();
|
return status.isUber();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the reservation to which the job is submitted to, if any
|
||||||
|
*
|
||||||
|
* @return the reservationId the identifier of the job's reservation, null if
|
||||||
|
* the job does not have any reservation associated with it
|
||||||
|
*/
|
||||||
|
public ReservationId getReservationId() {
|
||||||
|
return reservationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the reservation to which the job is submitted to
|
||||||
|
*
|
||||||
|
* @param reservationId the reservationId to set
|
||||||
|
*/
|
||||||
|
public void setReservationId(ReservationId reservationId) {
|
||||||
|
this.reservationId = reservationId;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.QueueACL;
|
import org.apache.hadoop.mapred.QueueACL;
|
||||||
|
|
||||||
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
|
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
|
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
|
||||||
@ -60,6 +61,7 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.codehaus.jackson.JsonParseException;
|
import org.codehaus.jackson.JsonParseException;
|
||||||
import org.codehaus.jackson.map.JsonMappingException;
|
import org.codehaus.jackson.map.JsonMappingException;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
@ -427,6 +429,12 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
|
|||||||
trackingIds.toArray(new String[trackingIds.size()]));
|
trackingIds.toArray(new String[trackingIds.size()]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set reservation info if it exists
|
||||||
|
ReservationId reservationId = job.getReservationId();
|
||||||
|
if (reservationId != null) {
|
||||||
|
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
|
||||||
|
}
|
||||||
|
|
||||||
// Write job file to submit dir
|
// Write job file to submit dir
|
||||||
writeConf(conf, submitJobFile);
|
writeConf(conf, submitJobFile);
|
||||||
|
|
||||||
|
@ -63,6 +63,8 @@ public interface MRJobConfig {
|
|||||||
|
|
||||||
public static final String QUEUE_NAME = "mapreduce.job.queuename";
|
public static final String QUEUE_NAME = "mapreduce.job.queuename";
|
||||||
|
|
||||||
|
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
|
||||||
|
|
||||||
public static final String JOB_TAGS = "mapreduce.job.tags";
|
public static final String JOB_TAGS = "mapreduce.job.tags";
|
||||||
|
|
||||||
public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
|
public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
|
||||||
|
@ -43,6 +43,12 @@
|
|||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
@ -406,4 +412,22 @@ public void moveApplicationAcrossQueues(ApplicationId appId, String queue)
|
|||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
client.moveApplicationAcrossQueues(appId, queue);
|
client.moveApplicationAcrossQueues(appId, queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationSubmissionResponse submitReservation(
|
||||||
|
ReservationSubmissionRequest request) throws YarnException, IOException {
|
||||||
|
return client.submitReservation(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationUpdateResponse updateReservation(
|
||||||
|
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||||
|
return client.updateReservation(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationDeleteResponse deleteReservation(
|
||||||
|
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||||
|
return client.deleteReservation(request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,6 +76,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
@ -489,6 +490,26 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
|||||||
appContext.setQueue( // Queue name
|
appContext.setQueue( // Queue name
|
||||||
jobConf.get(JobContext.QUEUE_NAME,
|
jobConf.get(JobContext.QUEUE_NAME,
|
||||||
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||||
|
// add reservationID if present
|
||||||
|
ReservationId reservationID = null;
|
||||||
|
try {
|
||||||
|
reservationID =
|
||||||
|
ReservationId.parseReservationId(jobConf
|
||||||
|
.get(JobContext.RESERVATION_ID));
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
// throw exception as reservationid as is invalid
|
||||||
|
String errMsg =
|
||||||
|
"Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
|
||||||
|
+ " specified for the app: " + applicationId;
|
||||||
|
LOG.warn(errMsg);
|
||||||
|
throw new IOException(errMsg);
|
||||||
|
}
|
||||||
|
if (reservationID != null) {
|
||||||
|
appContext.setReservationID(reservationID);
|
||||||
|
LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
|
||||||
|
+ " to queue:" + appContext.getQueue() + " with reservationId:"
|
||||||
|
+ appContext.getReservationID());
|
||||||
|
}
|
||||||
appContext.setApplicationName( // Job name
|
appContext.setApplicationName( // Job name
|
||||||
jobConf.get(JobContext.JOB_NAME,
|
jobConf.get(JobContext.JOB_NAME,
|
||||||
YarnConfiguration.DEFAULT_APPLICATION_NAME));
|
YarnConfiguration.DEFAULT_APPLICATION_NAME));
|
||||||
@ -503,6 +524,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
|||||||
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
|
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
|
||||||
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
|
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
|
||||||
}
|
}
|
||||||
|
|
||||||
return appContext;
|
return appContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,6 +102,12 @@
|
|||||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
@ -397,6 +403,24 @@ public GetContainersResponse getContainers(GetContainersRequest request)
|
|||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationSubmissionResponse submitReservation(
|
||||||
|
ReservationSubmissionRequest request) throws YarnException, IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationUpdateResponse updateReservation(
|
||||||
|
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReservationDeleteResponse deleteReservation(
|
||||||
|
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class HistoryService extends AMService implements HSClientProtocol {
|
class HistoryService extends AMService implements HSClientProtocol {
|
||||||
|
Loading…
Reference in New Issue
Block a user