YARN-5342. Improve non-exclusive node partition resource allocation in Capacity Scheduler. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2016-07-26 18:14:09 -07:00
parent d84ab8a578
commit 49969b16cd
3 changed files with 19 additions and 11 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -695,15 +696,21 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
} }
// Non-exclusive scheduling opportunity is different: we need reset // Non-exclusive scheduling opportunity is different: we need reset
// it every time to make sure non-labeled resource request will be // it when:
// - It allocated on the default partition
//
// This is to make sure non-labeled resource request will be
// most likely allocated on non-labeled nodes first. // most likely allocated on non-labeled nodes first.
application.resetMissedNonPartitionedRequestSchedulingOpportunity( if (StringUtils.equals(node.getPartition(),
schedulerKey); RMNodeLabelsManager.NO_LABEL)) {
application
.resetMissedNonPartitionedRequestSchedulingOpportunity(schedulerKey);
}
} }
return allocationResult; return allocationResult;
} }
private ContainerAllocation allocate(Resource clusterResource, private ContainerAllocation allocate(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@ -280,6 +281,7 @@ public void testPriorityWithPendingApplications() throws Exception {
// If app3 (highest priority among rest) gets active, it indicates that // If app3 (highest priority among rest) gets active, it indicates that
// priority is working with pendingApplications. // priority is working with pendingApplications.
rm.killApp(app1.getApplicationId()); rm.killApp(app1.getApplicationId());
rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
// kick the scheduler, app3 (high among pending) gets free space // kick the scheduler, app3 (high among pending) gets free space
MockAM am3 = MockRM.launchAM(app3, rm, nm1); MockAM am3 = MockRM.launchAM(app3, rm, nm1);

View File

@ -768,8 +768,6 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.start(); rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty> MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
ContainerId nextContainerId;
// launch an app to queue b1 (label = y), AM container should be launched in nm3 // launch an app to queue b1 (label = y), AM container should be launched in nm3
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
@ -777,12 +775,13 @@ public RMNodeLabelsManager createNodeLabelManager() {
// request containers from am2, priority=1 asks for "" and priority=2 asks // request containers from am2, priority=1 asks for "" and priority=2 asks
// for "y", "y" container should be allocated first // for "y", "y" container should be allocated first
nextContainerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), ""); am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y"); am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
RMContainerState.ALLOCATED)); // Do a node heartbeat once
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
cs.handle(new NodeUpdateSchedulerEvent(
rm1.getRMContext().getRMNodes().get(nm1.getNodeId())));
// Check pending resource for am2, priority=1 doesn't get allocated before // Check pending resource for am2, priority=1 doesn't get allocated before
// priority=2 allocated // priority=2 allocated
@ -1674,7 +1673,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
// Test case 7 // Test case 7
// After c allocated, d will go first because it has less used_capacity(x) // After c allocated, d will go first because it has less used_capacity(x)
// than c // than c
doNMHeartbeat(rm, nm1.getNodeId(), 2); doNMHeartbeat(rm, nm1.getNodeId(), 1);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId())); cs.getApplicationAttempt(am1.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),