YARN-392. Make it possible to specify hard locality constraints in resource requests. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488326 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 2013-05-31 17:26:18 +00:00
parent 0a452a30ab
commit 781e82ca9a
6 changed files with 232 additions and 19 deletions


@@ -96,6 +96,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
     via vinodkv)
 
+    YARN-392. Make it possible to specify hard locality constraints in resource
+    requests. (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event


@@ -155,6 +155,44 @@ public static boolean isAnyLocation(String hostName) {
   @Stable
   public abstract void setNumContainers(int numContainers);
 
+  /**
+   * Get whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>. Defaults to true.
+   *
+   * @return whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>.
+   */
+  @Public
+  @Stable
+  public abstract boolean getRelaxLocality();
+
+  /**
+   * For a request at a network hierarchy level, set whether locality can be relaxed
+   * to that level and beyond.
+   *
+   * If the flag is off on a rack-level <code>ResourceRequest</code>,
+   * containers at that request's priority will not be assigned to nodes on that
+   * request's rack unless requests specifically for those nodes have also been
+   * submitted.
+   *
+   * If the flag is off on an {@link ResourceRequest#ANY}-level
+   * <code>ResourceRequest</code>, containers at that request's priority will
+   * only be assigned on racks for which specific requests have also been
+   * submitted.
+   *
+   * For example, to request a container strictly on a specific node, the
+   * corresponding rack-level and any-level requests should have locality
+   * relaxation set to false. Similarly, to request a container strictly on a
+   * specific rack, the corresponding any-level request should have locality
+   * relaxation set to false.
+   *
+   * @param relaxLocality whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>.
+   */
+  @Public
+  @Stable
+  public abstract void setRelaxLocality(boolean relaxLocality);
+
   @Override
   public int hashCode() {
     final int prime = 2153;
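
For illustration (not part of this commit), a minimal sketch of how an application master might combine the new flag to pin a container to a single node, following the javadoc above. The host name "node1.example.com", the rack "/rack1", and the newRequest helper are hypothetical; Records, Priority, Resource, ResourceRequest and ResourceRequest.ANY are the existing YARN record APIs this patch extends.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

public class StrictLocalityAskSketch {

  // Hypothetical helper: builds one request for a node, a rack, or ResourceRequest.ANY.
  static ResourceRequest newRequest(String host, int memoryMB, int priority,
      int numContainers, boolean relaxLocality) {
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    req.setHostName(host);
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(memoryMB);
    req.setCapability(capability);
    Priority prio = Records.newRecord(Priority.class);
    prio.setPriority(priority);
    req.setPriority(prio);
    req.setNumContainers(numContainers);
    req.setRelaxLocality(relaxLocality);
    return req;
  }

  // One container strictly on node1.example.com: the rack- and ANY-level
  // requests are still submitted, but with locality relaxation turned off.
  static List<ResourceRequest> strictNodeAsk() {
    return Arrays.asList(
        newRequest("node1.example.com", 1024, 1, 1, true),
        newRequest("/rack1", 1024, 1, 1, false),
        newRequest(ResourceRequest.ANY, 1024, 1, 1, false));
  }
}

The returned list would be passed as the ask in the application's next allocate call, much like the Fair Scheduler tests later in this commit do.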


@@ -147,6 +147,18 @@ public void setNumContainers(int numContainers) {
     builder.setNumContainers((numContainers));
   }
 
+  @Override
+  public boolean getRelaxLocality() {
+    ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getRelaxLocality();
+  }
+
+  @Override
+  public void setRelaxLocality(boolean relaxLocality) {
+    maybeInitBuilder();
+    builder.setRelaxLocality(relaxLocality);
+  }
+
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
   }
@@ -167,6 +179,7 @@ private ResourceProto convertToProtoFormat(Resource t) {
   public String toString() {
     return "{Priority: " + getPriority() + ", Capability: " + getCapability()
         + ", # Containers: " + getNumContainers()
-        + ", Location: " + getHostName() + "}";
+        + ", Location: " + getHostName()
+        + ", Relax Locality: " + getRelaxLocality() + "}";
   }
 }


@@ -204,6 +204,7 @@ message ResourceRequestProto {
   optional string host_name = 2;
   optional ResourceProto capability = 3;
   optional int32 num_containers = 4;
+  optional bool relax_locality = 5 [default = true];
 }
 
 ////////////////////////////////////////////////////////////////////////
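
Because the new field carries a protobuf default of true, a request whose writer never calls the new setter still reads back relaxed locality, so applications built against the old API keep their previous behavior. A small sketch of that assumption (the class name is hypothetical; Records and ResourceRequest are the real APIs):

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

public class RelaxLocalityDefaultSketch {
  public static void main(String[] args) {
    // A freshly built request never touched by the new setter falls back to
    // the proto default declared above.
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    System.out.println(req.getRelaxLocality());   // expected: true
    // Opting in to strict locality is an explicit call.
    req.setRelaxLocality(false);
    System.out.println(req.getRelaxLocality());   // expected: false
  }
}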


@@ -316,6 +316,11 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
       ResourceRequest localRequest = app.getResourceRequest(priority,
           node.getHostName());
+      if (localRequest != null && !localRequest.getRelaxLocality()) {
+        LOG.warn("Relax locality off is not supported on local request: "
+            + localRequest);
+      }
+
       NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
           scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
           scheduler.getRackLocalityThreshold());
@@ -326,6 +331,10 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
             localRequest, NodeType.NODE_LOCAL, reserved);
       }
 
+      if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
+        continue;
+      }
+
       if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
           && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
               allowedLocality.equals(NodeType.OFF_SWITCH))) {
@@ -335,6 +344,10 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
       ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
           ResourceRequest.ANY);
+      if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
+        continue;
+      }
+
       if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
           && allowedLocality.equals(NodeType.OFF_SWITCH)) {
         return assignContainer(node, priority, offSwitchRequest,
@@ -359,10 +372,23 @@ public Resource assignContainer(FSSchedulerNode node) {
    * given node, if the node had full space.
    */
   public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
-    // TODO: add checks stuff about node specific scheduling here
-    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
-    return request.getNumContainers() > 0 &&
+    ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
+    ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
+    ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
+    return
+        // There must be outstanding requests at the given priority:
+        anyRequest != null && anyRequest.getNumContainers() > 0 &&
+        // If locality relaxation is turned off at *-level, there must be a
+        // non-zero request for the node's rack:
+        (anyRequest.getRelaxLocality() ||
+            (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
+        // If locality relaxation is turned off at rack-level, there must be a
+        // non-zero request at the node:
+        (rackRequest == null || rackRequest.getRelaxLocality() ||
+            (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
+        // The requested container must be able to fit on the node:
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
-            request.getCapability(), node.getRMNode().getTotalCapability());
+            anyRequest.getCapability(), node.getRMNode().getTotalCapability());
   }
 }
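
The two continue statements above mean that while relaxation is off at the rack or ANY level, non-matching nodes are simply skipped on every heartbeat; to open the ask back up, the application re-submits the narrower requests with zero containers and a relaxed ANY-level request, which is what the new testCancelStrictLocality below exercises. A minimal sketch, continuing the earlier hypothetical newRequest helper (same imports):

// Relax a previously strict ask: zero out the node- and rack-level requests
// and ask for one container anywhere.
List<ResourceRequest> relaxed = Arrays.asList(
    newRequest("node1.example.com", 1024, 1, 0, true),
    newRequest("/rack1", 1024, 1, 0, true),
    newRequest(ResourceRequest.ANY, 1024, 1, 1, true));
// Sending this list in the next allocate call lets any node satisfy the request.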


@@ -151,7 +151,7 @@ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
   private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers) {
+      int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(Resources.createResource(memory));
     request.setHostName(host);
@@ -159,6 +159,7 @@ private ResourceRequest createResourceRequest(int memory, String host,
     Priority prio = recordFactory.newRecordInstance(Priority.class);
     prio.setPriority(priority);
     request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
     return request;
   }
@@ -182,7 +183,7 @@ private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, numContainers);
+        priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask, new ArrayList<ContainerId>());
     return id;
@@ -190,9 +191,14 @@ private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
   private void createSchedulingRequestExistingApplication(int memory, int priority,
       ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, 1);
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  private void createSchedulingRequestExistingApplication(ResourceRequest request,
+      ApplicationAttemptId attId) {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
     scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
   }
@@ -499,14 +505,16 @@ public void testQueueDemandCalculation() throws Exception {
     // First ask, queue1 requests 1 large (minReqSize * 2).
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ResourceRequest request1 =
-        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
     scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
 
     // Second ask, queue2 requests 1 large + (2 * minReqSize)
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
-    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1);
-    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2);
+    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1,
+        false);
+    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2,
+        false);
     ask2.add(request2);
     ask2.add(request3);
     scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
@@ -514,7 +522,7 @@ public void testQueueDemandCalculation() throws Exception {
     // Third ask, queue2 requests 1 large
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
     ResourceRequest request4 =
-        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
     ask3.add(request4);
     scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
@@ -1408,12 +1416,12 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
     List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
-    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2));
+    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
     scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
@@ -1693,5 +1701,129 @@ public void testRemoveNodeUpdatesRootQueueMetrics() {
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     scheduler.update(); // update shouldn't change things
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+  }
+
+  @Test
+  public void testStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, node1.getRackName(), 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node1UpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+      assertEquals(0, app.getReservedContainers().size());
+    }
+    // then node1 should get the container
+    scheduler.handle(node1UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  @Test
+  public void testCancelStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+    }
+
+    // relax locality
+    List<ResourceRequest> update = Arrays.asList(
+        createResourceRequest(1024, node1.getHostName(), 1, 0, true),
+        createResourceRequest(1024, "rack1", 1, 0, true),
+        createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
+    scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
+
+    // then node2 should get the container
+    scheduler.handle(node2UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  /**
+   * If we update our ask to strictly request a node, it doesn't make sense to keep
+   * a reservation on another.
+   */
+  @Test
+  public void testReservationsStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 2, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId);
+    createSchedulingRequestExistingApplication(rackRequest, attId);
+    createSchedulingRequestExistingApplication(anyRequest, attId);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getReservedContainers().size());
+
+    // now, make our request node-specific (on a different node)
+    rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
+    anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
+        new ArrayList<ContainerId>());
+
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(0, app.getReservedContainers().size());
   }
 }