YARN-5555. Scheduler UI: "% of Queue" is inaccurate if leaf queue is hierarchically nested. Contributed by Eric Payne.

This commit is contained in:
Varun Vasudev 2016-09-02 16:02:01 +05:30
parent 0690f0969e
commit 05f5c0f631
2 changed files with 114 additions and 0 deletions

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -57,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@ -617,4 +619,29 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
}
}
/**
* Recalculates the per-app, percent of queue metric, specific to the
* Capacity Scheduler.
*/
@Override
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
ApplicationResourceUsageReport report = super.getResourceUsageReport();
Resource cluster = rmContext.getScheduler().getClusterResource();
Resource totalPartitionRes =
rmContext.getNodeLabelManager()
.getResourceByLabel(getAppAMNodePartitionName(), cluster);
ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
if (!calc.isInvalidDivisor(totalPartitionRes)) {
float queueAbsMaxCapPerPartition =
((AbstractCSQueue)getQueue()).getQueueCapacities()
.getAbsoluteCapacity(getAppAMNodePartitionName());
float queueUsagePerc =
calc.divide(totalPartitionRes, report.getUsedResources(),
Resources.multiply(totalPartitionRes,
queueAbsMaxCapPerPartition)) * 100;
report.setQueueUsagePercentage(queueUsagePerc);
}
return report;
}
}

View File

@ -51,6 +51,8 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -67,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@ -3312,6 +3315,90 @@ private CapacitySchedulerContext mockCSContext(
return csContext;
}
@Test
public void testApplicationQueuePercent()
throws Exception {
Resource res = Resource.newInstance(10 * 1024, 10);
CapacityScheduler scheduler = mock(CapacityScheduler.class);
when(scheduler.getClusterResource()).thenReturn(res);
when(scheduler.getResourceCalculator())
.thenReturn(new DefaultResourceCalculator());
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getScheduler()).thenReturn(scheduler);
when(rmContext.getRMApps())
.thenReturn(new ConcurrentHashMap<ApplicationId, RMApp>());
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(nlm.getResourceByLabel(any(), any())).thenReturn(res);
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
// capacity are both 1.0f.
Queue queue = createQueue("test", null, 1.0f, 1.0f);
final String user = "user1";
FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test" queue, 1536 used is 15% of both the queue and the cluster
assertEquals(15.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f);
assertEquals(15.0f,
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
// Queue "test2" is a child of root and its capacity is 50% of root. As a
// child of root, its absolute capaicty is also 50%.
queue = createQueue("test2", null, 0.5f, 0.5f);
app = new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f);
assertEquals(15.0f,
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
// Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
app = new FiCaSchedulerApp(appAttId, user, qChild,
qChild.getActiveUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f);
assertEquals(15.0f,
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
float absCap) {
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
QueueCapacities qCaps = mock(QueueCapacities.class);
when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap);
when(queue.getQueueCapacities()).thenReturn(qCaps);
return queue;
}
@After
public void tearDown() throws Exception {
if (cs != null) {