YARN-6050. AMs can't be scheduled on racks or nodes (rkanter)
parent 64ea62c1cc
commit 9bae6720cb
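The change replaces the single AM ResourceRequest with a list, so a client can now constrain where the ApplicationMaster container runs. A minimal, hedged sketch of the client-side pattern this enables (the rack and host names are placeholders, and appContext is assumed to be an ApplicationSubmissionContext obtained from YarnClient):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

static void requestAMOnNode(ApplicationSubmissionContext appContext) {
  List<ResourceRequest> amReqs = new ArrayList<>();
  // relaxLocality=false on the ANY and rack requests pins the AM to the named node
  amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
      ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, false));
  amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
      "/default-rack", Resource.newInstance(1024, 1), 1, false));
  amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
      "host1.example.com", Resource.newInstance(1024, 1), 1, true));
  appContext.setAMContainerResourceRequests(amReqs);
}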
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

@@ -589,7 +590,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
amResourceRequest.setCapability(capability);
amResourceRequest.setNumContainers(1);
amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
appContext.setAMContainerResourceRequest(amResourceRequest);
appContext.setAMContainerResourceRequests(
Collections.singletonList(amResourceRequest));
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf

@@ -571,7 +571,7 @@ public void testNodeLabelExp() throws Exception {
buildSubmitContext(yarnRunner, jobConf);

assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
assertEquals(appSubCtx.getAMContainerResourceRequest()
assertEquals(appSubCtx.getAMContainerResourceRequests().get(0)
.getNodeLabelExpression(), "highMem");
}
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.records;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -100,7 +102,7 @@ public static ApplicationSubmissionContext newInstance(
amReq.setNumContainers(1);
amReq.setRelaxLocality(true);
amReq.setNodeLabelExpression(amContainerLabelExpression);
context.setAMContainerResourceRequest(amReq);
context.setAMContainerResourceRequests(Collections.singletonList(amReq));
return context;
}

@@ -159,7 +161,8 @@ public static ApplicationSubmissionContext newInstance(
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
context.setNodeLabelExpression(appLabelExpression);
context.setAMContainerResourceRequest(resourceRequest);
context.setAMContainerResourceRequests(
Collections.singletonList(resourceRequest));
return context;
}

@@ -454,29 +457,61 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
public abstract void setNodeLabelExpression(String nodeLabelExpression);

/**
* Get ResourceRequest of AM container, if this is not null, scheduler will
* use this to acquire resource for AM container.
*
* Get the ResourceRequest of the AM container.
*
* If this is not null, scheduler will use this to acquire resource for AM
* container.
*
* If this is null, scheduler will assemble a ResourceRequest by using
* <em>getResource</em> and <em>getPriority</em> of
* <em>ApplicationSubmissionContext</em>.
*
* Number of containers and Priority will be ignore.
*
* @return ResourceRequest of AM container
*
* Number of containers and Priority will be ignored.
*
* @return ResourceRequest of the AM container
* @deprecated See {@link #getAMContainerResourceRequests()}
*/
@Public
@Evolving
@Deprecated
public abstract ResourceRequest getAMContainerResourceRequest();

/**
* Set ResourceRequest of AM container
* @param request of AM container
* Set ResourceRequest of the AM container
* @param request of the AM container
* @deprecated See {@link #setAMContainerResourceRequests(List)}
*/
@Public
@Evolving
@Deprecated
public abstract void setAMContainerResourceRequest(ResourceRequest request);

/**
* Get the ResourceRequests of the AM container.
*
* If this is not null, scheduler will use this to acquire resource for AM
* container.
*
* If this is null, scheduler will use the ResourceRequest as determined by
* <em>getAMContainerResourceRequest</em> and its behavior.
*
* Number of containers and Priority will be ignored.
*
* @return List of ResourceRequests of the AM container
*/
@Public
@Evolving
public abstract List<ResourceRequest> getAMContainerResourceRequests();

/**
* Set ResourceRequests of the AM container.
* @param requests of the AM container
*/
@Public
@Evolving
public abstract void setAMContainerResourceRequests(
List<ResourceRequest> requests);

/**
* Get the attemptFailuresValidityInterval in milliseconds for the application
*
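The deprecated single-request accessors stay usable; the Javadoc above describes how they map onto the new list. A hedged sketch of that compatibility contract, where "context" is assumed to be any ApplicationSubmissionContext instance:

ResourceRequest amReq = ResourceRequest.newInstance(Priority.newInstance(0),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
context.setAMContainerResourceRequest(amReq);   // deprecated single-request path
// ...is treated as a one-element list:
//   context.getAMContainerResourceRequests()  -> [amReq]
// and the deprecated getter reads back the first entry of the list:
//   context.getAMContainerResourceRequest()   -> getAMContainerResourceRequests().get(0)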
@@ -378,7 +378,7 @@ message ApplicationSubmissionContextProto {
optional LogAggregationContextProto log_aggregation_context = 14;
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
repeated ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18;
}

@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.records.impl.pb;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

@@ -66,7 +68,7 @@ public class ApplicationSubmissionContextPBImpl
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
private ResourceRequest amResourceRequest = null;
private List<ResourceRequest> amResourceRequests = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
@@ -127,9 +129,10 @@ private void mergeLocalToBuilder() {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
if (this.amResourceRequest != null) {
builder.setAmContainerResourceRequest(
convertToProtoFormat(this.amResourceRequest));
if (this.amResourceRequests != null) {
builder.clearAmContainerResourceRequest();
builder.addAllAmContainerResourceRequest(
convertToProtoFormat(this.amResourceRequests));
}
if (this.logAggregationContext != null) {
builder.setLogAggregationContext(
@@ -430,13 +433,23 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}

private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
return new ResourceRequestPBImpl(p);

private List<ResourceRequest> convertFromProtoFormat(
List<ResourceRequestProto> ps) {
List<ResourceRequest> rs = new ArrayList<>();
for (ResourceRequestProto p : ps) {
rs.add(new ResourceRequestPBImpl(p));
}
return rs;
}

private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
return ((ResourceRequestPBImpl)t).getProto();
private List<ResourceRequestProto> convertToProtoFormat(
List<ResourceRequest> ts) {
List<ResourceRequestProto> rs = new ArrayList<>(ts.size());
for (ResourceRequest t : ts) {
rs.add(((ResourceRequestPBImpl)t).getProto());
}
return rs;
}

private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
@@ -485,25 +498,46 @@ public void setNodeLabelExpression(String labelExpression) {
}

@Override
@Deprecated
public ResourceRequest getAMContainerResourceRequest() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResourceRequest != null) {
return amResourceRequest;
} // Else via proto
if (!p.hasAmContainerResourceRequest()) {
List<ResourceRequest> reqs = getAMContainerResourceRequests();
if (reqs == null || reqs.isEmpty()) {
return null;
}
amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest());
return amResourceRequest;
return getAMContainerResourceRequests().get(0);
}

@Override
public List<ResourceRequest> getAMContainerResourceRequests() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResourceRequests != null) {
return amResourceRequests;
} // Else via proto
if (p.getAmContainerResourceRequestCount() == 0) {
return null;
}
amResourceRequests =
convertFromProtoFormat(p.getAmContainerResourceRequestList());
return amResourceRequests;
}

@Override
@Deprecated
public void setAMContainerResourceRequest(ResourceRequest request) {
maybeInitBuilder();
if (request == null) {
builder.clearAmContainerResourceRequest();
}
this.amResourceRequest = request;
this.amResourceRequests = Collections.singletonList(request);
}

@Override
public void setAMContainerResourceRequests(List<ResourceRequest> requests) {
maybeInitBuilder();
if (requests == null) {
builder.clearAmContainerResourceRequest();
}
this.amResourceRequests = requests;
}

@Override
@@ -801,6 +801,28 @@ private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
}
}

/**
* Get nodes that have no labels.
*
* @return set of nodes with no labels
*/
public Set<NodeId> getNodesWithoutALabel() {
try {
readLock.lock();
Set<NodeId> nodes = new HashSet<>();
for (Host host : nodeCollections.values()) {
for (NodeId nodeId : host.nms.keySet()) {
if (getLabelsByNode(nodeId).isEmpty()) {
nodes.add(nodeId);
}
}
}
return Collections.unmodifiableSet(nodes);
} finally {
readLock.unlock();
}
}

/**
* Get mapping of labels to nodes for all the labels.

@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
@@ -31,6 +33,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -337,14 +341,16 @@ protected void recoverApplication(ApplicationStateData appState,
// has been disabled. Reject the recovery of this application if it
// is true and give clear message so that user can react properly.
if (!appContext.getUnmanagedAM() &&
application.getAMResourceRequest() == null &&
(application.getAMResourceRequests() == null ||
application.getAMResourceRequests().isEmpty()) &&
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
// check application submission context and see if am resource request
// or application itself contains any node label expression.
ResourceRequest amReqFromAppContext =
appContext.getAMContainerResourceRequest();
String labelExp = (amReqFromAppContext != null) ?
amReqFromAppContext.getNodeLabelExpression() : null;
List<ResourceRequest> amReqsFromAppContext =
appContext.getAMContainerResourceRequests();
String labelExp =
(amReqsFromAppContext != null && !amReqsFromAppContext.isEmpty()) ?
amReqsFromAppContext.get(0).getNodeLabelExpression() : null;
if (labelExp == null) {
labelExp = appContext.getNodeLabelExpression();
}
@@ -379,9 +385,9 @@ private RMAppImpl createAndPopulateNewRMApp(
}

ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = null;
List<ResourceRequest> amReqs = null;
try {
amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
amReqs = validateAndCreateResourceRequest(submissionContext, isRecovery);
} catch (InvalidLabelResourceRequestException e) {
// This can happen if the application had been submitted and run
// with Node Label enabled but recover with Node Label disabled.
@@ -444,7 +450,7 @@ private RMAppImpl createAndPopulateNewRMApp(
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags(), amReq, startTime);
submissionContext.getApplicationTags(), amReqs, startTime);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
@@ -470,7 +476,7 @@ private RMAppImpl createAndPopulateNewRMApp(
return application;
}

private ResourceRequest validateAndCreateResourceRequest(
private List<ResourceRequest> validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
@@ -480,33 +486,77 @@ private ResourceRequest validateAndCreateResourceRequest(

// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = submissionContext.getAMContainerResourceRequest();
if (amReq == null) {
amReq = BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1);
}

// set label expression for AM container
if (null == amReq.getNodeLabelExpression()) {
amReq.setNodeLabelExpression(submissionContext
.getNodeLabelExpression());
List<ResourceRequest> amReqs =
submissionContext.getAMContainerResourceRequests();
if (amReqs == null || amReqs.isEmpty()) {
if (submissionContext.getResource() != null) {
amReqs = Collections.singletonList(BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1));
} else {
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resources requested");
}
}

try {
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler, isRecovery, rmContext);
// Find the ANY request and ensure there's only one
ResourceRequest anyReq = null;
for (ResourceRequest amReq : amReqs) {
if (amReq.getResourceName().equals(ResourceRequest.ANY)) {
if (anyReq == null) {
anyReq = amReq;
} else {
throw new InvalidResourceRequestException("Invalid resource "
+ "request, only one resource request with "
+ ResourceRequest.ANY + " is allowed");
}
}
}
if (anyReq == null) {
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resource request specified with " + ResourceRequest.ANY);
}

// Make sure that all of the requests agree with the ANY request
// and have correct values
for (ResourceRequest amReq : amReqs) {
amReq.setCapability(anyReq.getCapability());
amReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
amReq.setNumContainers(1);
amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY);
}

// set label expression for AM ANY request if not set
if (null == anyReq.getNodeLabelExpression()) {
anyReq.setNodeLabelExpression(submissionContext
.getNodeLabelExpression());
}

// Put ANY request at the front
if (!amReqs.get(0).equals(anyReq)) {
amReqs.remove(anyReq);
amReqs.add(0, anyReq);
}

// Normalize all requests
for (ResourceRequest amReq : amReqs) {
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler, isRecovery, rmContext);

amReq.setCapability(
scheduler.getNormalizedResource(amReq.getCapability()));
}
return amReqs;
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}

amReq.setCapability(scheduler.getNormalizedResource(amReq.getCapability()));
return amReq;
}

return null;
}
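To make the validation rules above concrete, here is a hedged sketch (not taken from the patch) of AM request lists as the new validateAndCreateResourceRequest would see them; the host name and capability values are made up:

// Accepted: exactly one ANY request. The node-level request is then forced
// to agree with it (capability, GUARANTEED execution type, one container,
// AM priority), and the ANY request is moved to the front of the list.
List<ResourceRequest> accepted = new ArrayList<>();
accepted.add(ResourceRequest.newInstance(Priority.newInstance(0),
    "host1.example.com", Resource.newInstance(2048, 1), 1, true));
accepted.add(ResourceRequest.newInstance(Priority.newInstance(0),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, false));

// Rejected with InvalidResourceRequestException: more than one ANY request.
List<ResourceRequest> rejected = new ArrayList<>();
rejected.add(ResourceRequest.newInstance(Priority.newInstance(0),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, false));
rejected.add(ResourceRequest.newInstance(Priority.newInstance(0),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, false));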
@@ -21,13 +21,16 @@
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
@@ -41,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -557,19 +561,65 @@ public static Map<ApplicationTimeoutType, Long> validateISO8601AndConvertToLocal
*
* @param rmContext context
* @param conf configuration
* @param amreq am resource request
* @param amReqs am resource requests
* @return applicable node count
*/
public static int getApplicableNodeCountForAM(RMContext rmContext,
Configuration conf, ResourceRequest amreq) {
if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager();
String amNodeLabelExpression = amreq.getNodeLabelExpression();
amNodeLabelExpression = (amNodeLabelExpression == null
|| amNodeLabelExpression.trim().isEmpty())
? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression;
return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression);
Configuration conf, List<ResourceRequest> amReqs) {
// Determine the list of nodes that are eligible based on the strict
// resource requests
Set<NodeId> nodesForReqs = new HashSet<>();
for (ResourceRequest amReq : amReqs) {
if (amReq.getRelaxLocality() &&
!amReq.getResourceName().equals(ResourceRequest.ANY)) {
nodesForReqs.addAll(
rmContext.getScheduler().getNodeIds(amReq.getResourceName()));
}
}

if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
// Determine the list of nodes that are eligible based on the node label
String amNodeLabelExpression = amReqs.get(0).getNodeLabelExpression();
Set<NodeId> nodesForLabels =
getNodeIdsForLabel(rmContext, amNodeLabelExpression);
if (nodesForLabels != null && !nodesForLabels.isEmpty()) {
// If only node labels, strip out any wildcard NodeIds and return
if (nodesForReqs.isEmpty()) {
for (Iterator<NodeId> it = nodesForLabels.iterator(); it.hasNext();) {
if (it.next().getPort() == 0) {
it.remove();
}
}
return nodesForLabels.size();
} else {
// The NodeIds common to both the strict resource requests and the
// node label is the eligible set
return Sets.intersection(nodesForReqs, nodesForLabels).size();
}
}
}

// If no strict resource request NodeIds nor node label NodeIds, then just
// return the entire cluster
if (nodesForReqs.isEmpty()) {
return rmContext.getScheduler().getNumClusterNodes();
}
// No node label NodeIds, so return the strict resource request NodeIds
return nodesForReqs.size();
}

private static Set<NodeId> getNodeIdsForLabel(RMContext rmContext,
String label) {
label = (label == null || label.trim().isEmpty())
? RMNodeLabelsManager.NO_LABEL : label;
if (label.equals(RMNodeLabelsManager.NO_LABEL)) {
// NO_LABEL nodes aren't tracked directly
return rmContext.getNodeLabelManager().getNodesWithoutALabel();
} else {
Map<String, Set<NodeId>> labelsToNodes =
rmContext.getNodeLabelManager().getLabelsToNodes(
Collections.singleton(label));
return labelsToNodes.get(label);
}
return rmContext.getScheduler().getNumClusterNodes();
}
}
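A hedged usage sketch of the node-count computation above (rmContext and conf are assumed to be in scope, the host name is hypothetical, and one NodeManager per host is assumed):

List<ResourceRequest> amReqs = new ArrayList<>();
amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, false));
amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
    "host3.example.com", Resource.newInstance(1024, 1), 1, true));
int applicable =
    RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, amReqs);
// With node labels disabled and five registered nodes, applicable is 1
// (only the NodeId on host3) rather than getNumClusterNodes() == 5, so the
// AM blacklist threshold is computed against the truly eligible nodes.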
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -269,7 +270,7 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,

ReservationId getReservationId();

ResourceRequest getAMResourceRequest();
List<ResourceRequest> getAMResourceRequests();

Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();

@@ -196,7 +196,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
private ResourceRequest amReq;
private List<ResourceRequest> amReqs;

private CallerContext callerContext;

@@ -424,10 +424,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags,
ResourceRequest amReq) {
List<ResourceRequest> amReqs) {
this(applicationId, rmContext, config, name, user, queue, submissionContext,
scheduler, masterService, submitTime, applicationType, applicationTags,
amReq, -1);
amReqs, -1);
}

public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -435,7 +435,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags,
ResourceRequest amReq, long startTime) {
List<ResourceRequest> amReqs, long startTime) {

this.systemClock = SystemClock.getInstance();

@@ -458,7 +458,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
}
this.applicationType = applicationType;
this.applicationTags = applicationTags;
this.amReq = amReq;
this.amReqs = amReqs;
if (submissionContext.getPriority() != null) {
this.applicationPriority = Priority
.newInstance(submissionContext.getPriority().getPriority());
@@ -987,7 +987,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
currentAMBlacklistManager = new SimpleBlacklistManager(
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
getAMResourceRequest()),
getAMResourceRequests()),
blacklistDisableThreshold);
} else {
currentAMBlacklistManager = new DisabledBlacklistManager();
@@ -995,7 +995,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
}
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf, amReq, this, currentAMBlacklistManager);
submissionContext, conf, amReqs, this, currentAMBlacklistManager);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
@@ -1690,8 +1690,8 @@ public ReservationId getReservationId() {
}

@Override
public ResourceRequest getAMResourceRequest() {
return this.amReq;
public List<ResourceRequest> getAMResourceRequests() {
return this.amReqs;
}

@Override
@@ -1964,7 +1964,9 @@ public String getAppNodeLabelExpression() {
public String getAmNodeLabelExpression() {
String amNodeLabelExpression = null;
if (!getApplicationSubmissionContext().getUnmanagedAM()) {
amNodeLabelExpression = getAMResourceRequest().getNodeLabelExpression();
amNodeLabelExpression =
getAMResourceRequests() != null && !getAMResourceRequests().isEmpty()
? getAMResourceRequests().get(0).getNodeLabelExpression() : null;
amNodeLabelExpression = (amNodeLabelExpression == null)
? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression;
amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty())

@@ -192,7 +192,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private Object transitionTodo;

private RMAppAttemptMetrics attemptMetrics = null;
private ResourceRequest amReq = null;
private List<ResourceRequest> amReqs = null;
private BlacklistManager blacklistedNodesForAM = null;

private String amLaunchDiagnostics;
@@ -485,16 +485,16 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, ResourceRequest amReq, RMApp rmApp) {
Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp) {
this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
conf, amReq, rmApp, new DisabledBlacklistManager());
conf, amReqs, rmApp, new DisabledBlacklistManager());
}

public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, ResourceRequest amReq, RMApp rmApp,
Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp,
BlacklistManager amBlacklistManager) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
@@ -514,7 +514,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.attemptMetrics =
new RMAppAttemptMetrics(applicationAttemptId, rmContext);

this.amReq = amReq;
this.amReqs = amReqs;
this.blacklistedNodesForAM = amBlacklistManager;

final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf);
@@ -1090,18 +1090,21 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// will be passed to scheduler, and scheduler will deduct the number after
// AM container allocated

// Currently, following fields are all hard code,
// Currently, following fields are all hard coded,
// TODO: change these fields when we want to support
// priority/resource-name/relax-locality specification for AM containers
// allocation.
appAttempt.amReq.setNumContainers(1);
appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
// priority or multiple containers AM container allocation.
for (ResourceRequest amReq : appAttempt.amReqs) {
amReq.setNumContainers(1);
amReq.setPriority(AM_CONTAINER_PRIORITY);
}

appAttempt.getAMBlacklistManager().refreshNodeHostCount(
int numNodes =
RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext,
appAttempt.conf, appAttempt.amReq));
appAttempt.conf, appAttempt.amReqs);
if (LOG.isDebugEnabled()) {
LOG.debug("Setting node count for blacklist to " + numNodes);
}
appAttempt.getAMBlacklistManager().refreshNodeHostCount(numNodes);

ResourceBlacklistRequest amBlacklist =
appAttempt.getAMBlacklistManager().getBlacklistUpdates();
@@ -1114,7 +1117,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
appAttempt.amReqs,
EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getBlacklistAdditions(),
amBlacklist.getBlacklistRemovals(),

@@ -1257,4 +1257,9 @@ protected void rollbackContainerUpdate(
rmContainer.getLastConfirmedResource(), null)));
}
}

@Override
public List<NodeId> getNodeIds(String resourceName) {
return nodeTracker.getNodeIdsByResourceName(resourceName);
}
}
@@ -268,6 +268,9 @@ public List<N> getAllNodes() {

/**
* Convenience method to filter nodes based on a condition.
*
* @param nodeFilter A {@link NodeFilter} for filtering the nodes
* @return A list of filtered nodes
*/
public List<N> getNodes(NodeFilter nodeFilter) {
List<N> nodeList = new ArrayList<>();
@@ -288,6 +291,37 @@ public List<N> getNodes(NodeFilter nodeFilter) {
return nodeList;
}

public List<NodeId> getAllNodeIds() {
return getNodeIds(null);
}

/**
* Convenience method to filter nodes based on a condition.
*
* @param nodeFilter A {@link NodeFilter} for filtering the nodes
* @return A list of filtered nodes
*/
public List<NodeId> getNodeIds(NodeFilter nodeFilter) {
List<NodeId> nodeList = new ArrayList<>();
readLock.lock();
try {
if (nodeFilter == null) {
for (N node : nodes.values()) {
nodeList.add(node.getNodeID());
}
} else {
for (N node : nodes.values()) {
if (nodeFilter.accept(node)) {
nodeList.add(node.getNodeID());
}
}
}
} finally {
readLock.unlock();
}
return nodeList;
}

/**
* Convenience method to sort nodes.
*
@@ -320,11 +354,38 @@ public List<N> getNodesByResourceName(final String resourceName) {
resourceName != null && !resourceName.isEmpty());
List<N> retNodes = new ArrayList<>();
if (ResourceRequest.ANY.equals(resourceName)) {
return getAllNodes();
retNodes.addAll(getAllNodes());
} else if (nodeNameToNodeMap.containsKey(resourceName)) {
retNodes.add(nodeNameToNodeMap.get(resourceName));
} else if (nodesPerRack.containsKey(resourceName)) {
return nodesPerRack.get(resourceName);
retNodes.addAll(nodesPerRack.get(resourceName));
} else {
LOG.info(
"Could not find a node matching given resourceName " + resourceName);
}
return retNodes;
}

/**
* Convenience method to return list of {@link NodeId} corresponding to
* resourceName passed in the {@link ResourceRequest}.
*
* @param resourceName Host/rack name of the resource, or
* {@link ResourceRequest#ANY}
* @return list of {@link NodeId} that match the resourceName
*/
public List<NodeId> getNodeIdsByResourceName(final String resourceName) {
Preconditions.checkArgument(
resourceName != null && !resourceName.isEmpty());
List<NodeId> retNodes = new ArrayList<>();
if (ResourceRequest.ANY.equals(resourceName)) {
retNodes.addAll(getAllNodeIds());
} else if (nodeNameToNodeMap.containsKey(resourceName)) {
retNodes.add(nodeNameToNodeMap.get(resourceName).getNodeID());
} else if (nodesPerRack.containsKey(resourceName)) {
for (N node : nodesPerRack.get(resourceName)) {
retNodes.add(node.getNodeID());
}
} else {
LOG.info(
"Could not find a node matching given resourceName " + resourceName);
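A brief, assumed usage of the new NodeId lookups (nodeTracker stands in for a populated ClusterNodeTracker; the rack and host names are placeholders):

List<NodeId> allNodes = nodeTracker.getNodeIdsByResourceName(ResourceRequest.ANY);
List<NodeId> rackNodes = nodeTracker.getNodeIdsByResourceName("/rack1");
List<NodeId> hostNodes = nodeTracker.getNodeIdsByResourceName("host1.example.com");
// An unknown resourceName yields an empty list (with an info log) rather than null.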
@@ -19,10 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;

@@ -49,4 +51,11 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
* @throws IOException
*/
void reinitialize(Configuration conf, RMContext rmContext) throws IOException;

/**
* Get the {@link NodeId} available in the cluster by resource name.
* @param resourceName resource name
* @return the number of available {@link NodeId} by resource name.
*/
List<NodeId> getNodeIds(String resourceName);
}

@@ -141,18 +141,20 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
Resource amResource;
String partition;

if (rmApp == null || rmApp.getAMResourceRequest() == null) {
if (rmApp == null || rmApp.getAMResourceRequests() == null
|| rmApp.getAMResourceRequests().isEmpty()) {
// the rmApp may be undefined (the resource manager checks for this too)
// and unmanaged applications do not provide an amResource request
// in these cases, provide a default using the scheduler
amResource = rmContext.getScheduler().getMinimumResourceCapability();
partition = CommonNodeLabelsManager.NO_LABEL;
} else {
amResource = rmApp.getAMResourceRequest().getCapability();
amResource = rmApp.getAMResourceRequests().get(0).getCapability();
partition =
(rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
(rmApp.getAMResourceRequests().get(0)
.getNodeLabelExpression() == null)
? CommonNodeLabelsManager.NO_LABEL
: rmApp.getAMResourceRequest().getNodeLabelExpression();
: rmApp.getAMResourceRequests().get(0).getNodeLabelExpression();
}

setAppAMNodePartitionName(partition);

@@ -229,7 +229,7 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
appNodeLabelExpression =
app.getApplicationSubmissionContext().getNodeLabelExpression();
amNodeLabelExpression = (unmanagedApplication) ? null
: app.getAMResourceRequest().getNodeLabelExpression();
: app.getAMResourceRequests().get(0).getNodeLabelExpression();

// Setting partition based resource usage of application
ResourceScheduler scheduler = rm.getRMContext().getScheduler();

@@ -24,6 +24,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -678,6 +679,17 @@ public RMApp submitApp(Credentials cred, ByteBuffer tokensConf)
tokensConf);
}

public RMApp submitApp(List<ResourceRequest> amResourceRequests)
throws Exception {
return submitApp(amResourceRequests, "app1",
"user", null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true,
amResourceRequests.get(0).getPriority(),
amResourceRequests.get(0).getNodeLabelExpression(), null, null);
}

public RMApp submitApp(Resource capability, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
@@ -688,6 +700,30 @@ public RMApp submitApp(Resource capability, String name, String user,
Map<ApplicationTimeoutType, Long> applicationTimeouts,
ByteBuffer tokensConf)
throws Exception {
priority = (priority == null) ? Priority.newInstance(0) : priority;
ResourceRequest amResourceRequest = ResourceRequest.newInstance(
priority, ResourceRequest.ANY, capability, 1);
if (amLabel != null && !amLabel.isEmpty()) {
amResourceRequest.setNodeLabelExpression(amLabel.trim());
}
return submitApp(Collections.singletonList(amResourceRequest), name, user,
acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted,
keepContainers, isAppIdProvided, applicationId,
attemptFailuresValidityInterval, logAggregationContext,
cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
tokensConf);
}

public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
String queue, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts,
ByteBuffer tokensConf)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -718,7 +754,6 @@ public RMApp submitApp(Resource capability, String name, String user,
sub.setApplicationType(appType);
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
sub.setResource(capability);
clc.setApplicationACLs(acls);
if (ts != null && UserGroupInformation.isSecurityEnabled()) {
DataOutputBuffer dob = new DataOutputBuffer();
@@ -733,12 +768,12 @@ public RMApp submitApp(Resource capability, String name, String user,
sub.setLogAggregationContext(logAggregationContext);
}
sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
ResourceRequest amResourceRequest = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY, capability, 1);
if (amLabel != null && !amLabel.isEmpty()) {
amResourceRequest.setNodeLabelExpression(amLabel.trim());
for (ResourceRequest amResourceRequest : amResourceRequests) {
amResourceRequest.setNodeLabelExpression(amLabel.trim());
}
}
sub.setAMContainerResourceRequest(amResourceRequest);
sub.setAMContainerResourceRequests(amResourceRequests);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
@@ -31,6 +31,8 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@@ -50,6 +52,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,11 +61,13 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -72,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -312,7 +319,7 @@ public void testQueueSubmitWithNoPermission() throws IOException {
ResourceRequest resReg =
ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
sub.setAMContainerResourceRequest(resReg);
sub.setAMContainerResourceRequests(Collections.singletonList(resReg));
req.setApplicationSubmissionContext(sub);
sub.setAMContainerSpec(mock(ContainerLaunchContext.class));
try {
@@ -522,8 +529,157 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) {
Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
}

@SuppressWarnings("deprecation")
@Test
public void testRMAppSubmit() throws Exception {
public void testRMAppSubmitAMContainerResourceRequests() throws Exception {
asContext.setResource(Resources.createResource(1024));
asContext.setAMContainerResourceRequest(
ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, Resources.createResource(1024), 1, true));
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, Resources.createResource(1025), 1, false));
reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
"/rack", Resources.createResource(1025), 1, false));
reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
"/rack/node", Resources.createResource(1025), 1, true));
asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
// getAMContainerResourceRequest uses the first entry of
// getAMContainerResourceRequests
Assert.assertEquals(reqs.get(0), asContext.getAMContainerResourceRequest());
Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
RMApp app = testRMAppSubmit();
for (ResourceRequest req : reqs) {
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
}

// setAMContainerResourceRequests has priority over
// setAMContainerResourceRequest and setResource
Assert.assertEquals(reqs, app.getAMResourceRequests());
}

@SuppressWarnings("deprecation")
@Test
public void testRMAppSubmitAMContainerResourceRequest() throws Exception {
asContext.setResource(Resources.createResource(1024));
asContext.setAMContainerResourceRequests(null);
ResourceRequest req =
ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, Resources.createResource(1025), 1, true);
asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
// getAMContainerResourceRequests uses a singleton list of
// getAMContainerResourceRequest
Assert.assertEquals(req, asContext.getAMContainerResourceRequest());
Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0));
Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
RMApp app = testRMAppSubmit();
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
// setAMContainerResourceRequest has priority over setResource
Assert.assertEquals(Collections.singletonList(req),
app.getAMResourceRequests());
}

@Test
public void testRMAppSubmitResource() throws Exception {
asContext.setResource(Resources.createResource(1024));
asContext.setAMContainerResourceRequests(null);
RMApp app = testRMAppSubmit();
// setResource
Assert.assertEquals(Collections.singletonList(
ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
app.getAMResourceRequests());
}

@Test
public void testRMAppSubmitNoResourceRequests() throws Exception {
asContext.setResource(null);
asContext.setAMContainerResourceRequests(null);
try {
testRMAppSubmit();
Assert.fail("Should have failed due to no ResourceRequest");
} catch (InvalidResourceRequestException e) {
Assert.assertEquals(
"Invalid resource request, no resources requested",
e.getMessage());
}
}

@Test
public void testRMAppSubmitAMContainerResourceRequestsDisagree()
throws Exception {
asContext.setResource(null);
List<ResourceRequest> reqs = new ArrayList<>();
ResourceRequest anyReq = ResourceRequest.newInstance(
Priority.newInstance(1),
ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1",
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
reqs.add(anyReq);
reqs.add(ResourceRequest.newInstance(Priority.newInstance(2),
"/rack", Resources.createResource(1025), 2, false, "",
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
reqs.add(ResourceRequest.newInstance(Priority.newInstance(3),
"/rack/node", Resources.createResource(1026), 3, true, "",
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
RMApp app = testRMAppSubmit();
// It should force the requests to all agree on these points
for (ResourceRequest req : reqs) {
req.setCapability(anyReq.getCapability());
req.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
req.setNumContainers(1);
req.setPriority(Priority.newInstance(0));
}
Assert.assertEquals(reqs, app.getAMResourceRequests());
}

@Test
public void testRMAppSubmitAMContainerResourceRequestsNoAny()
throws Exception {
asContext.setResource(null);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
"/rack", Resources.createResource(1025), 1, false));
reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
"/rack/node", Resources.createResource(1025), 1, true));
asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
// getAMContainerResourceRequest uses the first entry of
// getAMContainerResourceRequests
Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
try {
testRMAppSubmit();
Assert.fail("Should have failed due to missing ANY ResourceRequest");
} catch (InvalidResourceRequestException e) {
Assert.assertEquals(
"Invalid resource request, no resource request specified with *",
e.getMessage());
}
}

@Test
public void testRMAppSubmitAMContainerResourceRequestsTwoManyAny()
throws Exception {
asContext.setResource(null);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
ResourceRequest.ANY, Resources.createResource(1025), 1, false));
reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
ResourceRequest.ANY, Resources.createResource(1025), 1, false));
asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
// getAMContainerResourceRequest uses the first entry of
// getAMContainerResourceRequests
Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
try {
testRMAppSubmit();
Assert.fail("Should have failed due to too many ANY ResourceRequests");
} catch (InvalidResourceRequestException e) {
Assert.assertEquals(
"Invalid resource request, only one resource request with * is " +
"allowed", e.getMessage());
}
}

private RMApp testRMAppSubmit() throws Exception {
appMonitor.submitApplication(asContext, "test");
RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
@@ -534,12 +690,14 @@ public void testRMAppSubmit() throws Exception {

// wait for event to be processed
int timeoutSecs = 0;
while ((getAppEventType() == RMAppEventType.KILL) &&
while ((getAppEventType() == RMAppEventType.KILL) &&
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());

return app;
}

@Test
@@ -737,6 +895,15 @@ private static ResourceScheduler mockResourceScheduler() {
ResourceCalculator rs = mock(ResourceCalculator.class);
when(scheduler.getResourceCalculator()).thenReturn(rs);

when(scheduler.getNormalizedResource(any()))
.thenAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocationOnMock)
throws Throwable {
return (Resource) invocationOnMock.getArguments()[0];
}
});

return scheduler;
}

@@ -753,4 +920,26 @@ private static Resource mockResource() {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
}

private static ResourceRequest cloneResourceRequest(ResourceRequest req) {
return ResourceRequest.newInstance(
Priority.newInstance(req.getPriority().getPriority()),
new String(req.getResourceName()),
Resource.newInstance(req.getCapability().getMemorySize(),
req.getCapability().getVirtualCores()),
req.getNumContainers(),
req.getRelaxLocality(),
req.getNodeLabelExpression() != null
? new String(req.getNodeLabelExpression()) : null,
ExecutionTypeRequest.newInstance(
req.getExecutionTypeRequest().getExecutionType()));
}

private static List<ResourceRequest> cloneResourceRequests(
List<ResourceRequest> reqs) {
List<ResourceRequest> cloneReqs = new ArrayList<>();
for (ResourceRequest req : reqs) {
cloneReqs.add(cloneResourceRequest(req));
}
return cloneReqs;
}
}
@ -38,6 +38,7 @@
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -1307,9 +1308,9 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
|
||||
spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
|
||||
queueName, asContext, yarnScheduler, null,
|
||||
System.currentTimeMillis(), "YARN", null,
|
||||
BuilderUtils.newResourceRequest(
|
||||
Collections.singletonList(BuilderUtils.newResourceRequest(
|
||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||
Resource.newInstance(1024, 1), 1)){
|
||||
Resource.newInstance(1024, 1), 1))){
|
||||
@Override
|
||||
public ApplicationReport createAndGetApplicationReport(
|
||||
String clientUserName, boolean allowAccess) {
|
||||
@ -1323,7 +1324,8 @@ public ApplicationReport createAndGetApplicationReport(
|
||||
return report;
|
||||
}
|
||||
});
|
||||
app.getAMResourceRequest().setNodeLabelExpression(amNodeLabelExpression);
|
||||
app.getAMResourceRequests().get(0)
|
||||
.setNodeLabelExpression(amNodeLabelExpression);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
|
||||
|
@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;

@ -28,6 +29,9 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;

@ -156,6 +160,186 @@ public void testNodeBlacklistingOnAMFailure() throws Exception {
currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
}

@Test(timeout = 100000)
public void testNodeBlacklistingOnAMFailureStrictNodeLocality()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
true);

DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = startRM(conf, dispatcher);
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();

// Register 5 nodes so that we can blacklist at least one node if the AM
// container fails. With the default threshold: 5 nodes * 0.2 = 1.
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
nm1.registerNode();

MockNM nm2 =
new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
nm2.registerNode();

MockNM nm3 =
new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
nm3.registerNode();

MockNM nm4 =
new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
nm4.registerNode();

MockNM nm5 =
new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
nm5.registerNode();

// Specify strict locality on nm2
List<ResourceRequest> reqs = new ArrayList<>();
ResourceRequest nodeReq = ResourceRequest.newInstance(
Priority.newInstance(0), nm2.getNodeId().getHost(),
Resource.newInstance(200, 1), 1, true);
ResourceRequest rackReq = ResourceRequest.newInstance(
Priority.newInstance(0), "/default-rack",
Resource.newInstance(200, 1), 1, false);
ResourceRequest anyReq = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY,
Resource.newInstance(200, 1), 1, false);
reqs.add(anyReq);
reqs.add(rackReq);
reqs.add(nodeReq);
RMApp app = rm.submitApp(reqs);

MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
ContainerId amContainerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);

// Set the exit status to INVALID so that we can verify that the system
// automatically blacklists the node
makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);

// Restart the AM
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());

nm2.nodeHeartbeat(true);
dispatcher.await();

// Now the AM container should be allocated
MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);

MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
amContainerId =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
rmContainer = scheduler.getRMContainer(amContainerId);
nodeWhereAMRan = rmContainer.getAllocatedNode();

// The second AM should be on the same node because the strict locality
// left only one eligible node, so the blacklisting threshold kicked in
System.out.println("AM ran on " + nodeWhereAMRan);
Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);

am2.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
}

@Test(timeout = 100000)
public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
true);

DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = startRM(conf, dispatcher);
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();

// Register 5 nodes so that we can blacklist at least one node if the AM
// container fails. With the default threshold: 5 nodes * 0.2 = 1.
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
nm1.registerNode();

MockNM nm2 =
new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
nm2.registerNode();

MockNM nm3 =
new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
nm3.registerNode();

MockNM nm4 =
new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
nm4.registerNode();

MockNM nm5 =
new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
nm5.registerNode();

// Specify relaxed locality on nm2
List<ResourceRequest> reqs = new ArrayList<>();
ResourceRequest nodeReq = ResourceRequest.newInstance(
Priority.newInstance(0), nm2.getNodeId().getHost(),
Resource.newInstance(200, 1), 1, true);
ResourceRequest rackReq = ResourceRequest.newInstance(
Priority.newInstance(0), "/default-rack",
Resource.newInstance(200, 1), 1, true);
ResourceRequest anyReq = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY,
Resource.newInstance(200, 1), 1, true);
reqs.add(anyReq);
reqs.add(rackReq);
reqs.add(nodeReq);
RMApp app = rm.submitApp(reqs);

MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
ContainerId amContainerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);

// Set the exit status to INVALID so that we can verify that the system
// automatically blacklists the node
makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);

// Restart the AM
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());

nm2.nodeHeartbeat(true);
nm1.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
nm5.nodeHeartbeat(true);
dispatcher.await();

// Now the AM container should be allocated
MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);

MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
amContainerId =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
rmContainer = scheduler.getRMContainer(amContainerId);
nodeWhereAMRan = rmContainer.getAllocatedNode();

// The second AM should be on a different node because the relaxed locality
// made the app schedulable on other nodes and nm2 is blacklisted
System.out.println("AM ran on " + nodeWhereAMRan);
Assert.assertNotEquals(nm2.getNodeId(), nodeWhereAMRan);

am2.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
}

@Test(timeout = 100000)
public void testNoBlacklistingForNonSystemErrors() throws Exception {

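The comments in the two tests above size the cluster so that the default AM blacklisting threshold admits exactly one blacklisted node. A rough sketch of that arithmetic; the 0.2 figure is taken from the test comment, and the flooring behaviour is an assumption rather than a reading of the scheduler code:

public final class BlacklistCapacitySketch {
  // Approximate number of nodes the AM blacklist may hold: a fraction of the
  // cluster, floored. With 5 nodes and the 0.2 value cited above this is 1.
  static int maxBlacklistedNodes(int clusterNodes, double disableThreshold) {
    return (int) Math.floor(clusterNodes * disableThreshold);
  }

  public static void main(String[] args) {
    System.out.println(maxBlacklistedNodes(5, 0.2));  // prints 1
  }
}
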
@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TestRMServerUtils {
@Test
public void testGetApplicableNodeCountForAMLocality() throws Exception {
List<NodeId> rack1Nodes = new ArrayList<>();
for (int i = 0; i < 29; i++) {
rack1Nodes.add(NodeId.newInstance("host" + i, 1234));
}
NodeId node1 = NodeId.newInstance("node1", 1234);
NodeId node2 = NodeId.newInstance("node2", 1234);
rack1Nodes.add(node2);

YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes);
Mockito.when(scheduler.getNodeIds("node1"))
.thenReturn(Collections.singletonList(node1));
Mockito.when(scheduler.getNodeIds("node2"))
.thenReturn(Collections.singletonList(node2));
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);

ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
true, null);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(anyReq);
Assert.assertEquals(100,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest rackReq = createResourceRequest("/rack1", true, null);
reqs.add(rackReq);
Assert.assertEquals(30,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
anyReq.setRelaxLocality(false);
Assert.assertEquals(30,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(100,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest node1Req = createResourceRequest("node1", false, null);
reqs.add(node1Req);
Assert.assertEquals(100,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(true);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(true);
Assert.assertEquals(31,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest node2Req = createResourceRequest("node2", false, null);
reqs.add(node2Req);
Assert.assertEquals(31,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(true);
Assert.assertEquals(31,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(2,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(false);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(false);
Assert.assertEquals(100,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
}

@Test
public void testGetApplicableNodeCountForAMLabels() throws Exception {
Set<NodeId> noLabelNodes = new HashSet<>();
for (int i = 0; i < 80; i++) {
noLabelNodes.add(NodeId.newInstance("host" + i, 1234));
}
Set<NodeId> label1Nodes = new HashSet<>();
for (int i = 80; i < 90; i++) {
label1Nodes.add(NodeId.newInstance("host" + i, 1234));
}
label1Nodes.add(NodeId.newInstance("host101", 0));
label1Nodes.add(NodeId.newInstance("host102", 0));
Map<String, Set<NodeId>> label1NodesMap = new HashMap<>();
label1NodesMap.put("label1", label1Nodes);

YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class);
Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes);
Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1")))
.thenReturn(label1NodesMap);
Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan);

ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
true, null);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(anyReq);
Assert.assertEquals(80,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
anyReq.setNodeLabelExpression("label1");
Assert.assertEquals(10,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
}

@Test
public void testGetApplicableNodeCountForAMLocalityAndLabels()
throws Exception {
List<NodeId> rack1Nodes = new ArrayList<>();
for (int i = 0; i < 29; i++) {
rack1Nodes.add(NodeId.newInstance("host" + i, 1234));
}
NodeId node1 = NodeId.newInstance("node1", 1234);
NodeId node2 = NodeId.newInstance("node2", 1234);
rack1Nodes.add(node2);
Set<NodeId> noLabelNodes = new HashSet<>();
for (int i = 0; i < 19; i++) {
noLabelNodes.add(rack1Nodes.get(i));
}
noLabelNodes.add(node2);
for (int i = 29; i < 89; i++) {
noLabelNodes.add(NodeId.newInstance("host" + i, 1234));
}
Set<NodeId> label1Nodes = new HashSet<>();
label1Nodes.add(node1);
for (int i = 89; i < 93; i++) {
label1Nodes.add(NodeId.newInstance("host" + i, 1234));
}
for (int i = 19; i < 29; i++) {
label1Nodes.add(rack1Nodes.get(i));
}
label1Nodes.add(NodeId.newInstance("host101", 0));
label1Nodes.add(NodeId.newInstance("host102", 0));
Map<String, Set<NodeId>> label1NodesMap = new HashMap<>();
label1NodesMap.put("label1", label1Nodes);

YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes);
Mockito.when(scheduler.getNodeIds("node1"))
.thenReturn(Collections.singletonList(node1));
Mockito.when(scheduler.getNodeIds("node2"))
.thenReturn(Collections.singletonList(node2));
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class);
Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes);
Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1")))
.thenReturn(label1NodesMap);
Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan);

ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
true, null);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(anyReq);
Assert.assertEquals(80,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest rackReq = createResourceRequest("/rack1", true, null);
reqs.add(rackReq);
Assert.assertEquals(20,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
anyReq.setRelaxLocality(false);
Assert.assertEquals(20,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(80,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest node1Req = createResourceRequest("node1", false, null);
reqs.add(node1Req);
Assert.assertEquals(80,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(true);
Assert.assertEquals(0,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(true);
Assert.assertEquals(20,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

ResourceRequest node2Req = createResourceRequest("node2", false, null);
reqs.add(node2Req);
Assert.assertEquals(20,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(true);
Assert.assertEquals(20,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(false);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(false);
Assert.assertEquals(80,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

anyReq.setNodeLabelExpression("label1");
rackReq.setNodeLabelExpression("label1");
node1Req.setNodeLabelExpression("label1");
node2Req.setNodeLabelExpression("label1");
anyReq.setRelaxLocality(true);
reqs = new ArrayList<>();
reqs.add(anyReq);
Assert.assertEquals(15,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

rackReq.setRelaxLocality(true);
reqs.add(rackReq);
Assert.assertEquals(10,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
anyReq.setRelaxLocality(false);
Assert.assertEquals(10,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(15,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

node1Req.setRelaxLocality(false);
reqs.add(node1Req);
Assert.assertEquals(15,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(true);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(true);
Assert.assertEquals(11,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));

node2Req.setRelaxLocality(false);
reqs.add(node2Req);
Assert.assertEquals(11,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(true);
Assert.assertEquals(11,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
rackReq.setRelaxLocality(false);
Assert.assertEquals(1,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node1Req.setRelaxLocality(false);
Assert.assertEquals(0,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
node2Req.setRelaxLocality(false);
Assert.assertEquals(15,
RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
}

private ResourceRequest createResourceRequest(String resource,
boolean relaxLocality, String nodeLabel) {
return ResourceRequest.newInstance(Priority.newInstance(0),
resource, Resource.newInstance(1, 1), 1, relaxLocality, nodeLabel);
}
}

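The expected counts asserted above follow a consistent pattern. The sketch below is one reading of that rule, inferred only from the assertions (all names are invented and node labels are ignored for brevity): relaxed node-level requests limit the AM to those hosts plus the hosts of any relaxed rack-level requests; failing that, relaxed rack-level requests limit it to their racks; otherwise the whole cluster is eligible.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class ApplicableNodeCountSketch {
  static final String ANY = "*";

  static class Req {
    final String name;       // host name, rack name, or ANY
    final boolean isNode;    // true for host-level requests
    final boolean relax;     // relaxLocality flag
    final Set<String> nodes; // hosts this request can reach
    Req(String name, boolean isNode, boolean relax, Set<String> nodes) {
      this.name = name; this.isNode = isNode; this.relax = relax; this.nodes = nodes;
    }
  }

  // One possible counting rule consistent with the assertions above.
  static int applicableNodeCount(List<Req> reqs, int clusterSize) {
    Set<String> nodeLevel = new HashSet<>();
    Set<String> rackLevel = new HashSet<>();
    for (Req r : reqs) {
      if (!r.relax || ANY.equals(r.name)) {
        continue;  // strict requests and ANY do not narrow the count here
      }
      (r.isNode ? nodeLevel : rackLevel).addAll(r.nodes);
    }
    if (!nodeLevel.isEmpty()) {
      nodeLevel.addAll(rackLevel);   // e.g. node1 plus the 30-node rack1 -> 31
      return nodeLevel.size();
    }
    if (!rackLevel.isEmpty()) {
      return rackLevel.size();       // e.g. rack1 alone -> 30
    }
    return clusterSize;              // nothing narrows the count -> 100
  }
}

Run against the first test's fixture (a 30-node rack1, node1, node2 inside rack1, 100 cluster nodes), this reading reproduces the 100, 30, 31, 2, and 1 values asserted above; the label tests then intersect these sets with the nodes carrying the requested label.
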
@ -57,7 +57,7 @@
public abstract class MockAsm extends MockApps {

public static class ApplicationBase implements RMApp {
ResourceRequest amReq;
List<ResourceRequest> amReqs;
@Override
public String getUser() {
throw new UnsupportedOperationException("Not supported yet.");

@ -204,8 +204,8 @@ public ReservationId getReservationId() {
}

@Override
public ResourceRequest getAMResourceRequest() {
return this.amReq;
public List<ResourceRequest> getAMResourceRequests() {
return this.amReqs;
}

@Override

@ -526,7 +526,8 @@ private static RMApp createRMApp(ApplicationId appId) {
when(app.getAppNodeLabelExpression()).thenCallRealMethod();
ResourceRequest amReq = mock(ResourceRequest.class);
when(amReq.getNodeLabelExpression()).thenReturn("high-mem");
when(app.getAMResourceRequest()).thenReturn(amReq);
when(app.getAMResourceRequests())
.thenReturn(Collections.singletonList(amReq));
when(app.getAmNodeLabelExpression()).thenCallRealMethod();
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10));
when(app.getCallerContext())

@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

@ -62,14 +63,14 @@ public class MockRMApp implements RMApp {
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
ResourceRequest amReq;
List<ResourceRequest> amReqs;

public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
id = MockApps.newAppID(newid);
state = newState;
amReq = ResourceRequest.newInstance(Priority.UNDEFINED, "0.0.0.0",
Resource.newInstance(0, 0), 1);
amReqs = Collections.singletonList(ResourceRequest.newInstance(
Priority.UNDEFINED, "0.0.0.0", Resource.newInstance(0, 0), 1));
}

public MockRMApp(int newid, long time, RMAppState newState, String userName) {

@ -276,8 +277,8 @@ public ReservationId getReservationId() {
}

@Override
public ResourceRequest getAMResourceRequest() {
return this.amReq;
public List<ResourceRequest> getAMResourceRequests() {
return this.amReqs;
}

@Override

@ -30,8 +30,10 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.commons.logging.Log;

@ -271,7 +273,8 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
user, queue, submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN", null, mock(ResourceRequest.class));
System.currentTimeMillis(), "YARN", null,
new ArrayList<ResourceRequest>());

testAppStartState(applicationId, user, name, queue, application);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),

@ -1024,9 +1027,9 @@ public void testRecoverApplication(ApplicationStateData appState,
submissionContext.getQueue(), submissionContext, scheduler, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
submissionContext.getApplicationTags(),
BuilderUtils.newResourceRequest(
Collections.singletonList(BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
submissionContext.getResource(), 1)));
Assert.assertEquals(RMAppState.NEW, application.getState());

RMAppEvent recoverEvent =

@ -328,9 +328,9 @@ public void setUp() throws Exception {
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
masterService, submissionContext, new Configuration(),
BuilderUtils.newResourceRequest(
Collections.singletonList(BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1), application);
submissionContext.getResource(), 1)), application);

when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);

@ -1108,9 +1108,9 @@ public void testLaunchedFailWhileAHSEnabled() {
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
spyRMContext, scheduler,masterService,
submissionContext, myConf,
BuilderUtils.newResourceRequest(
Collections.singletonList(BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1), application);
submissionContext.getResource(), 1)), application);

//submit, schedule and allocate app attempt
myApplicationAttempt.handle(

@ -1584,9 +1584,9 @@ public void testContainersCleanupForLastAttempt() {
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
scheduler, masterService, submissionContext, new Configuration(),
BuilderUtils.newResourceRequest(
Collections.singletonList(BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1), application);
submissionContext.getResource(), 1)), application);
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(submissionContext.getMaxAppAttempts()).thenReturn(1);

@ -1645,9 +1645,10 @@ public Allocation answer(InvocationOnMock invocation)
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
spyRMContext, scheduler, masterService, submissionContext,
new Configuration(), ResourceRequest.newInstance(
Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3,
false, "label-expression"), application);
new Configuration(), Collections.singletonList(
ResourceRequest.newInstance(Priority.UNDEFINED, "host1",
Resource.newInstance(3333, 1), 3,
false, "label-expression")), application);
new RMAppAttemptImpl.ScheduleTransition().transition(
(RMAppAttemptImpl) applicationAttempt, null);
}

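Most of the mechanical edits above wrap what used to be a single AM ResourceRequest in a one-element list so it fits the new list-taking constructors. A hedged sketch of that conversion on its own; the wrapper class and method are invented, while the request construction mirrors the test code:

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public final class SingleToListMigration {
  // Old call sites passed one ResourceRequest; the new constructors take a
  // List<ResourceRequest>, so an existing single request becomes a singleton list.
  static List<ResourceRequest> asAmRequestList(ResourceRequest amReq) {
    return Collections.singletonList(amReq);
  }

  public static void main(String[] args) {
    ResourceRequest amReq = ResourceRequest.newInstance(Priority.newInstance(0),
        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
    System.out.println(asAmRequestList(amReq).size());  // 1
  }
}
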
@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ -613,7 +614,8 @@ public void testHeadroom() throws Exception {
ResourceRequest amResourceRequest = mock(ResourceRequest.class);
Resource amResource = Resources.createResource(0, 0);
when(amResourceRequest.getCapability()).thenReturn(amResource);
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
when(rmApp.getAMResourceRequests()).thenReturn(
Collections.singletonList(amResourceRequest));
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);

@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ -639,7 +640,8 @@ public void testHeadroom() throws Exception {
ResourceRequest amResourceRequest = mock(ResourceRequest.class);
Resource amResource = Resources.createResource(0, 0);
when(amResourceRequest.getCapability()).thenReturn(amResource);
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
when(rmApp.getAMResourceRequests()).thenReturn(
Collections.singletonList(amResourceRequest));
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);

@ -3205,7 +3205,7 @@ public void testAMUsedResource() throws Exception {
RMApp rmApp = rm.submitApp(amMemory, "app-1", "user_0", null, queueName);

assertEquals("RMApp does not containes minimum allocation",
minAllocResource, rmApp.getAMResourceRequest().getCapability());
minAllocResource, rmApp.getAMResourceRequests().get(0).getCapability());

ResourceScheduler scheduler = rm.getRMContext().getScheduler();
LeafQueue queueA =

@ -151,7 +151,8 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
amResourceRequest = mock(ResourceRequest.class);
when(amResourceRequest.getCapability()).thenReturn(
Resources.createResource(0, 0));
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
when(rmApp.getAMResourceRequests()).thenReturn(
Collections.singletonList(amResourceRequest));
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);

@ -3204,6 +3204,84 @@ public void testCancelStrictLocality() throws IOException {
assertEquals(1, app.getLiveContainers().size());
}

@Test
public void testAMStrictLocalityRack() throws IOException {
testAMStrictLocality(false, false);
}

@Test
public void testAMStrictLocalityNode() throws IOException {
testAMStrictLocality(true, false);
}

@Test
public void testAMStrictLocalityRackInvalid() throws IOException {
testAMStrictLocality(false, true);
}

@Test
public void testAMStrictLocalityNodeInvalid() throws IOException {
testAMStrictLocality(true, true);
}

private void testAMStrictLocality(boolean node, boolean invalid)
throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());

RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);

RMNode node2 = MockNodes.newNodeInfo(2, Resources.createResource(1024), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);

List<ResourceRequest> reqs = new ArrayList<>();
ResourceRequest nodeRequest = createResourceRequest(1024,
node2.getHostName(), 1, 1, true);
if (node && invalid) {
nodeRequest.setResourceName("invalid");
}
ResourceRequest rackRequest = createResourceRequest(1024,
node2.getRackName(), 1, 1, !node);
if (!node && invalid) {
rackRequest.setResourceName("invalid");
}
ResourceRequest anyRequest = createResourceRequest(1024,
ResourceRequest.ANY, 1, 1, false);
reqs.add(anyRequest);
reqs.add(rackRequest);
if (node) {
reqs.add(nodeRequest);
}

ApplicationAttemptId attId1 =
createSchedulingRequest("queue1", "user1", reqs);

scheduler.update();

NodeUpdateSchedulerEvent node2UpdateEvent =
new NodeUpdateSchedulerEvent(node2);

FSAppAttempt app = scheduler.getSchedulerApp(attId1);

// node2 should get the container
scheduler.handle(node2UpdateEvent);
if (invalid) {
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers());
assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
} else {
assertEquals(1, app.getLiveContainers().size());
assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers());
assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
}
}

/**
* Strict locality requests shouldn't reserve resources on another node.
*/

@ -1213,8 +1213,8 @@ public void testUnmarshalAppInfo() throws JSONException, Exception {
assertEquals(app1.getApplicationId().toString(), appInfo.getAppId());
assertEquals(app1.getName(), appInfo.getName());
assertEquals(app1.createApplicationState(), appInfo.getState());
assertEquals(app1.getAMResourceRequest().getCapability().getMemorySize(),
appInfo.getAllocatedMB());
assertEquals(app1.getAMResourceRequests().get(0).getCapability()
.getMemorySize(), appInfo.getAllocatedMB());

rm.stop();
}

@ -1427,7 +1427,7 @@ public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
expectedNumberOfElements++;
appNodeLabelExpression = info.getString("appNodeLabelExpression");
}
if (app.getAMResourceRequest().getNodeLabelExpression() != null) {
if (app.getAMResourceRequests().get(0).getNodeLabelExpression() != null) {
expectedNumberOfElements++;
amNodeLabelExpression = info.getString("amNodeLabelExpression");
}

@ -1534,7 +1534,7 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
app.getApplicationSubmissionContext().getNodeLabelExpression(),
appNodeLabelExpression);
assertEquals("unmanagedApplication doesn't match",
app.getAMResourceRequest().getNodeLabelExpression(),
app.getAMResourceRequests().get(0).getNodeLabelExpression(),
amNodeLabelExpression);
assertEquals("amRPCAddress",
AppInfo.getAmRPCAddressFromRMAppAttempt(app.getCurrentAppAttempt()),

@ -1561,7 +1561,7 @@ public void verifyResourceRequestsGeneric(RMApp app,
String nodeLabelExpression, int numContainers, boolean relaxLocality,
int priority, String resourceName, long memory, long vCores,
String executionType, boolean enforceExecutionType) {
ResourceRequest request = app.getAMResourceRequest();
ResourceRequest request = app.getAMResourceRequests().get(0);
assertEquals("nodeLabelExpression doesn't match",
request.getNodeLabelExpression(), nodeLabelExpression);
assertEquals("numContainers doesn't match", request.getNumContainers(),

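The web-service assertions above read the AM's capability from the first entry of the request list. A small sketch of that read path; the holder class is invented, and it assumes the list is non-empty for a submitted application, as these tests do:

import java.util.List;

import org.apache.hadoop.yarn.api.records.ResourceRequest;

public final class AmRequestView {
  // Mirrors how the REST-facing tests read AM details: always from index 0.
  static long amMemoryMb(List<ResourceRequest> amReqs) {
    return amReqs.get(0).getCapability().getMemorySize();
  }
}
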