YARN-4484. Available Resource calculation for a queue is not correct when used with labels. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2016-07-15 11:40:12 -07:00
parent 1d053836b1
commit 24db9167f1
2 changed files with 242 additions and 25 deletions

View File

@ -227,24 +227,34 @@ public static void updateUsedCapacity(final ResourceCalculator rc,
.setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);
}
private static Resource getNonPartitionedMaxAvailableResourceToQueue(
final ResourceCalculator rc, Resource totalNonPartitionedResource,
CSQueue queue) {
Resource queueLimit = Resources.none();
Resource usedResources = queue.getUsedResources();
private static Resource getMaxAvailableResourceToQueue(
final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
Resource cluster) {
Set<String> nodeLabels = queue.getNodeLabelsForQueue();
Resource totalAvailableResource = Resources.createResource(0, 0);
if (Resources.greaterThan(rc, totalNonPartitionedResource,
totalNonPartitionedResource, Resources.none())) {
queueLimit =
Resources.multiply(totalNonPartitionedResource,
queue.getAbsoluteCapacity());
for (String partition : nodeLabels) {
// Calculate guaranteed resource for a label in a queue by below logic.
// (total label resource) * (absolute capacity of label in that queue)
Resource queueGuranteedResource = Resources.multiply(nlm
.getResourceByLabel(partition, cluster), queue.getQueueCapacities()
.getAbsoluteCapacity(partition));
// Available resource in queue for a specific label will be calculated as
// {(guaranteed resource for a label in a queue) -
// (resource usage of that label in the queue)}
// Finally accumulate this available resource to get total.
Resource available = (Resources.greaterThan(rc, cluster,
queueGuranteedResource,
queue.getQueueResourceUsage().getUsed(partition))) ? Resources
.componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
queue.getQueueResourceUsage().getUsed(partition)), Resources
.none()) : Resources.none();
Resources.addTo(totalAvailableResource, available);
}
Resource available = Resources.subtract(queueLimit, usedResources);
return Resources.max(rc, totalNonPartitionedResource, available,
Resources.none());
return totalAvailableResource;
}
/**
* <p>
* Update Queue Statistics:
@ -277,15 +287,10 @@ public static void updateQueueStatistics(
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
minimumAllocation, queueResourceUsage, queueCapacities, nodePartition);
}
// Now in QueueMetrics, we only store available-resource-to-queue for
// default partition.
if (nodePartition == null
|| nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
childQueue.getMetrics().setAvailableResourcesToQueue(
getNonPartitionedMaxAvailableResourceToQueue(rc,
nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
childQueue));
}
// Update queue metrics w.r.t node labels. In a generic way, we can
// calculate available resource from all labels in cluster.
childQueue.getMetrics().setAvailableResourcesToQueue(
getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -1863,4 +1866,213 @@ public RMNodeLabelsManager createNodeLabelManager() {
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
}
@Test
public void testQueueMetricsWithLabels() throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / \
* a b
* (x) (x)
* </pre>
*
* a/b can access x, both of them has max-capacity-on-x = 50
*
* When doing non-exclusive allocation, app in a (or b) can use 100% of x
* resource.
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(queueA, 25);
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
csConf.setCapacityByLabel(queueA, "x", 50);
csConf.setMaximumCapacityByLabel(queueA, "x", 50);
final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(queueB, 75);
csConf.setAccessibleNodeLabels(queueB, toSet("x"));
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 50);
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("y", false)));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// app1 asks for 5 partition=x containers
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// app1 gets all resource in partition=x
Assert.assertEquals(5, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(5 * GB,
reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(10 * GB,
reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB());
assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
rm1.close();
}
@Test
public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / \
* a b
* (x) (x)
* </pre>
*
* a/b can access x, both of them has max-capacity-on-x = 50
*
* When doing non-exclusive allocation, app in a (or b) can use 100% of x
* resource.
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(queueA, 25);
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
csConf.setCapacityByLabel(queueA, "x", 50);
csConf.setMaximumCapacityByLabel(queueA, "x", 50);
final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(queueB, 75);
csConf.setAccessibleNodeLabels(queueB, toSet("x"));
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 50);
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// app1 asks for 3 partition= containers
am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
// NM1 do 50 heartbeats
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// app1 gets all resource in partition=x (non-exclusive)
Assert.assertEquals(3, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(7 * GB,
reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
double delta = 0.0001;
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
// 2GB is remaining from label x.
assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB());
// app1 asks for 1 default partition container
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
// NM2 do couple of heartbeats
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// app1 gets all resource in default partition
Assert.assertEquals(2, schedulerNode2.getNumContainers());
// 3GB is used from label x quota. 2GB used from default label.
// So total 2.5 GB is remaining.
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
rm1.close();
}
}