YARN-7920. Simplify configuration for PlacementConstraints. Contributed by Wangda Tan.

This commit is contained in:
Konstantinos Karanasos 2018-02-15 14:23:27 -08:00
parent 47473952e5
commit 0b489e564c
16 changed files with 363 additions and 120 deletions

View File

@ -532,11 +532,57 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
/** Enable rich placement constraints. */
public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
RM_PREFIX + "placement-constraints.enabled";
/**
* Specify which handler will be used to process PlacementConstraints.
* For details on PlacementConstraints, please refer to
* {@link org.apache.hadoop.yarn.api.resource.PlacementConstraint}
*/
@Private
public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER =
RM_PREFIX + "placement-constraints.handler";
public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
/**
* This handler rejects all allocate calls made by an application, if they
* contain a {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}.
*/
@Private
public static final String DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER =
"disabled";
/**
* Using this handler, the placement of containers with constraints is
* determined as a pre-processing step before the capacity or the fair
* scheduler is called. Once the placement is decided, the capacity/fair
* scheduler is invoked to perform the actual allocation. The advantage of
* this approach is that it supports all constraint types (affinity,
* anti-affinity, cardinality). Moreover, it considers multiple containers at
* a time, which allows to satisfy more constraints than a container-at-a-time
* approach can achieve. As it sits outside the main scheduler, it can be used
* by both the capacity and fair schedulers. Note that at the moment it does
* not account for task priorities within an application, given that such
* priorities might be conflicting with the placement constraints.
*/
@Private
public static final String PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER =
"placement-processor";
/**
* Using this handler, containers with constraints will be placed by the main
* scheduler. If the configured RM scheduler
* ({@code yarn.resourcemanager.scheduler.class})
* cannot handle placement constraints, the corresponding SchedulingRequests
* will be rejected. As of now, only the capacity scheduler supports
* SchedulingRequests. In particular, it currently supports anti-affinity
* constraints (no affinity or cardinality) and places one container at a
* time. The advantage of this handler compared to the placement-processor is
* that it follows the same ordering rules for queues (sorted by utilization,
* priority) and apps (sorted by FIFO/fairness/priority) as the ones followed
* by the main scheduler.
*/
@Private
public static final String
SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER =
"scheduler";
/** Placement Algorithm. */
public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =

View File

@ -65,7 +65,8 @@ public void testAMRMClientWithPlacementConstraints()
// mismatches between client and server
teardown();
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
AMRMClient<AMRMClient.ContainerRequest> amClient =

View File

@ -131,9 +131,13 @@
</property>
<property>
<description>Enable Constraint Placement.</description>
<name>yarn.resourcemanager.placement-constraints.enabled</name>
<value>false</value>
<description>
Specify which handler will be used to process PlacementConstraints.
Acceptable values are: `placement-processor`, `scheduler` and `disabled`.
For a detailed explanation of these values, please refer to documentation.
</description>
<name>yarn.resourcemanager.placement-constraints.handler</name>
<value>disabled</value>
</property>
<property>

View File

@ -59,7 +59,6 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -67,6 +66,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.DisabledPlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@ -118,20 +121,47 @@ protected void serviceInit(Configuration conf) throws Exception {
initializeProcessingChain(conf);
}
/**
 * Adds to the AMS processing chain the placement-constraint processor
 * selected by {@link YarnConfiguration#RM_PLACEMENT_CONSTRAINTS_HANDLER}.
 * When the property is not set, the {@code disabled} handler is used.
 *
 * <p>An unrecognized value is not silently ignored: a warning is logged and
 * the {@code disabled} handler is installed, so that a typo in the
 * configuration cannot leave the chain without any placement processor.
 *
 * @param conf configuration from which the handler name is read
 */
private void addPlacementConstraintHandler(Configuration conf) {
  String placementConstraintsHandler =
      conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
          YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER);
  if (placementConstraintsHandler
      .equals(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
    LOG.info(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " placement handler will be used, all scheduling requests will "
        + "be rejected.");
    amsProcessingChain.addProcessor(new DisabledPlacementProcessor());
  } else if (placementConstraintsHandler
      .equals(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
    LOG.info(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " placement handler will be used. Scheduling requests will be "
        + "handled by the placement constraint processor");
    amsProcessingChain.addProcessor(new PlacementConstraintProcessor());
  } else if (placementConstraintsHandler
      .equals(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
    LOG.info(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " placement handler will be used. Scheduling requests will be "
        + "handled by the main scheduler.");
    amsProcessingChain.addProcessor(new SchedulerPlacementProcessor());
  } else {
    // Unknown handler name: fall back to the documented default rather than
    // leaving the chain without a placement processor.
    LOG.warn("Unrecognized value '" + placementConstraintsHandler + "' for "
        + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
        + ", falling back to the "
        + YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " placement handler. All scheduling requests will be rejected.");
    amsProcessingChain.addProcessor(new DisabledPlacementProcessor());
  }
}
private void initializeProcessingChain(Configuration conf) {
amsProcessingChain.init(rmContext, null);
boolean enablePlacementConstraints = conf.getBoolean(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED,
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED);
if (enablePlacementConstraints) {
amsProcessingChain.addProcessor(new PlacementProcessor());
}
addPlacementConstraintHandler(conf);
List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
if (processors != null) {
Collections.reverse(processors);
for (ApplicationMasterServiceProcessor p : processors) {
// Ensure only single instance of PlacementProcessor is included
if (enablePlacementConstraints && p instanceof PlacementProcessor) {
if (p instanceof AbstractPlacementProcessor) {
LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName()
+ " defined in "
+ YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS
+ ", however PlacementProcessor handler should be configured "
+ "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+ ", this processor will be ignored.");
continue;
}
this.amsProcessingChain.addProcessor(p);

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@ -1098,18 +1097,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
return EMPTY_ALLOCATION;
}
if ((!getConfiguration().getBoolean(
CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED))
&& schedulingRequests != null && (!schedulingRequests.isEmpty())) {
throw new SchedulerInvalidResoureRequestException(
"Application attempt:" + applicationAttemptId
+ " is using SchedulingRequest, which is disabled. Please update "
+ CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED
+ " to true in capacity-scheduler.xml in order to use this "
+ "feature.");
}
// The allocate may be the leftover from previous attempt, and it will
// impact current attempt, such as confuse the request and allocation for
// current attempt's AM container.

View File

@ -78,11 +78,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String PREFIX = "yarn.scheduler.capacity.";
@Private
public static final String SCHEDULING_REQUEST_ALLOWED =
PREFIX + "scheduling-request.allowed";
public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false;
@Private
public static final String DOT = ".";

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
 * Common base for the placement-constraint processors that can be plugged
 * into the ApplicationMasterService processing chain.
 *
 * <p>It keeps a reference to the next processor of the chain, the scheduler
 * and the placement constraint manager, records the constraints sent with a
 * register call and drops the application from the constraint manager on
 * finish. Both calls are then forwarded to the next processor.
 */
public abstract class AbstractPlacementProcessor
    implements ApplicationMasterServiceProcessor {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractPlacementProcessor.class);

  /** Processor that follows this one in the chain. */
  protected ApplicationMasterServiceProcessor nextAMSProcessor;

  /** Scheduler taken from the RM context during {@code init}. */
  protected AbstractYarnScheduler scheduler;

  /** Constraint manager taken from the RM context during {@code init}. */
  private PlacementConstraintManager constraintManager;

  /**
   * Stores the next processor and pulls the scheduler and the placement
   * constraint manager out of the given context.
   *
   * @param amsContext context of the AMS; cast to {@code RMContextImpl}
   * @param nextProcessor processor to which calls are forwarded
   */
  @Override
  public void init(ApplicationMasterServiceContext amsContext,
      ApplicationMasterServiceProcessor nextProcessor) {
    this.nextAMSProcessor = nextProcessor;
    RMContextImpl rmContext = (RMContextImpl) amsContext;
    this.scheduler = (AbstractYarnScheduler) rmContext.getScheduler();
    this.constraintManager = rmContext.getPlacementConstraintManager();
  }

  /**
   * Registers the placement constraints carried by the request (if any) with
   * the constraint manager and then forwards the call down the chain.
   */
  @Override
  public void registerApplicationMaster(
      ApplicationAttemptId applicationAttemptId,
      RegisterApplicationMasterRequest request,
      RegisterApplicationMasterResponse response)
      throws IOException, YarnException {
    Map<Set<String>, PlacementConstraint> constraints =
        request.getPlacementConstraints();
    storeConstraints(applicationAttemptId.getApplicationId(), constraints);
    nextAMSProcessor.registerApplicationMaster(
        applicationAttemptId, request, response);
  }

  /**
   * Hands a non-empty constraint map over to the constraint manager; a null
   * or empty map is ignored.
   */
  private void storeConstraints(ApplicationId appId,
      Map<Set<String>, PlacementConstraint> constraints) {
    if (constraints == null || constraints.isEmpty()) {
      return;
    }
    LOG.info("Constraints added for application [{}] against tags [{}]",
        appId, constraints);
    constraintManager.registerApplication(appId, constraints);
  }

  /**
   * Unregisters the application from the constraint manager and then
   * forwards the call down the chain.
   */
  @Override
  public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId,
      FinishApplicationMasterRequest request,
      FinishApplicationMasterResponse response) {
    ApplicationId appId = applicationAttemptId.getApplicationId();
    constraintManager.unregisterApplication(appId);
    nextAMSProcessor.finishApplicationMaster(
        applicationAttemptId, request, response);
  }
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Processor that rejects every request making use of placement constraints:
 * register calls carrying a non-empty placement constraint map and allocate
 * calls carrying SchedulingRequests fail with a {@link YarnException}. All
 * other calls are forwarded to the next processor.
 */
public class DisabledPlacementProcessor extends AbstractPlacementProcessor {

  private static final Logger LOG =
      LoggerFactory.getLogger(DisabledPlacementProcessor.class);

  /**
   * Forwards the register call unless it carries placement constraints.
   *
   * @throws YarnException if the request has a non-empty constraint map
   */
  @Override
  public void registerApplicationMaster(
      ApplicationAttemptId applicationAttemptId,
      RegisterApplicationMasterRequest request,
      RegisterApplicationMasterResponse response)
      throws IOException, YarnException {
    boolean carriesConstraints = request.getPlacementConstraints() != null
        && !request.getPlacementConstraints().isEmpty();
    if (!carriesConstraints) {
      nextAMSProcessor.registerApplicationMaster(
          applicationAttemptId, request, response);
      return;
    }

    final String reason = "Found non empty placement constraints map in "
        + "RegisterApplicationMasterRequest for application="
        + applicationAttemptId.toString() + ", but the configured "
        + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " cannot handle placement constraints. Rejecting this "
        + "registerApplicationMaster operation";
    LOG.warn(reason);
    throw new YarnException(reason);
  }

  /**
   * Forwards the allocate call unless it carries SchedulingRequests.
   *
   * @throws YarnException if the request has a non-empty list of
   *           SchedulingRequests
   */
  @Override
  public void allocate(ApplicationAttemptId appAttemptId,
      AllocateRequest request, AllocateResponse response) throws YarnException {
    boolean carriesSchedulingRequests = request.getSchedulingRequests() != null
        && !request.getSchedulingRequests().isEmpty();
    if (!carriesSchedulingRequests) {
      nextAMSProcessor.allocate(appAttemptId, request, response);
      return;
    }

    final String reason = "Found non empty SchedulingRequest in "
        + "AllocateRequest for application="
        + appAttemptId.toString() + ", but the configured "
        + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
        + " cannot handle placement constraints. Rejecting this "
        + "allocate operation";
    LOG.warn(reason);
    throw new YarnException(reason);
  }
}

View File

@ -24,8 +24,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
@ -33,43 +31,38 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* An ApplicationMasterService Processor that performs Constrained placement of
* An ApplicationMasterServiceProcessor that performs Constrained placement of
* Scheduling Requests. It does the following:
* 1. All initialization.
* 2. Intercepts placement constraints from the register call and adds it to
* the placement constraint manager.
* 3. Dispatches Scheduling Requests to the Planner.
*/
public class PlacementProcessor implements ApplicationMasterServiceProcessor {
public class PlacementConstraintProcessor extends AbstractPlacementProcessor {
/**
* Wrapper over the SchedulingResponse that wires in the placement attempt
@ -90,11 +83,8 @@ private Response(boolean isSuccess, ApplicationId applicationId,
}
private static final Logger LOG =
LoggerFactory.getLogger(PlacementProcessor.class);
private PlacementConstraintManager constraintManager;
private ApplicationMasterServiceProcessor nextAMSProcessor;
LoggerFactory.getLogger(PlacementConstraintProcessor.class);
private AbstractYarnScheduler scheduler;
private ExecutorService schedulingThreadPool;
private int retryAttempts;
private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
@ -110,12 +100,8 @@ private Response(boolean isSuccess, ApplicationId applicationId,
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor nextProcessor) {
LOG.info("Initializing Constraint Placement Processor:");
this.nextAMSProcessor = nextProcessor;
this.constraintManager =
((RMContextImpl)amsContext).getPlacementConstraintManager();
super.init(amsContext, nextProcessor);
this.scheduler =
(AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler();
// Only the first class is considered - even if a comma separated
// list is provided. (This is for simplicity, since getInstances does a
// lot of good things by handling things correctly)
@ -165,28 +151,6 @@ public void init(ApplicationMasterServiceContext amsContext,
LOG.info("Num retry attempts [{}]", this.retryAttempts);
}
@Override
public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response)
throws IOException, YarnException {
Map<Set<String>, PlacementConstraint> appPlacementConstraints =
request.getPlacementConstraints();
processPlacementConstraints(
appAttemptId.getApplicationId(), appPlacementConstraints);
nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
}
private void processPlacementConstraints(ApplicationId applicationId,
Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
LOG.info("Constraints added for application [{}] against tags [{}]",
applicationId, appPlacementConstraints);
constraintManager.registerApplication(
applicationId, appPlacementConstraints);
}
}
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {
@ -311,11 +275,10 @@ private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
constraintManager.unregisterApplication(appAttemptId.getApplicationId());
placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
requestsToReject.remove(appAttemptId.getApplicationId());
requestsToRetry.remove(appAttemptId.getApplicationId());
nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
super.finishApplicationMaster(appAttemptId, request, response);
}
private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Processor that leaves SchedulingRequests to the scheduler. An allocate call
 * carrying SchedulingRequests is rejected when the scheduler is not a
 * {@link CapacityScheduler}; every other call is forwarded to the next
 * processor.
 */
public class SchedulerPlacementProcessor extends AbstractPlacementProcessor {

  private static final Logger LOG =
      LoggerFactory.getLogger(SchedulerPlacementProcessor.class);

  /**
   * Forwards the allocate call, after checking that SchedulingRequests are
   * only sent to a scheduler able to handle them.
   *
   * @throws YarnException if the request contains SchedulingRequests and the
   *           scheduler is not a CapacityScheduler
   */
  @Override
  public void allocate(ApplicationAttemptId appAttemptId,
      AllocateRequest request, AllocateResponse response) throws YarnException {
    boolean carriesSchedulingRequests = request.getSchedulingRequests() != null
        && !request.getSchedulingRequests().isEmpty();

    if (carriesSchedulingRequests && !(scheduler instanceof CapacityScheduler)) {
      final String reason = "Found non empty SchedulingRequest of "
          + "AllocateRequest for application=" + appAttemptId.toString()
          + ", however the configured scheduler="
          + scheduler.getClass().getCanonicalName()
          + " cannot handle placement constraints, rejecting this "
          + "allocate operation";
      LOG.warn(reason);
      throw new YarnException(reason);
    }

    nextAMSProcessor.allocate(appAttemptId, request, response);
  }
}

View File

@ -50,6 +50,8 @@ public void testBasicPendingResourceUpdate() throws Exception {
Configuration conf = TestUtils.getConfigurationWithQueueLabels(
new Configuration(false));
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
@ -166,6 +168,8 @@ public void testNodePartitionPendingResourceUpdate() throws Exception {
Configuration conf = TestUtils.getConfigurationWithQueueLabels(
new Configuration(false));
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);

View File

@ -58,8 +58,8 @@ public void setUp() throws Exception {
public void testIntraAppAntiAffinity() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
true);
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@ -141,8 +141,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
true);
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {

View File

@ -57,13 +57,13 @@ public void setUp() throws Exception {
private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
true);
csConf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms", 0);
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {

View File

@ -275,8 +275,6 @@ public static <E> Set<E> toSet(E... elements) {
public static Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
true);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});

View File

@ -86,8 +86,8 @@ public void createAndStartRM() {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
conf.setInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
startRM(conf);
@ -381,8 +381,8 @@ public void testSchedulerRejection() throws Exception {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
startRM(conf);
HashMap<NodeId, MockNM> nodes = new HashMap<>();
@ -533,8 +533,8 @@ public void testRePlacementAfterSchedulerRejection() throws Exception {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
conf.setInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
startRM(conf);

View File

@ -12,10 +12,6 @@
limitations under the License. See accompanying LICENSE file.
-->
#set ( $H3 = '###' )
#set ( $H4 = '####' )
#set ( $H5 = '#####' )
Placement Constraints
=====================
@ -29,7 +25,7 @@ YARN allows applications to specify placement constraints in the form of data lo
For example, it may be beneficial to co-locate the allocations of a job on the same rack (*affinity* constraints) to reduce network costs, spread allocations across machines (*anti-affinity* constraints) to minimize resource interference, or allow up to a specific number of allocations in a node group (*cardinality* constraints) to strike a balance between the two. Placement decisions also affect resilience. For example, allocations placed within the same cluster upgrade domain would go offline simultaneously.
The applications can specify constraints without requiring knowledge of the underlying topology of the cluster (e.g., one does not need to specify the specific node or rack where their containers should be placed with constraints) or the other applications deployed. Currently **intra-application** constraints are supported, but the design that is followed is generic and support for constraints across applications will soon be added. Moreover, all constraints at the moment are **hard**, that is, if the constraints for a container cannot be satisfied due to the current cluster condition or conflicting constraints, the container request gets rejected.
The applications can specify constraints without requiring knowledge of the underlying topology of the cluster (e.g., one does not need to specify the specific node or rack where their containers should be placed with constraints) or the other applications deployed. Currently **intra-application** constraints are supported, but the design that is followed is generic and support for constraints across applications will soon be added. Moreover, all constraints at the moment are **hard**, that is, if the constraints for a container cannot be satisfied due to the current cluster condition or conflicting constraints, the container request will remain pending or will get rejected.
Note that in this document we use the notion of “allocation” to refer to a unit of resources (e.g., CPU and memory) that gets allocated in a node. In the current implementation of YARN, an allocation corresponds to a single container. However, in case an application uses an allocation to spawn more than one containers, an allocation could correspond to multiple containers.
@ -39,31 +35,23 @@ Quick Guide
We first describe how to enable scheduling with placement constraints and then provide examples of how to experiment with this feature using the distributed shell, an application that allows to run a given shell command on a set of containers.
$H3 Enabling placement constraints
### Enabling placement constraints
To enable placement constraints, the following property has to be set to **true** in **conf/yarn-site.xml**:
To enable placement constraints, the following property has to be set to `placement-processor` or `scheduler` in **conf/yarn-site.xml**:
| Property | Description | Default value |
|:-------- |:----------- |:------------- |
| `yarn.resourcemanager.placement-constraints.enabled` | Enables rich placement constraints. | `false` |
| `yarn.resourcemanager.placement-constraints.handler` | Specify which handler will be used to process PlacementConstraints. Acceptable values are: `placement-processor`, `scheduler`, and `disabled`. | `disabled` |
We now give more details about each of the three placement constraint handlers:
Further, the user can choose between the following two alternatives for placing containers with constraints:
* `placement-processor`: Using this handler, the placement of containers with constraints is determined as a pre-processing step before the capacity or the fair scheduler is called. Once the placement is decided, the capacity/fair scheduler is invoked to perform the actual allocation. The advantage of this handler is that it supports all constraint types (affinity, anti-affinity, cardinality). Moreover, it considers multiple containers at a time, which allows to satisfy more constraints than a container-at-a-time approach can achieve. As it sits outside the main scheduler, it can be used by both the capacity and fair schedulers. Note that at the moment it does not account for task priorities within an application, given that such priorities might be conflicting with the placement constraints.
* `scheduler`: Using this handler, containers with constraints will be placed by the main scheduler (as of now, only the capacity scheduler supports SchedulingRequests). It currently supports anti-affinity constraints (no affinity or cardinality). The advantage of this handler, when compared to the `placement-processor`, is that it follows the same ordering rules for queues (sorted by utilization, priority), apps (sorted by FIFO/fairness/priority) and tasks within the same app (priority) that are enforced by the existing main scheduler.
* `disabled`: Using this handler, if an application submits a SchedulingRequest, the corresponding allocate call will be rejected.
* **Placement processor:** Following this approach, the placement of containers with constraints is determined as a pre-processing step before the capacity or the fair scheduler is called. Once the placement is decided, the capacity/fair scheduler is invoked to perform the actual allocation. The advantage of this approach is that it supports all constraint types (affinity, anti-affinity, cardinality). Moreover, it considers multiple containers at a time, which allows to satisfy more constraints than a container-at-a-time approach can achieve. As it sits outside the main scheduler, it can be used by both the capacity and fair schedulers. Note that at the moment it does not account for task priorities within an application, given that such priorities might be conflicting with the placement constraints.
* **Placement allocator in capacity scheduler:** This approach places containers with constraints within the capacity scheduler. It currently supports anti-affinity constraints (no affinity or cardinality) and places one container at a time. However, it supports traditional task priorities within an application.
The `placement-processor` handler supports a wider range of constraints and can allow more containers to be placed, especially when applications have demanding constraints or the cluster is highly-utilized (due to considering multiple containers at a time). However, if respecting task priority within an application is important for the user and the capacity scheduler is used, then the `scheduler` handler should be used instead.
The placement processor approach supports a wider range of constraints and can allow more containers to be placed especially when applications have demanding constraints or the cluster is highly-utilized (due to considering multiple containers at a time). However, if respecting task priority within an application is important for the user and the capacity scheduler is used, then the placement allocator in the capacity scheduler should be used instead.
By default, the placement processor approach is enabled. To use the placement allocator in the capacity scheduler instead, the following parameter has to be set to **true** in the **conf/capacity-scheduler.xml**:
| Property | Description | Default value |
|:-------- |:----------- |:------------- |
| `yarn.scheduler.capacity.scheduling-request.allowed` | When set to false, the placement processor is used; when set to true, the allocator inside the capacity scheduler is used. | `false` |
$H3 Experimenting with placement constraints using distributed shell
### Experimenting with placement constraints using distributed shell
Users can experiment with placement constraints by using the distributed shell application through the following command:
@ -101,18 +89,18 @@ The above encodes two constraints:
Defining Placement Constraints
------------------------------
$H3 Allocation tags
### Allocation tags
Allocation tags are string tags that an application can associate with (groups of) its containers. Tags are used to identify components of applications. For example, an HBase Master allocation can be tagged with "hbase-m", and Region Servers with "hbase-rs". Other examples are "latency-critical" to refer to the more general demands of the allocation, or "app_0041" to denote the job ID. Allocation tags play a key role in constraints, as they allow to refer to multiple allocations that share a common tag.
Note that instead of using the `ResourceRequest` object to define allocation tags, we use the new `SchedulingRequest` object. This has many similarities with the `ResourceRequest`, but better separates the sizing of the requested allocations (number and size of allocations, priority, execution type, etc.), and the constraints dictating how these allocations should be placed (resource name, relaxed locality). Applications can still use `ResourceRequest` objects, but in order to define allocation tags and constraints, they need to use the `SchedulingRequest` object. Within a single `AllocateRequest`, an application should use either the `ResourceRequest` or the `SchedulingRequest` objects, but not both of them.
$H4 Differences between node labels, node attributes and allocation tags
#### Differences between node labels, node attributes and allocation tags
The difference between allocation tags and node labels or node attributes (YARN-3409), is that allocation tags are attached to allocations and not to nodes. When an allocation gets allocated to a node by the scheduler, the set of tags of that allocation is automatically added to the node for the duration of the allocation. Hence, a node inherits the tags of the allocations that are currently allocated to the node. Likewise, a rack inherits the tags of its nodes. Moreover, similar to node labels and unlike node attributes, allocation tags have no value attached to them. As we show below, our constraints can refer to allocation tags, as well as node labels and node attributes.
$H3 Placement constraints API
### Placement constraints API
Applications can use the public API in the `PlacementConstraints` class to construct placement constraints. Before describing the methods for building constraints, we describe the methods of the `PlacementTargets` class that are used to construct the target expressions that will then be used in constraints:
@ -139,11 +127,10 @@ The methods of the `PlacementConstraints` class for building constraints are the
The `PlacementConstraints` class also includes methods for building compound constraints (AND/OR expressions with multiple constraints). Adding support for compound constraints is work in progress.
$H3 Specifying constraints in applications
### Specifying constraints in applications
Applications have to specify the containers for which each constraint will be enabled. To this end, applications can provide a mapping from a set of allocation tags (source tags) to a placement constraint. For example, an entry of this mapping could be "hbase"->constraint1, which means that constraint1 will be applied when scheduling each allocation with tag "hbase".
When using the placement processor approach (see [Enabling placement constraints](#Enabling_placement_constraints)), this constraint mapping is specified within the `RegisterApplicationMasterRequest`.
When using the placement allocator in the capacity scheduler, the constraints can also be added at each `SchedulingRequest` object. Each such constraint is valid for the tag of that scheduling request. In case constraints are specified both at the `RegisterApplicationMasterRequest` and the scheduling requests, the latter override the former.
When using the `placement-processor` handler (see [Enabling placement constraints](#Enabling_placement_constraints)), this constraint mapping is specified within the `RegisterApplicationMasterRequest`.
When using the `scheduler` handler, the constraints can also be added at each `SchedulingRequest` object. Each such constraint is valid for the tag of that scheduling request. In case constraints are specified both at the `RegisterApplicationMasterRequest` and the scheduling requests, the latter override the former.