YARN-398. Make it possible to specify hard locality constraints in resource requests for CapacityScheduler. Contributed by Arun C. Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-06-03 18:13:53 +00:00
parent 4ee1ec2126
commit 4a14264ddb
7 changed files with 329 additions and 114 deletions

View File

@ -111,6 +111,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-326. Add multi-resource scheduling to the fair scheduler.
(sandyr via tucu)
YARN-398. Make it possible to specify hard locality constraints in resource
requests for CapacityScheduler. (acmurthy)
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -31,12 +32,14 @@ public class CSAssignment {
private NodeType type;
private final RMContainer excessReservation;
private final FiCaSchedulerApp application;
private final boolean skipped;
public CSAssignment(Resource resource, NodeType type) {
this.resource = resource;
this.type = type;
this.application = null;
this.excessReservation = null;
this.skipped = false;
}
public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
@ -44,8 +47,16 @@ public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation)
this.type = NodeType.NODE_LOCAL;
this.application = application;
this.excessReservation = excessReservation;
this.skipped = false;
}
public CSAssignment(boolean skipped) {
this.resource = Resources.createResource(0, 0);
this.type = NodeType.NODE_LOCAL;
this.application = null;
this.excessReservation = null;
this.skipped = skipped;
}
public Resource getResource() {
return resource;
@ -67,6 +78,10 @@ public RMContainer getExcessReservation() {
return excessReservation;
}
public boolean getSkipped() {
return skipped;
}
@Override
public String toString() {
return resource.getMemory() + ":" + type;

View File

@ -784,6 +784,8 @@ private synchronized FiCaSchedulerApp getApplication(
private static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
@ -853,6 +855,13 @@ private synchronized FiCaSchedulerApp getApplication(
assignContainersOnNode(clusterResource, node, application, priority,
null);
// Did the application skip this node?
if (assignment.getSkipped()) {
// Don't count 'skipped nodes' as a scheduling opportunity!
application.subtractSchedulingOpportunity(priority);
continue;
}
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(
@ -1104,73 +1113,88 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
Resource assigned = Resources.none();
// Data-local
assigned =
assignNodeLocalContainers(clusterResource, node, application, priority,
reservedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getHostName());
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
}
}
// Rack-local
assigned =
assignRackLocalContainers(clusterResource, node, application, priority,
reservedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
ResourceRequest rackLocalResourceRequest =
application.getResourceRequest(priority, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
}
}
// Off-switch
return new CSAssignment(
assignOffSwitchContainers(clusterResource, node, application,
priority, reservedContainer),
NodeType.OFF_SWITCH);
ResourceRequest offSwitchResourceRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
return new CSAssignment(
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer),
NodeType.OFF_SWITCH);
}
return SKIP_ASSIGNMENT;
}
private Resource assignNodeLocalContainers(Resource clusterResource,
private Resource assignNodeLocalContainers(
Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getHostName());
if (request != null) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
request, NodeType.NODE_LOCAL, reservedContainer);
}
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
}
return Resources.none();
}
private Resource assignRackLocalContainers(Resource clusterResource,
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
if (request != null) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, request,
NodeType.RACK_LOCAL, reservedContainer);
}
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
}
return Resources.none();
}
private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (request != null) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, request,
NodeType.OFF_SWITCH, reservedContainer);
}
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
}
return Resources.none();
}

View File

@ -332,6 +332,11 @@ synchronized public void addSchedulingOpportunity(Priority priority) {
schedulingOpportunities.count(priority) + 1);
}
synchronized public void subtractSchedulingOpportunity(Priority priority) {
int count = schedulingOpportunities.count(priority) - 1;
this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
}
/**
* Return the number of times the application has been given an opportunity
* to schedule a task at the given priority since the last time it

View File

@ -512,7 +512,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
@ -531,7 +531,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
@ -550,7 +550,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute

View File

@ -292,8 +292,8 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)));
// Start testing...
@ -414,12 +414,12 @@ public void testSingleQueueWithOneUser() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
// Start testing...
@ -547,12 +547,12 @@ public void testUserLimits() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
/**
* Start testing...
@ -640,12 +640,12 @@ public void testHeadroomWithMaxCap() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
/**
* Start testing...
@ -679,8 +679,8 @@ public void testHeadroomWithMaxCap() throws Exception {
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
@ -696,8 +696,8 @@ public void testHeadroomWithMaxCap() throws Exception {
// Check headroom for app_2
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
@ -757,12 +757,12 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)));
/**
* Start testing...
@ -791,12 +791,12 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Submit resource requests for other apps now to 'activate' them
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
priority, recordFactory)));
app_3.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
@ -919,12 +919,12 @@ public void testReservation() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)));
// Start testing...
@ -1021,19 +1021,19 @@ public void testStolenReservedContainer() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
ArrayList<ResourceRequest> appRequests_1 =
new ArrayList<ResourceRequest>(4);
appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
priority, recordFactory));
true, priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
// Start testing...
@ -1127,12 +1127,12 @@ public void testReservationExchange() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
recordFactory)));
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)));
// Start testing...
@ -1242,19 +1242,19 @@ public void testLocalityScheduling() throws Exception {
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
priority, recordFactory));
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@ -1313,13 +1313,13 @@ public void testLocalityScheduling() throws Exception {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
priority, recordFactory));
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority));
@ -1385,31 +1385,31 @@ public void testApplicationPriorityScheduling() throws Exception {
Priority priority_1 = TestUtils.createMockPriority(1);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority_1, recordFactory));
true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
true, priority_1, recordFactory));
// P2
Priority priority_2 = TestUtils.createMockPriority(2);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_2, 2*GB, 1,
priority_2, recordFactory));
true, priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_2, 2*GB, 1,
priority_2, recordFactory));
true, priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
priority_2, recordFactory));
true, priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
@ -1513,19 +1513,19 @@ public void testSchedulingConstraints() throws Exception {
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@ -1534,7 +1534,7 @@ public void testSchedulingConstraints() throws Exception {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
priority, recordFactory));
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
@ -1557,7 +1557,7 @@ public void testSchedulingConstraints() throws Exception {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
priority, recordFactory));
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// No allocation on node_0_1 even though it's node/rack local since
@ -1731,7 +1731,174 @@ public void testInheritedQueueAcls() throws IOException {
c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
}
@Test
public void testLocalityConstraints() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// User
String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext));
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext));
a.submitApplication(app_1, user_0, A);
// Setup some nodes and racks
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
String host_0_1 = "127.0.0.2";
FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
String host_1_0 = "127.0.0.3";
String rack_1 = "rack_1";
FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
String host_1_1 = "127.0.0.4";
FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB);
final int numNodes = 4;
Resource clusterResource = Resources.createResource(
numNodes * (8*GB), numNodes * 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null > <----
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, false > <----
// ANY: < 1, 1GB, 1, false > <----
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
false, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0_requests_0.clear();
//
// Start testing...
//
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null > <----
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, false > <----
// ANY: < 1, 1GB, 1, false > <----
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Allow rack-locality for rack_1
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null >
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, true > <----
// ANY: < 1, 1GB, 1, false >
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// node_1_1
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(0, app_0.getTotalRequiredResources(priority));
// Now sanity-check node_local
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
false, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null >
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, false > <----
// ANY: < 1, 1GB, 1, false > <----
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(0, app_0.getTotalRequiredResources(priority));
}
@After
public void tearDown() throws Exception {
}

View File

@ -115,15 +115,16 @@ public static Priority createMockPriority( int priority) {
}
public static ResourceRequest createResourceRequest(
String hostName, int memory, int numContainers, Priority priority,
RecordFactory recordFactory) {
String resourceName, int memory, int numContainers, boolean relaxLocality,
Priority priority, RecordFactory recordFactory) {
ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
Resource capability = Resources.createResource(memory, 1);
request.setNumContainers(numContainers);
request.setResourceName(hostName);
request.setResourceName(resourceName);
request.setCapability(capability);
request.setRelaxLocality(relaxLocality);
request.setPriority(priority);
return request;
}