YARN-6344. Add parameter for rack locality delay in CapacityScheduler. (kkaranasos)

This commit is contained in:
Konstantinos Karanasos 2017-04-10 15:25:33 -07:00
parent e9ac61cc0e
commit 7999318af1
6 changed files with 235 additions and 22 deletions

View File

@ -111,9 +111,27 @@
<value>40</value>
<description>
Number of missed scheduling opportunities after which the CapacityScheduler
attempts to schedule rack-local containers.
Typically this should be set to number of nodes in the cluster, By default is setting
approximately number of nodes in one rack which is 40.
attempts to schedule rack-local containers.
When setting this parameter, the size of the cluster should be taken into account.
We use 40 as the default value, which is approximately the number of nodes in one rack.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
<value>-1</value>
<description>
Number of additional missed scheduling opportunities over the node-locality-delay
ones, after which the CapacityScheduler attempts to schedule off-switch containers,
instead of rack-local ones.
Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler will
attempt rack-local assignments after 40 missed opportunities, and off-switch assignments
after 40+20=60 missed opportunities.
When setting this parameter, the size of the cluster should be taken into account.
We use -1 as the default value, which disables this feature. In this case, the number
of missed opportunities for assigning off-switch containers is calculated based on
the number of containers and unique locations specified in the resource request,
as well as the size of the cluster.
</description>
</property>

View File

@ -1304,6 +1304,11 @@ public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacemen
return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
}
public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerRequestKey) {
return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
.getResourceRequests();
}
public void incUnconfirmedRes(Resource res) {
unconfirmedAllocatedMem.addAndGet(res.getMemorySize());

View File

@ -197,6 +197,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
@Private
public static final String RACK_LOCALITY_ADDITIONAL_DELAY =
PREFIX + "rack-locality-additional-delay";
@Private
public static final int DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY = -1;
@Private
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
@ -829,6 +836,11 @@ public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
public int getRackLocalityAdditionalDelay() {
return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
}
public boolean getRackLocalityFullReset() {
return getBoolean(RACK_LOCALITY_FULL_RESET,
DEFAULT_RACK_LOCALITY_FULL_RESET);

View File

@ -95,6 +95,7 @@ public class LeafQueue extends AbstractCSQueue {
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
private volatile int rackLocalityAdditionalDelay;
private volatile boolean rackLocalityFullReset;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@ -215,6 +216,7 @@ protected void setupQueueConfigs(Resource clusterResource)
}
nodeLocalityDelay = conf.getNodeLocalityDelay();
rackLocalityAdditionalDelay = conf.getRackLocalityAdditionalDelay();
rackLocalityFullReset = conf.getRackLocalityFullReset();
// re-init this since max allocation could have changed
@ -271,9 +273,12 @@ protected void setupQueueConfigs(Resource clusterResource)
+ "numContainers = " + numContainers
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+ nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
.toString() + "\n" + "reservationsContinueLooking = "
+ " [= configuredAcls ]" + "\n"
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
+ "rackLocalityAdditionalDelay = "
+ rackLocalityAdditionalDelay + "\n"
+ "labels=" + labelStrBuilder.toString() + "\n"
+ "reservationsContinueLooking = "
+ reservationsContinueLooking + "\n" + "preemptionDisabled = "
+ getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+ defaultAppPriorityPerQueue + "\npriority = " + priority);
@ -1346,6 +1351,11 @@ public int getNodeLocalityDelay() {
return nodeLocalityDelay;
}
@Lock(NoLock.class)
public int getRackLocalityAdditionalDelay() {
return rackLocalityAdditionalDelay;
}
@Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
return rackLocalityFullReset;

View File

@ -278,6 +278,12 @@ private int getActualNodeLocalityDelay() {
.getCSLeafQueue().getNodeLocalityDelay());
}
private int getActualRackLocalityDelay() {
return Math.min(rmContext.getScheduler().getNumClusterNodes(),
application.getCSLeafQueue().getNodeLocalityDelay()
+ application.getCSLeafQueue().getRackLocalityAdditionalDelay());
}
private boolean canAssign(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
@ -286,26 +292,37 @@ private boolean canAssign(SchedulerRequestKey schedulerKey,
if (reservedContainer != null) {
return true;
}
// If there are no nodes in the cluster, return false.
if (rmContext.getScheduler().getNumClusterNodes() == 0) {
return false;
}
// If we have only ANY requests for this schedulerKey, we should not
// delay its scheduling.
if (application.getResourceRequests(schedulerKey).size() == 1) {
return true;
}
// 'Delay' off-switch
long missedOpportunities =
application.getSchedulingOpportunities(schedulerKey);
long requiredContainers = application.getOutstandingAsksCount(
schedulerKey);
float localityWaitFactor =
getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
.getNumClusterNodes());
// Cap the delay by the number of nodes in the cluster. Under most
// conditions this means we will consider each node in the cluster before
// accepting an off-switch assignment.
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
(requiredContainers * localityWaitFactor)) < missedOpportunities);
// If rack locality additional delay parameter is enabled.
if (application.getCSLeafQueue().getRackLocalityAdditionalDelay() > -1) {
return missedOpportunities > getActualRackLocalityDelay();
} else {
long requiredContainers =
application.getOutstandingAsksCount(schedulerKey);
float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
rmContext.getScheduler().getNumClusterNodes());
// Cap the delay by the number of nodes in the cluster.
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
(requiredContainers * localityWaitFactor)) < missedOpportunities);
}
}
// Check if we need containers on this rack
if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
<= 0) {
if (application.getOutstandingAsksCount(schedulerKey,
node.getRackName()) <= 0) {
return false;
}

View File

@ -104,7 +104,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
public class TestLeafQueue {
public class TestLeafQueue {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
@ -2105,6 +2105,154 @@ public void testLocalityScheduling() throws Exception {
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
}
@Test
public void testRackLocalityDelayScheduling() throws Exception {
// Change parameter values for node locality and rack locality delay.
csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
csConf.setInt(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
// Manipulate queue 'b'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
// Check locality parameters.
assertEquals(2, a.getNodeLocalityDelay());
assertEquals(1, a.getRackLocalityAdditionalDelay());
// User
String user1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId1 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user1, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app1, user1);
// Setup some nodes and racks
String host1 = "127.0.0.1";
String host2 = "127.0.0.2";
String host3 = "127.0.0.3";
String host4 = "127.0.0.4";
String rack1 = "rack_1";
String rack2 = "rack_2";
String rack3 = "rack_3";
FiCaSchedulerNode node2 = TestUtils.getMockNode(host3, rack2, 0, 8 * GB);
FiCaSchedulerNode node3 = TestUtils.getMockNode(host4, rack3, 0, 8 * GB);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
ImmutableMap.of(app1.getApplicationAttemptId(), app1);
Map<NodeId, FiCaSchedulerNode> nodes =
ImmutableMap.of(node2.getNodeID(), node2, node3.getNodeID(), node3);
final int numNodes = 5;
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 16);
when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app1Requests1 = new ArrayList<ResourceRequest>();
app1Requests1.add(TestUtils.createResourceRequest(host1, 1 * GB, 1,
true, priority, recordFactory));
app1Requests1.add(TestUtils.createResourceRequest(rack1, 1 * GB, 1,
true, priority, recordFactory));
app1Requests1.add(TestUtils.createResourceRequest(host2, 1 * GB, 1,
true, priority, recordFactory));
app1Requests1.add(TestUtils.createResourceRequest(rack2, 1 * GB, 1,
true, priority, recordFactory));
// Adding one extra in the ANY.
app1Requests1.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 3, true, priority, recordFactory));
app1.updateResourceRequests(app1Requests1);
// Start testing...
CSAssignment assignment = null;
SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
// No rack-local yet.
assignment = a.assignContainers(clusterResource, node2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
verifyNoContainerAllocated(assignment);
assertEquals(1, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Still no rack-local.
assignment = a.assignContainers(clusterResource, node2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(2, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Rack local now.
assignment = a.assignContainers(clusterResource, node2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(0, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
// No off-switch until 3 missed opportunities.
a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assignment = a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(3, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Now off-switch should succeed.
assignment = a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(4, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// Check capping by number of cluster nodes.
doReturn(10).when(a).getRackLocalityAdditionalDelay();
// Off-switch will happen at 6 missed opportunities now, since cluster size
// is 5.
assignment = a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(5, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
assignment = a.assignContainers(clusterResource, node3,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, a, nodes, apps);
assertEquals(6, app1.getSchedulingOpportunities(schedulerKey));
assertEquals(0, app1.getOutstandingAsksCount(schedulerKey));
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
}
@Test
public void testApplicationPriorityScheduling() throws Exception {
// Manipulate queue 'a'
@ -2410,16 +2558,18 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
}
@Test (timeout = 30000)
public void testNodeLocalityAfterQueueRefresh() throws Exception {
public void testLocalityDelaysAfterQueueRefresh() throws Exception {
// Manipulate queue 'e'
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// before reinitialization
assertEquals(40, e.getNodeLocalityDelay());
assertEquals(-1, e.getRackLocalityAdditionalDelay());
csConf.setInt(CapacitySchedulerConfiguration
.NODE_LOCALITY_DELAY, 60);
csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
csConf.setInt(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
@ -2431,6 +2581,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
// after reinitialization
assertEquals(60, e.getNodeLocalityDelay());
assertEquals(600, e.getRackLocalityAdditionalDelay());
}
@Test (timeout = 30000)