YARN-4865. Track Reserved resources in ResourceUsage and QueueCapacities. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2016-03-29 17:07:55 -07:00
parent ddfe6774c2
commit fc055a3cbe
10 changed files with 222 additions and 7 deletions

View File

@ -118,4 +118,24 @@ public void recoverContainer(Resource clusterResource,
* @return default application priority
*/
public Priority getDefaultApplicationPriority();
/**
* Increment Reserved Capacity
*
* @param partition
* asked by application
* @param reservedRes
* reserved resource asked
*/
public void incReservedResource(String partition, Resource reservedRes);
/**
* Decrement Reserved Capacity
*
* @param partition
* asked by application
* @param reservedRes
* reserved resource asked
*/
public void decReservedResource(String partition, Resource reservedRes);
}

View File

@ -535,6 +535,30 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
return true;
}
@Override
public void incReservedResource(String partition, Resource reservedRes) {
if (partition == null) {
partition = RMNodeLabelsManager.NO_LABEL;
}
queueUsage.incReserved(partition, reservedRes);
if(null != parent){
parent.incReservedResource(partition, reservedRes);
}
}
@Override
public void decReservedResource(String partition, Resource reservedRes) {
if (partition == null) {
partition = RMNodeLabelsManager.NO_LABEL;
}
queueUsage.decReserved(partition, reservedRes);
if(null != parent){
parent.decReservedResource(partition, reservedRes);
}
}
@Override
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
if (nodeLabel == null) {

View File

@ -186,6 +186,8 @@ private static void updateUsedCapacity(final ResourceCalculator rc,
String nodePartition) {
float absoluteUsedCapacity = 0.0f;
float usedCapacity = 0.0f;
float reservedCapacity = 0.0f;
float absoluteReservedCapacity = 0.0f;
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
@ -207,11 +209,22 @@ private static void updateUsedCapacity(final ResourceCalculator rc,
usedCapacity =
Resources.divide(rc, totalPartitionResource, usedResource,
queueGuranteedResource);
Resource resResource = queueResourceUsage.getReserved(nodePartition);
reservedCapacity =
Resources.divide(rc, totalPartitionResource, resResource,
queueGuranteedResource);
absoluteReservedCapacity =
Resources.divide(rc, totalPartitionResource, resResource,
totalPartitionResource);
}
queueCapacities
.setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity);
queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
queueCapacities.setReservedCapacity(nodePartition, reservedCapacity);
queueCapacities
.setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);
}
private static Resource getNonPartitionedMaxAvailableResourceToQueue(

View File

@ -963,6 +963,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
node.getPartition(), reservedOrAllocatedRMContainer,
assignment.isIncreasedAllocation());
// Update reserved metrics
Resource reservedRes = assignment.getAssignmentInformation()
.getReserved();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
incReservedResource(node.getPartition(), reservedRes);
}
// Done
return assignment;
} else if (assignment.getSkipped()) {
@ -1316,6 +1323,13 @@ public void completedContainer(Resource clusterResource,
// Book-keeping
if (removed) {
// track reserved resource for metrics, for normal container
// getReservedResource will be null.
Resource reservedRes = rmContainer.getReservedResource();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
decReservedResource(node.getPartition(), reservedRes);
}
// Inform the ordering policy
orderingPolicy.containerReleased(application, rmContainer);

View File

@ -50,7 +50,7 @@ public QueueCapacities(boolean isRoot) {
// Usage enum here to make implement cleaner
private enum CapacityType {
USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5),
MAX_AM_PERC(6);
MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8);
private int idx;
@ -76,6 +76,8 @@ public String toString() {
sb.append("cap=" + capacitiesArr[4] + "%, ");
sb.append("abs_cap=" + capacitiesArr[5] + "%}");
sb.append("max_am_perc=" + capacitiesArr[6] + "%}");
sb.append("reserved_cap=" + capacitiesArr[7] + "%}");
sb.append("abs_reserved_cap=" + capacitiesArr[8] + "%}");
return sb.toString();
}
}
@ -234,6 +236,40 @@ public void setMaxAMResourcePercentage(float value) {
_set(NL, CapacityType.MAX_AM_PERC, value);
}
/* Reserved Capacity Getter and Setter */
public float getReservedCapacity() {
return _get(NL, CapacityType.RESERVED_CAP);
}
public float getReservedCapacity(String label) {
return _get(label, CapacityType.RESERVED_CAP);
}
public void setReservedCapacity(float value) {
_set(NL, CapacityType.RESERVED_CAP, value);
}
public void setReservedCapacity(String label, float value) {
_set(label, CapacityType.RESERVED_CAP, value);
}
/* Absolute Reserved Capacity Getter and Setter */
public float getAbsoluteReservedCapacity() {
return _get(NL, CapacityType.ABS_RESERVED_CAP);
}
public float getAbsoluteReservedCapacity(String label) {
return _get(label, CapacityType.ABS_RESERVED_CAP);
}
public void setAbsoluteReservedCapacity(float value) {
_set(NL, CapacityType.ABS_RESERVED_CAP, value);
}
public void setAbsoluteReservedCapacity(String label, float value) {
_set(label, CapacityType.ABS_RESERVED_CAP, value);
}
/**
* Clear configurable fields, like
* (absolute)capacity/(absolute)maximum-capacity, this will be used by queue

View File

@ -340,6 +340,14 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) {
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public void incReservedResource(String nodeLabel, Resource resourceToInc) {
}
@Override
public void decReservedResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue

View File

@ -220,6 +220,18 @@ public Priority getDefaultApplicationPriority() {
// TODO add implementation for FIFO scheduler
return null;
}
@Override
public void incReservedResource(String partition, Resource reservedRes) {
// TODO add implementation for FIFO scheduler
}
@Override
public void decReservedResource(String partition, Resource reservedRes) {
// TODO add implementation for FIFO scheduler
}
};
public FifoScheduler() {

View File

@ -367,6 +367,7 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
@ -393,6 +394,10 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
// Cancel asks of app2 and re-kick RM
am2.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
@ -405,6 +410,10 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
rm1.close();
}

View File

@ -463,6 +463,83 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}
@Test (timeout = 120000)
public void testContainerReservationWithLabels() throws Exception {
// This test is pretty much similar to testContainerAllocateWithLabel.
// Difference is, this test doesn't specify label expression in
// ResourceRequest,
// instead, it uses default queue label expression
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
"z"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
NodeId.newInstance("h3", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(
TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
rm1.registerNode("h2:1234", 8 * GB); // label = y
rm1.registerNode("h3:1234", 8 * GB); // label = x
ContainerId containerId;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>());
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
// Check if a 4G container allocated for app1, and 4G is reserved
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
.getApplicationAttemptId());
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemory());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemory());
Assert.assertEquals(4 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemory());
// Cancel asks of app2 and re-kick RM
am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemory());
Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemory());
Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x")
.getMemory());
rm1.close();
}
private void checkPendingResource(MockRM rm, int priority,
ApplicationAttemptId attemptId, int memory) {
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();

View File

@ -44,7 +44,9 @@ public static Collection<String[]> getParameters() {
{ "AbsoluteUsedCapacity" },
{ "MaximumCapacity" },
{ "AbsoluteMaximumCapacity" },
{ "MaxAMResourcePercentage" } });
{ "MaxAMResourcePercentage" },
{ "ReservedCapacity" },
{ "AbsoluteReservedCapacity" }});
}
public TestQueueCapacities(String suffix) {