YARN-4304. AM max resource configuration per partition to be displayed/updated correctly in UI and in various partition related metrics. (Sunil G via wangda)
This commit is contained in:
parent
805a9ed85e
commit
b08ecf5c75
@ -706,6 +706,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-4582. Label-related invalid resource request exception should be able to
|
||||
properly handled by application. (Bibin A Chundatt via wangda)
|
||||
|
||||
YARN-4304. AM max resource configuration per partition to be displayed/updated
|
||||
correctly in UI and in various partition related metrics. (Sunil G via wangda)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@ -612,4 +613,27 @@ public Priority getDefaultApplicationPriority() {
|
||||
// TODO add dummy implementation
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getNodeLabelsForQueue() {
|
||||
// if queue's label is *, queue can access any labels. Instead of
|
||||
// considering all labels in cluster, only those labels which are
|
||||
// use some resource of this queue can be considered.
|
||||
Set<String> nodeLabels = new HashSet<String>();
|
||||
if (this.getAccessibleNodeLabels() != null
|
||||
&& this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
nodeLabels.addAll(Sets.union(this.getQueueCapacities()
|
||||
.getNodePartitionsSet(), this.getQueueResourceUsage()
|
||||
.getNodePartitionsSet()));
|
||||
} else {
|
||||
nodeLabels.addAll(this.getAccessibleNodeLabels());
|
||||
}
|
||||
|
||||
// Add NO_LABEL also to this list as NO_LABEL also can be granted with
|
||||
// resource in many general cases.
|
||||
if (!nodeLabels.contains(RMNodeLabelsManager.NO_LABEL)) {
|
||||
nodeLabels.add(RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
return nodeLabels;
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
@ -332,4 +333,10 @@ public void decUsedResource(String nodePartition, Resource resourceToDec,
|
||||
public void decreaseContainer(Resource clusterResource,
|
||||
SchedContainerChangeRequest decreaseRequest,
|
||||
FiCaSchedulerApp app);
|
||||
|
||||
/**
|
||||
* Get valid Node Labels for this queue
|
||||
* @return valid node labels
|
||||
*/
|
||||
public Set<String> getNodeLabelsForQueue();
|
||||
}
|
||||
|
@ -528,10 +528,20 @@ public void submitApplication(ApplicationId applicationId, String userName,
|
||||
|
||||
}
|
||||
|
||||
public synchronized Resource getAMResourceLimit() {
|
||||
return getAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
|
||||
public Resource getAMResourceLimit() {
|
||||
return queueUsage.getAMLimit();
|
||||
}
|
||||
|
||||
public Resource getAMResourceLimitPerPartition(String nodePartition) {
|
||||
return queueUsage.getAMLimit(nodePartition);
|
||||
}
|
||||
|
||||
public synchronized Resource calculateAndGetAMResourceLimit() {
|
||||
return calculateAndGetAMResourceLimitPerPartition(
|
||||
RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public synchronized Resource getUserAMResourceLimit() {
|
||||
return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
@ -552,13 +562,17 @@ public synchronized Resource getUserAMResourceLimitPerPartition(
|
||||
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
|
||||
queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
|
||||
|
||||
return Resources.multiplyAndNormalizeUp(resourceCalculator,
|
||||
Resource userAMLimit = Resources.multiplyAndNormalizeUp(resourceCalculator,
|
||||
queuePartitionResource,
|
||||
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
||||
* effectiveUserLimit * userLimitFactor, minimumAllocation);
|
||||
return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
|
||||
userAMLimit, getAMResourceLimitPerPartition(nodePartition))
|
||||
? userAMLimit
|
||||
: getAMResourceLimitPerPartition(nodePartition);
|
||||
}
|
||||
|
||||
public synchronized Resource getAMResourceLimitPerPartition(
|
||||
public synchronized Resource calculateAndGetAMResourceLimitPerPartition(
|
||||
String nodePartition) {
|
||||
/*
|
||||
* For non-labeled partition, get the max value from resources currently
|
||||
@ -601,20 +615,26 @@ public synchronized Resource getAMResourceLimitPerPartition(
|
||||
|
||||
private synchronized void activateApplications() {
|
||||
// limit of allowed resource usage for application masters
|
||||
Map<String, Resource> amPartitionLimit = new HashMap<String, Resource>();
|
||||
Map<String, Resource> userAmPartitionLimit =
|
||||
new HashMap<String, Resource>();
|
||||
|
||||
// AM Resource Limit for accessible labels can be pre-calculated.
|
||||
// This will help in updating AMResourceLimit for all labels when queue
|
||||
// is initialized for the first time (when no applications are present).
|
||||
for (String nodePartition : getNodeLabelsForQueue()) {
|
||||
calculateAndGetAMResourceLimitPerPartition(nodePartition);
|
||||
}
|
||||
|
||||
activateApplications(getPendingAppsOrderingPolicyRecovery()
|
||||
.getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
|
||||
.getAssignmentIterator(), userAmPartitionLimit);
|
||||
|
||||
activateApplications(
|
||||
getPendingAppsOrderingPolicy().getAssignmentIterator(),
|
||||
amPartitionLimit, userAmPartitionLimit);
|
||||
userAmPartitionLimit);
|
||||
}
|
||||
|
||||
private synchronized void activateApplications(
|
||||
Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
|
||||
Iterator<FiCaSchedulerApp> fsApp,
|
||||
Map<String, Resource> userAmPartitionLimit) {
|
||||
while (fsApp.hasNext()) {
|
||||
FiCaSchedulerApp application = fsApp.next();
|
||||
@ -624,11 +644,10 @@ private synchronized void activateApplications(
|
||||
// and calculate max-am resource limit for this partition.
|
||||
String partitionName = application.getAppAMNodePartitionName();
|
||||
|
||||
Resource amLimit = amPartitionLimit.get(partitionName);
|
||||
Resource amLimit = getAMResourceLimitPerPartition(partitionName);
|
||||
// Verify whether we already calculated am-limit for this label.
|
||||
if (amLimit == null) {
|
||||
amLimit = getAMResourceLimitPerPartition(partitionName);
|
||||
amPartitionLimit.put(partitionName, amLimit);
|
||||
amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
|
||||
}
|
||||
// Check am resource limit.
|
||||
Resource amIfStarted = Resources.add(
|
||||
@ -705,6 +724,7 @@ private synchronized void activateApplications(
|
||||
application.getAMResource(partitionName));
|
||||
user.getResourceUsage().incAMUsed(partitionName,
|
||||
application.getAMResource(partitionName));
|
||||
user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
|
||||
metrics.incAMUsed(application.getUser(),
|
||||
application.getAMResource(partitionName));
|
||||
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
|
||||
|
@ -25,7 +25,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceUsageInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourcesInfo;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
@ -36,7 +36,7 @@ public class UserInfo {
|
||||
protected int numActiveApplications;
|
||||
protected ResourceInfo AMResourceUsed;
|
||||
protected ResourceInfo userResourceLimit;
|
||||
protected ResourceUsageInfo resources;
|
||||
protected ResourcesInfo resources;
|
||||
|
||||
UserInfo() {}
|
||||
|
||||
@ -48,7 +48,7 @@ public class UserInfo {
|
||||
this.numPendingApplications = pendingApps;
|
||||
this.AMResourceUsed = new ResourceInfo(amResUsed);
|
||||
this.userResourceLimit = new ResourceInfo(resourceLimit);
|
||||
this.resources = new ResourceUsageInfo(resourceUsage);
|
||||
this.resources = new ResourcesInfo(resourceUsage);
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
@ -75,7 +75,7 @@ public ResourceInfo getUserResourceLimit() {
|
||||
return userResourceLimit;
|
||||
}
|
||||
|
||||
public ResourceUsageInfo getResourceUsageInfo() {
|
||||
public ResourcesInfo getResourceUsageInfo() {
|
||||
return resources;
|
||||
}
|
||||
}
|
||||
|
@ -39,10 +39,11 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionQueueCapacitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionResourceUsageInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionResourcesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.webapp.ResponseInfo;
|
||||
import org.apache.hadoop.yarn.webapp.SubView;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
@ -134,8 +135,23 @@ private void renderLeafQueueInfoWithoutParition(Block html) {
|
||||
private void renderQueueCapacityInfo(ResponseInfo ri, String label) {
|
||||
PartitionQueueCapacitiesInfo capacities =
|
||||
lqinfo.getCapacities().getPartitionQueueCapacitiesInfo(label);
|
||||
PartitionResourceUsageInfo resourceUsages =
|
||||
PartitionResourcesInfo resourceUsages =
|
||||
lqinfo.getResources().getPartitionResourceUsageInfo(label);
|
||||
|
||||
// Get UserInfo from first user to calculate AM Resource Limit per user.
|
||||
ResourceInfo userAMResourceLimit = null;
|
||||
ArrayList<UserInfo> usersList = lqinfo.getUsers().getUsersList();
|
||||
if (usersList.isEmpty()) {
|
||||
// If no users are present, consider AM Limit for that queue.
|
||||
userAMResourceLimit = resourceUsages.getAMLimit();
|
||||
} else {
|
||||
userAMResourceLimit = usersList.get(0)
|
||||
.getResourceUsageInfo().getPartitionResourceUsageInfo(label)
|
||||
.getAMLimit();
|
||||
}
|
||||
ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
|
||||
? new ResourceInfo(Resources.none())
|
||||
: resourceUsages.getAmUsed();
|
||||
ri.
|
||||
_("Used Capacity:", percent(capacities.getUsedCapacity() / 100)).
|
||||
_("Configured Capacity:", percent(capacities.getCapacity() / 100)).
|
||||
@ -143,7 +159,15 @@ private void renderQueueCapacityInfo(ResponseInfo ri, String label) {
|
||||
_("Absolute Used Capacity:", percent(capacities.getAbsoluteUsedCapacity() / 100)).
|
||||
_("Absolute Configured Capacity:", percent(capacities.getAbsoluteCapacity() / 100)).
|
||||
_("Absolute Configured Max Capacity:", percent(capacities.getAbsoluteMaxCapacity() / 100)).
|
||||
_("Used Resources:", resourceUsages.getUsed().toString());
|
||||
_("Used Resources:", resourceUsages.getUsed().toString()).
|
||||
_("Configured Max Application Master Limit:", StringUtils.format("%.1f",
|
||||
capacities.getMaxAMLimitPercentage())).
|
||||
_("Max Application Master Resources:",
|
||||
resourceUsages.getAMLimit().toString()).
|
||||
_("Used Application Master Resources:",
|
||||
amUsed.toString()).
|
||||
_("Max Application Master Resources Per User:",
|
||||
userAMResourceLimit.toString());
|
||||
}
|
||||
|
||||
private void renderCommonLeafQueueInfo(ResponseInfo ri) {
|
||||
@ -153,9 +177,6 @@ private void renderCommonLeafQueueInfo(ResponseInfo ri) {
|
||||
_("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
|
||||
_("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
|
||||
_("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
|
||||
_("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
|
||||
_("Used Application Master Resources:", lqinfo.getUsedAMResource().toString()).
|
||||
_("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
|
||||
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
|
||||
_("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
|
||||
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||
@ -197,15 +218,21 @@ protected void render(Block html) {
|
||||
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
|
||||
for (UserInfo userInfo : users) {
|
||||
ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
|
||||
PartitionResourcesInfo resourceUsages = lqinfo
|
||||
.getResources()
|
||||
.getPartitionResourceUsageInfo((nodeLabel == null) ? "" : nodeLabel);
|
||||
if (nodeLabel != null) {
|
||||
resourcesUsed = userInfo.getResourceUsageInfo()
|
||||
.getPartitionResourceUsageInfo(nodeLabel).getUsed();
|
||||
}
|
||||
ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
|
||||
? new ResourceInfo(Resources.none())
|
||||
: resourceUsages.getAmUsed();
|
||||
tbody.tr().td(userInfo.getUsername())
|
||||
.td(userInfo.getUserResourceLimit().toString())
|
||||
.td(resourcesUsed.toString())
|
||||
.td(lqinfo.getUserAMResourceLimit().toString())
|
||||
.td(userInfo.getAMResourcesUsed().toString())
|
||||
.td(resourceUsages.getAMLimit().toString())
|
||||
.td(amUsed.toString())
|
||||
.td(Integer.toString(userInfo.getNumActiveApplications()))
|
||||
.td(Integer.toString(userInfo.getNumPendingApplications()))._();
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
|
||||
max = 1f;
|
||||
this.maxCapacity = max * 100;
|
||||
|
||||
capacities = new QueueCapacitiesInfo(parent.getQueueCapacities());
|
||||
capacities = new QueueCapacitiesInfo(parent.getQueueCapacities(), false);
|
||||
queues = getQueues(parent);
|
||||
health = new CapacitySchedulerHealthInfo(cs);
|
||||
}
|
||||
|
@ -17,13 +17,18 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import javax.xml.bind.annotation.XmlTransient;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
@ -62,16 +67,30 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
||||
userLimitFactor = q.getUserLimitFactor();
|
||||
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
|
||||
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
||||
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
|
||||
preemptionDisabled = q.getPreemptionDisabled();
|
||||
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
||||
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
|
||||
defaultPriority = q.getDefaultApplicationPriority().getPriority();
|
||||
ArrayList<UserInfo> usersList = users.getUsersList();
|
||||
if (usersList.isEmpty()) {
|
||||
// If no users are present, consider AM Limit for that queue.
|
||||
userAMResourceLimit = resources.getPartitionResourceUsageInfo(
|
||||
RMNodeLabelsManager.NO_LABEL).getAMLimit();
|
||||
} else {
|
||||
userAMResourceLimit = usersList.get(0).getResourceUsageInfo()
|
||||
.getPartitionResourceUsageInfo(RMNodeLabelsManager.NO_LABEL)
|
||||
.getAMLimit();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
|
||||
resources = new ResourceUsageInfo(queueResourceUsage);
|
||||
resources = new ResourcesInfo(queueResourceUsage);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void populateQueueCapacities(QueueCapacities qCapacities) {
|
||||
capacities = new QueueCapacitiesInfo(qCapacities);
|
||||
}
|
||||
|
||||
public int getNumActiveApplications() {
|
||||
|
@ -61,7 +61,7 @@ public class CapacitySchedulerQueueInfo {
|
||||
protected long reservedContainers;
|
||||
protected long pendingContainers;
|
||||
protected QueueCapacitiesInfo capacities;
|
||||
protected ResourceUsageInfo resources;
|
||||
protected ResourcesInfo resources;
|
||||
|
||||
CapacitySchedulerQueueInfo() {
|
||||
};
|
||||
@ -101,14 +101,18 @@ public class CapacitySchedulerQueueInfo {
|
||||
Collections.sort(nodeLabels);
|
||||
}
|
||||
QueueCapacities qCapacities = q.getQueueCapacities();
|
||||
capacities = new QueueCapacitiesInfo(qCapacities);
|
||||
populateQueueCapacities(qCapacities);
|
||||
|
||||
ResourceUsage queueResourceUsage = q.getQueueResourceUsage();
|
||||
populateQueueResourceUsage(queueResourceUsage);
|
||||
}
|
||||
|
||||
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
|
||||
resources = new ResourceUsageInfo(queueResourceUsage, false);
|
||||
resources = new ResourcesInfo(queueResourceUsage, false);
|
||||
}
|
||||
|
||||
protected void populateQueueCapacities(QueueCapacities qCapacities) {
|
||||
capacities = new QueueCapacitiesInfo(qCapacities, false);
|
||||
}
|
||||
|
||||
public float getCapacity() {
|
||||
@ -193,7 +197,7 @@ public QueueCapacitiesInfo getCapacities() {
|
||||
return capacities;
|
||||
}
|
||||
|
||||
public ResourceUsageInfo getResources() {
|
||||
public ResourcesInfo getResources() {
|
||||
return resources;
|
||||
}
|
||||
}
|
||||
|
@ -35,13 +35,14 @@ public class PartitionQueueCapacitiesInfo {
|
||||
private float absoluteCapacity;
|
||||
private float absoluteUsedCapacity;
|
||||
private float absoluteMaxCapacity = 100;
|
||||
private Float maxAMLimitPercentage;
|
||||
|
||||
public PartitionQueueCapacitiesInfo() {
|
||||
}
|
||||
|
||||
public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
|
||||
float usedCapacity, float maxCapacity, float absCapacity,
|
||||
float absUsedCapacity, float absMaxCapacity) {
|
||||
float absUsedCapacity, float absMaxCapacity, Float maxAMLimitPercentage) {
|
||||
super();
|
||||
this.partitionName = partitionName;
|
||||
this.capacity = capacity;
|
||||
@ -50,6 +51,7 @@ public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
|
||||
this.absoluteCapacity = absCapacity;
|
||||
this.absoluteUsedCapacity = absUsedCapacity;
|
||||
this.absoluteMaxCapacity = absMaxCapacity;
|
||||
this.maxAMLimitPercentage = maxAMLimitPercentage;
|
||||
}
|
||||
|
||||
public float getCapacity() {
|
||||
@ -107,4 +109,12 @@ public float getAbsoluteMaxCapacity() {
|
||||
public void setAbsoluteMaxCapacity(float absoluteMaxCapacity) {
|
||||
this.absoluteMaxCapacity = absoluteMaxCapacity;
|
||||
}
|
||||
|
||||
public float getMaxAMLimitPercentage() {
|
||||
return maxAMLimitPercentage;
|
||||
}
|
||||
|
||||
public void setMaxAMLimitPercentage(float maxAMLimitPercentage) {
|
||||
this.maxAMLimitPercentage = maxAMLimitPercentage;
|
||||
}
|
||||
}
|
@ -26,25 +26,27 @@
|
||||
*/
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class PartitionResourceUsageInfo {
|
||||
public class PartitionResourcesInfo {
|
||||
private String partitionName;
|
||||
private ResourceInfo used = new ResourceInfo();
|
||||
private ResourceInfo reserved;
|
||||
private ResourceInfo pending;
|
||||
private ResourceInfo amUsed;
|
||||
private ResourceInfo amLimit = new ResourceInfo();
|
||||
|
||||
public PartitionResourceUsageInfo() {
|
||||
public PartitionResourcesInfo() {
|
||||
}
|
||||
|
||||
public PartitionResourceUsageInfo(String partitionName, ResourceInfo used,
|
||||
public PartitionResourcesInfo(String partitionName, ResourceInfo used,
|
||||
ResourceInfo reserved, ResourceInfo pending,
|
||||
ResourceInfo amResourceUsed) {
|
||||
ResourceInfo amResourceUsed, ResourceInfo amResourceLimit) {
|
||||
super();
|
||||
this.partitionName = partitionName;
|
||||
this.used = used;
|
||||
this.reserved = reserved;
|
||||
this.pending = pending;
|
||||
this.amUsed = amResourceUsed;
|
||||
this.amLimit = amResourceLimit;
|
||||
}
|
||||
|
||||
public String getPartitionName() {
|
||||
@ -86,4 +88,12 @@ public ResourceInfo getAmUsed() {
|
||||
public void setAmUsed(ResourceInfo amResourceUsed) {
|
||||
this.amUsed = amResourceUsed;
|
||||
}
|
||||
|
||||
public ResourceInfo getAMLimit() {
|
||||
return amLimit;
|
||||
}
|
||||
|
||||
public void setAMLimit(ResourceInfo amLimit) {
|
||||
this.amLimit = amLimit;
|
||||
}
|
||||
}
|
@ -38,7 +38,8 @@ public class QueueCapacitiesInfo {
|
||||
public QueueCapacitiesInfo() {
|
||||
}
|
||||
|
||||
public QueueCapacitiesInfo(QueueCapacities capacities) {
|
||||
public QueueCapacitiesInfo(QueueCapacities capacities,
|
||||
boolean considerAMUsage) {
|
||||
if (capacities == null) {
|
||||
return;
|
||||
}
|
||||
@ -48,6 +49,7 @@ public QueueCapacitiesInfo(QueueCapacities capacities) {
|
||||
float absCapacity;
|
||||
float absUsedCapacity;
|
||||
float absMaxCapacity;
|
||||
Float maxAMLimitPercentage;
|
||||
for (String partitionName : capacities.getExistingNodeLabels()) {
|
||||
usedCapacity = capacities.getUsedCapacity(partitionName) * 100;
|
||||
capacity = capacities.getCapacity(partitionName) * 100;
|
||||
@ -58,15 +60,22 @@ public QueueCapacitiesInfo(QueueCapacities capacities) {
|
||||
.cap(capacities.getAbsoluteUsedCapacity(partitionName), 0f, 1f) * 100;
|
||||
absMaxCapacity = CapacitySchedulerQueueInfo.cap(
|
||||
capacities.getAbsoluteMaximumCapacity(partitionName), 0f, 1f) * 100;
|
||||
maxAMLimitPercentage = capacities
|
||||
.getMaxAMResourcePercentage(partitionName) * 100;
|
||||
if (maxCapacity < CapacitySchedulerQueueInfo.EPSILON || maxCapacity > 1f)
|
||||
maxCapacity = 1f;
|
||||
maxCapacity = maxCapacity * 100;
|
||||
queueCapacitiesByPartition.add(
|
||||
new PartitionQueueCapacitiesInfo(partitionName, capacity, usedCapacity,
|
||||
maxCapacity, absCapacity, absUsedCapacity, absMaxCapacity));
|
||||
queueCapacitiesByPartition.add(new PartitionQueueCapacitiesInfo(
|
||||
partitionName, capacity, usedCapacity, maxCapacity, absCapacity,
|
||||
absUsedCapacity, absMaxCapacity,
|
||||
considerAMUsage ? maxAMLimitPercentage : null));
|
||||
}
|
||||
}
|
||||
|
||||
public QueueCapacitiesInfo(QueueCapacities capacities) {
|
||||
this(capacities, true);
|
||||
}
|
||||
|
||||
public void add(PartitionQueueCapacitiesInfo partitionQueueCapacitiesInfo) {
|
||||
queueCapacitiesByPartition.add(partitionQueueCapacitiesInfo);
|
||||
}
|
||||
|
@ -31,49 +31,51 @@
|
||||
*/
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class ResourceUsageInfo {
|
||||
List<PartitionResourceUsageInfo> resourceUsagesByPartition =
|
||||
public class ResourcesInfo {
|
||||
List<PartitionResourcesInfo> resourceUsagesByPartition =
|
||||
new ArrayList<>();
|
||||
|
||||
public ResourceUsageInfo() {
|
||||
public ResourcesInfo() {
|
||||
}
|
||||
|
||||
public ResourceUsageInfo(ResourceUsage resourceUsage,
|
||||
public ResourcesInfo(ResourceUsage resourceUsage,
|
||||
boolean considerAMUsage) {
|
||||
if (resourceUsage == null) {
|
||||
return;
|
||||
}
|
||||
for (String partitionName : resourceUsage.getNodePartitionsSet()) {
|
||||
resourceUsagesByPartition.add(new PartitionResourceUsageInfo(
|
||||
partitionName, new ResourceInfo(resourceUsage.getUsed(partitionName)),
|
||||
resourceUsagesByPartition.add(new PartitionResourcesInfo(partitionName,
|
||||
new ResourceInfo(resourceUsage.getUsed(partitionName)),
|
||||
new ResourceInfo(resourceUsage.getReserved(partitionName)),
|
||||
new ResourceInfo(resourceUsage.getPending(partitionName)),
|
||||
considerAMUsage
|
||||
? new ResourceInfo(resourceUsage.getAMUsed(partitionName))
|
||||
: null));
|
||||
considerAMUsage ? new ResourceInfo(resourceUsage
|
||||
.getAMUsed(partitionName)) : null,
|
||||
considerAMUsage ? new ResourceInfo(resourceUsage
|
||||
.getAMLimit(partitionName)) : null));
|
||||
}
|
||||
}
|
||||
|
||||
public ResourceUsageInfo(ResourceUsage resourceUsage) {
|
||||
public ResourcesInfo(ResourceUsage resourceUsage) {
|
||||
this(resourceUsage, true);
|
||||
}
|
||||
|
||||
public List<PartitionResourceUsageInfo> getPartitionResourceUsages() {
|
||||
public List<PartitionResourcesInfo> getPartitionResourceUsages() {
|
||||
return resourceUsagesByPartition;
|
||||
}
|
||||
|
||||
public void setPartitionResourceUsages(
|
||||
List<PartitionResourceUsageInfo> resources) {
|
||||
List<PartitionResourcesInfo> resources) {
|
||||
this.resourceUsagesByPartition = resources;
|
||||
}
|
||||
|
||||
public PartitionResourceUsageInfo getPartitionResourceUsageInfo(
|
||||
public PartitionResourcesInfo getPartitionResourceUsageInfo(
|
||||
String partitionName) {
|
||||
for (PartitionResourceUsageInfo partitionResourceUsageInfo : resourceUsagesByPartition) {
|
||||
for (PartitionResourcesInfo partitionResourceUsageInfo :
|
||||
resourceUsagesByPartition) {
|
||||
if (partitionResourceUsageInfo.getPartitionName().equals(partitionName)) {
|
||||
return partitionResourceUsageInfo;
|
||||
}
|
||||
}
|
||||
return new PartitionResourceUsageInfo();
|
||||
return new PartitionResourcesInfo();
|
||||
}
|
||||
}
|
@ -181,7 +181,8 @@ public void testAMResourceLimit() throws Exception {
|
||||
ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
|
||||
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
|
||||
|
||||
assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
|
||||
assertEquals(Resource.newInstance(8 * GB, 1),
|
||||
queue.calculateAndGetAMResourceLimit());
|
||||
assertEquals(Resource.newInstance(4 * GB, 1),
|
||||
queue.getUserAMResourceLimit());
|
||||
|
||||
@ -282,8 +283,7 @@ public void testLimitsComputation() throws Exception {
|
||||
queue.getUserAMResourceLimit());
|
||||
|
||||
Resource amResourceLimit = Resource.newInstance(160 * GB, 1);
|
||||
assertEquals(queue.getAMResourceLimit(), amResourceLimit);
|
||||
assertEquals(queue.getAMResourceLimit(), amResourceLimit);
|
||||
assertEquals(queue.calculateAndGetAMResourceLimit(), amResourceLimit);
|
||||
assertEquals(queue.getUserAMResourceLimit(),
|
||||
Resource.newInstance(80*GB, 1));
|
||||
|
||||
@ -303,7 +303,8 @@ public void testLimitsComputation() throws Exception {
|
||||
root.updateClusterResource(clusterResource, new ResourceLimits(
|
||||
clusterResource));
|
||||
|
||||
assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
|
||||
assertEquals(queue.calculateAndGetAMResourceLimit(),
|
||||
Resource.newInstance(192 * GB, 1));
|
||||
assertEquals(queue.getUserAMResourceLimit(),
|
||||
Resource.newInstance(96*GB, 1));
|
||||
|
||||
@ -353,7 +354,8 @@ public void testLimitsComputation() throws Exception {
|
||||
queue.getQueuePath())
|
||||
);
|
||||
|
||||
assertEquals(queue.getAMResourceLimit(), Resource.newInstance(800*GB, 1));
|
||||
assertEquals(queue.calculateAndGetAMResourceLimit(),
|
||||
Resource.newInstance(800 * GB, 1));
|
||||
assertEquals(queue.getUserAMResourceLimit(),
|
||||
Resource.newInstance(400*GB, 1));
|
||||
|
||||
@ -383,7 +385,8 @@ public void testActiveApplicationLimits() throws Exception {
|
||||
final String user_1 = "user_1";
|
||||
final String user_2 = "user_2";
|
||||
|
||||
assertEquals(Resource.newInstance(16 * GB, 1), queue.getAMResourceLimit());
|
||||
assertEquals(Resource.newInstance(16 * GB, 1),
|
||||
queue.calculateAndGetAMResourceLimit());
|
||||
assertEquals(Resource.newInstance(8 * GB, 1),
|
||||
queue.getUserAMResourceLimit());
|
||||
|
||||
|
@ -263,18 +263,18 @@ public void testInitializeQueue() throws Exception {
|
||||
assertEquals(0.1, c.getMaximumCapacity(), epsilon);
|
||||
assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
|
||||
|
||||
//Verify the value for getAMResourceLimit for queues with < .1 maxcap
|
||||
Resource clusterResource = Resource.newInstance(50 * GB, 50);
|
||||
|
||||
// Verify the value for getAMResourceLimit for queues with < .1 maxcap
|
||||
Resource clusterResource = Resource.newInstance(50 * GB, 50);
|
||||
|
||||
a.updateClusterResource(clusterResource,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(Resource.newInstance(1 * GB, 1),
|
||||
a.getAMResourceLimit());
|
||||
|
||||
b.updateClusterResource(clusterResource,
|
||||
assertEquals(Resource.newInstance(1 * GB, 1),
|
||||
a.calculateAndGetAMResourceLimit());
|
||||
|
||||
b.updateClusterResource(clusterResource,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(Resource.newInstance(5 * GB, 1),
|
||||
b.getAMResourceLimit());
|
||||
assertEquals(Resource.newInstance(5 * GB, 1),
|
||||
b.calculateAndGetAMResourceLimit());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -2323,21 +2323,24 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
|
||||
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
|
||||
LeafQueue a = new LeafQueue(csContext, A, root, null);
|
||||
assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
|
||||
assertEquals(a.getAMResourceLimit(), Resources.createResource(160 * GB, 1));
|
||||
assertEquals(a.calculateAndGetAMResourceLimit(),
|
||||
Resources.createResource(160 * GB, 1));
|
||||
|
||||
csConf.setFloat(CapacitySchedulerConfiguration.
|
||||
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
|
||||
LeafQueue newA = new LeafQueue(csContext, A, root, null);
|
||||
a.reinitialize(newA, clusterResource);
|
||||
assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
|
||||
assertEquals(a.getAMResourceLimit(), Resources.createResource(320 * GB, 1));
|
||||
assertEquals(a.calculateAndGetAMResourceLimit(),
|
||||
Resources.createResource(320 * GB, 1));
|
||||
|
||||
Resource newClusterResource = Resources.createResource(100 * 20 * GB,
|
||||
100 * 32);
|
||||
a.updateClusterResource(newClusterResource,
|
||||
new ResourceLimits(newClusterResource));
|
||||
// 100 * 20 * 0.2 = 400
|
||||
assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
|
||||
assertEquals(a.calculateAndGetAMResourceLimit(),
|
||||
Resources.createResource(400 * GB, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -501,13 +501,13 @@ private void verifySchedulerInfoJson(JSONObject json)
|
||||
partitionInfo = partitionsCapsArray.getJSONObject(0);
|
||||
partitionName = partitionInfo.getString("partitionName");
|
||||
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
|
||||
assertEquals("incorrect number of elements", 5,
|
||||
assertEquals("incorrect number of elements", 6,
|
||||
partitionsResourcesArray.getJSONObject(0).length());
|
||||
break;
|
||||
case QUEUE_B:
|
||||
assertEquals("Invalid default Label expression", LABEL_LX,
|
||||
queueJson.getString("defaultNodeLabelExpression"));
|
||||
assertEquals("incorrect number of elements", 5,
|
||||
assertEquals("incorrect number of elements", 6,
|
||||
partitionsResourcesArray.getJSONObject(0).length());
|
||||
verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
|
||||
assertEquals("incorrect number of partitions", 2,
|
||||
|
Loading…
Reference in New Issue
Block a user