diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java index 1406fec923..93f8ce2b98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java @@ -64,7 +64,7 @@ public class AbstractResourceUsage { // be written by ordering policies USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING( 5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE( - 9), EFF_MAX_RESOURCE(10); + 9), EFF_MAX_RESOURCE(10), USERAMLIMIT(11); private int idx; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 711a468232..37958de529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -268,6 +268,22 @@ public class ResourceUsage extends AbstractResourceUsage { _set(label, ResourceType.AMLIMIT, res); } + public Resource getUserAMLimit() { + return getAMLimit(NL); + } + + public Resource getUserAMLimit(String label) { + return _get(label, ResourceType.USERAMLIMIT); + } + + public void setUserAMLimit(Resource res) { + setAMLimit(NL, res); + } + + public void setUserAMLimit(String label, Resource res) { + _set(label, ResourceType.USERAMLIMIT, res); + } + public Resource getCachedDemand(String label) { try { readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 329f7de09e..31032b3c90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -703,6 +703,7 @@ public class LeafQueue extends AbstractCSQueue { */ float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); + float preWeightedUserLimit = effectiveUserLimit; effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); Resource queuePartitionResource = getEffectiveCapacity(nodePartition); @@ -712,10 +713,28 @@ public class LeafQueue extends AbstractCSQueue { queueCapacities.getMaxAMResourcePercentage(nodePartition) * effectiveUserLimit * usersManager.getUserLimitFactor(), minimumAllocation); - return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? - userAMLimit : - getAMResourceLimitPerPartition(nodePartition); + userAMLimit = + Resources.min(resourceCalculator, lastClusterResource, + userAMLimit, + Resources.clone(getAMResourceLimitPerPartition(nodePartition))); + + Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition) + * preWeightedUserLimit * usersManager.getUserLimitFactor(), + minimumAllocation); + preWeighteduserAMLimit = + Resources.min(resourceCalculator, lastClusterResource, + preWeighteduserAMLimit, + Resources.clone(getAMResourceLimitPerPartition(nodePartition))); + queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit); + + if (LOG.isDebugEnabled()) { + LOG.debug("Effective user AM limit for \"" + userName + "\":" + + preWeighteduserAMLimit + ". " + "Effective weighted user AM limit: " + + userAMLimit + ". User weight: " + userWeight); + } + return userAMLimit; } finally { readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index de85590b77..7f025a7976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -144,13 +144,13 @@ class CapacitySchedulerPage extends RmView { // Get UserInfo from first user to calculate AM Resource Limit per user. ResourceInfo userAMResourceLimit = null; ArrayList usersList = lqinfo.getUsers().getUsersList(); - if (usersList.isEmpty()) { - // If no users are present, consider AM Limit for that queue. + if (!usersList.isEmpty()) { + userAMResourceLimit = resourceUsages.getUserAmLimit(); + } + // If no users are present or if AM limit per user doesn't exist, retrieve + // AM Limit for that queue. + if (userAMResourceLimit == null) { userAMResourceLimit = resourceUsages.getAMLimit(); - } else { - userAMResourceLimit = usersList.get(0) - .getResourceUsageInfo().getPartitionResourceUsageInfo(label) - .getAMLimit(); } ResourceInfo amUsed = (resourceUsages.getAmUsed() == null) ? new ResourceInfo(Resources.none()) @@ -235,11 +235,25 @@ class CapacitySchedulerPage extends RmView { .$class("ui-state-default").__("Non-Schedulable Apps").__().__().__() .tbody(); + PartitionResourcesInfo queueUsageResources = + lqinfo.getResources().getPartitionResourceUsageInfo( + nodeLabel == null ? "" : nodeLabel); + ArrayList users = lqinfo.getUsers().getUsersList(); for (UserInfo userInfo : users) { ResourceInfo resourcesUsed = userInfo.getResourcesUsed(); - PartitionResourcesInfo resourceUsages = userInfo.getResourceUsageInfo() - .getPartitionResourceUsageInfo((nodeLabel == null) ? "" : nodeLabel); + ResourceInfo userAMLimitPerPartition = + queueUsageResources.getUserAmLimit(); + // If AM limit per user is null, use the AM limit for the queue level. + if (userAMLimitPerPartition == null) { + userAMLimitPerPartition = queueUsageResources.getAMLimit(); + } + if (userInfo.getUserWeight() != 1.0) { + userAMLimitPerPartition = + new ResourceInfo( + Resources.multiply(userAMLimitPerPartition.getResource(), + userInfo.getUserWeight())); + } if (nodeLabel != null) { resourcesUsed = userInfo.getResourceUsageInfo() .getPartitionResourceUsageInfo(nodeLabel).getUsed(); @@ -254,7 +268,7 @@ class CapacitySchedulerPage extends RmView { .td(userInfo.getUserResourceLimit().toString()) .td(String.valueOf(userInfo.getUserWeight())) .td(resourcesUsed.toString()) - .td(resourceUsages.getAMLimit().toString()) + .td(userAMLimitPerPartition.toString()) .td(amUsed.toString()) .td(Integer.toString(userInfo.getNumActiveApplications())) .td(Integer.toString(userInfo.getNumPendingApplications())).__(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionResourcesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionResourcesInfo.java index 8b43f5d6bb..6f4c49d72f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionResourcesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionResourcesInfo.java @@ -33,13 +33,15 @@ public class PartitionResourcesInfo { private ResourceInfo pending; private ResourceInfo amUsed; private ResourceInfo amLimit = new ResourceInfo(); + private ResourceInfo userAmLimit; public PartitionResourcesInfo() { } public PartitionResourcesInfo(String partitionName, ResourceInfo used, ResourceInfo reserved, ResourceInfo pending, - ResourceInfo amResourceUsed, ResourceInfo amResourceLimit) { + ResourceInfo amResourceUsed, ResourceInfo amResourceLimit, + ResourceInfo perUserAmResourceLimit) { super(); this.partitionName = partitionName; this.used = used; @@ -47,6 +49,7 @@ public class PartitionResourcesInfo { this.pending = pending; this.amUsed = amResourceUsed; this.amLimit = amResourceLimit; + this.userAmLimit = perUserAmResourceLimit; } public String getPartitionName() { @@ -96,4 +99,18 @@ public class PartitionResourcesInfo { public void setAMLimit(ResourceInfo amLimit) { this.amLimit = amLimit; } + + /** + * @return the userAmLimit + */ + public ResourceInfo getUserAmLimit() { + return userAmLimit; + } + + /** + * @param userAmLimit the userAmLimit to set + */ + public void setUserAmLimit(ResourceInfo userAmLimit) { + this.userAmLimit = userAmLimit; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourcesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourcesInfo.java index 4f1e1c956d..1d96f79081 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourcesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourcesInfo.java @@ -51,7 +51,9 @@ public class ResourcesInfo { considerAMUsage ? new ResourceInfo(resourceUsage .getAMUsed(partitionName)) : null, considerAMUsage ? new ResourceInfo(resourceUsage - .getAMLimit(partitionName)) : null)); + .getAMLimit(partitionName)) : null, + considerAMUsage ? new ResourceInfo(resourceUsage + .getUserAMLimit(partitionName)) : null)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesForCSWithPartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesForCSWithPartitions.java index c286186827..95dffce227 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesForCSWithPartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesForCSWithPartitions.java @@ -508,13 +508,13 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase { partitionInfo = partitionsCapsArray.getJSONObject(0); partitionName = partitionInfo.getString("partitionName"); verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50); - assertEquals("incorrect number of elements", 6, + assertEquals("incorrect number of elements", 7, partitionsResourcesArray.getJSONObject(0).length()); break; case QUEUE_B: assertEquals("Invalid default Label expression", LABEL_LX, queueJson.getString("defaultNodeLabelExpression")); - assertEquals("incorrect number of elements", 6, + assertEquals("incorrect number of elements", 7, partitionsResourcesArray.getJSONObject(0).length()); verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX)); assertEquals("incorrect number of partitions", 2,