diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 1f5820d807..2da52d55b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -73,6 +73,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1189,84 +1190,72 @@ boolean canAssignToThisQueue(Resource clusterResource, } + private static String ensurePartition(String partition) { + return Optional.ofNullable(partition).orElse(RMNodeLabelsManager.NO_LABEL); + } + + @FunctionalInterface + interface Counter { + void count(String partition, Resource resource); + } + + @FunctionalInterface + interface CounterWithApp { + void count(String partition, Resource reservedRes, SchedulerApplicationAttempt application); + } + + private void count(String partition, Resource resource, Counter counter, Counter parentCounter) { + final String checkedPartition = ensurePartition(partition); + counter.count(checkedPartition, resource); + Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource)); + } + + private void countAndUpdate(String partition, Resource resource, + Counter counter, CounterWithApp parentCounter) { + final String checkedPartition = ensurePartition(partition); + counter.count(checkedPartition, resource); + CSQueueUtils.updateUsedCapacity(resourceCalculator, + labelManager.getResourceByLabel(checkedPartition, Resources.none()), + checkedPartition, this); + Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource, null)); + } + @Override public void incReservedResource(String partition, Resource reservedRes) { - if (partition == null) { - partition = RMNodeLabelsManager.NO_LABEL; - } - - queueUsage.incReserved(partition, reservedRes); - if(null != parent){ - parent.incReservedResource(partition, reservedRes); - } + count(partition, reservedRes, queueUsage::incReserved, + parent == null ? null : parent::incReservedResource); } @Override public void decReservedResource(String partition, Resource reservedRes) { - if (partition == null) { - partition = RMNodeLabelsManager.NO_LABEL; - } - - queueUsage.decReserved(partition, reservedRes); - if(null != parent){ - parent.decReservedResource(partition, reservedRes); - } + count(partition, reservedRes, queueUsage::decReserved, + parent == null ? null : parent::decReservedResource); } @Override public void incPendingResource(String nodeLabel, Resource resourceToInc) { - if (nodeLabel == null) { - nodeLabel = RMNodeLabelsManager.NO_LABEL; - } - // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.incPending(nodeLabel, resourceToInc); - if (null != parent) { - parent.incPendingResource(nodeLabel, resourceToInc); - } + count(nodeLabel, resourceToInc, queueUsage::incPending, + parent == null ? null : parent::incPendingResource); } @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { - if (nodeLabel == null) { - nodeLabel = RMNodeLabelsManager.NO_LABEL; - } - // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.decPending(nodeLabel, resourceToDec); - if (null != parent) { - parent.decPendingResource(nodeLabel, resourceToDec); - } + count(nodeLabel, resourceToDec, queueUsage::decPending, + parent == null ? null : parent::decPendingResource); } @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - if (nodeLabel == null) { - nodeLabel = RMNodeLabelsManager.NO_LABEL; - } - // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.incUsed(nodeLabel, resourceToInc); - CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); - if (null != parent) { - parent.incUsedResource(nodeLabel, resourceToInc, null); - } + countAndUpdate(nodeLabel, resourceToInc, queueUsage::incUsed, + parent == null ? null : parent::incUsedResource); } @Override public void decUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - if (nodeLabel == null) { - nodeLabel = RMNodeLabelsManager.NO_LABEL; - } - // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.decUsed(nodeLabel, resourceToDec); - CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); - if (null != parent) { - parent.decUsedResource(nodeLabel, resourceToDec, null); - } + countAndUpdate(nodeLabel, resourceToDec, queueUsage::decUsed, + parent == null ? null : parent::decUsedResource); } /**