From b43901c2712c3d3c1bc0305acfcf768c942f9d35 Mon Sep 17 00:00:00 2001 From: zeekling Date: Fri, 8 Nov 2024 23:21:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Capacity=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=99=A8=E7=9A=84=E6=BA=90=E7=A0=81=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hadoop/yarn/util/resource/Resources.java | 12 ++++++++++ .../scheduler/capacity/AbstractCSQueue.java | 22 +++++++++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 3 ++- .../scheduler/capacity/ParentQueue.java | 21 +++++++++++++++++- 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 9b96fd72b9..c5e5776391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -251,6 +251,12 @@ public static Resource add(Resource lhs, Resource rhs) { return addTo(clone(lhs), rhs); } + /** + * lhs = lhs - rhs:直接在 lhs 上相减,并返回 lhs 本身。 + * @param lhs + * @param rhs + * @return + */ public static Resource subtractFrom(Resource lhs, Resource rhs) { int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { @@ -266,6 +272,12 @@ public static Resource subtractFrom(Resource lhs, Resource rhs) { return lhs; } + /** + * result = lhs - rhs:先克隆 lhs 再相减,不修改传入的 lhs。 + * @param lhs + * @param rhs + * @return + */ public static Resource subtract(Resource lhs, Resource rhs) { return subtractFrom(clone(lhs), rhs); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3870d4214b..37f94aa8e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1004,6 +1004,15 @@ private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q, parentQ.getIntraQueuePreemptionDisabledInHierarchy()); } + /** + * 当前队列限制,如果是labeled resource: limit = queue-max-resource, + * 如果是non-labeled resource:limit = min(queue-max-resource, limit-set-by-parent) + * @param nodePartition + * @param clusterResource + * @param currentResourceLimits + * @param schedulingMode + * @return + */ private Resource getCurrentLimitResource(String nodePartition, Resource clusterResource, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { @@ -1037,6 +1046,15 @@ public boolean hasChildQueues() { return childQueues != null && !childQueues.isEmpty(); } + /** + * 是否可以给当前队列分配资源,在可以分配或者能够预留资源的时候返回true。 + * @param clusterResource + * @param nodePartition + * @param currentResourceLimits + * @param resourceCouldBeUnreserved + * @param schedulingMode + * @return + */ boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { @@ -1071,6 +1089,7 @@ boolean canAssignToThisQueue(Resource clusterResource, if 
(Resources.greaterThanOrEqual(resourceCalculator, clusterResource, usedExceptKillable, currentLimitResource)) { + // 真实已使用的 >= 当前队列的限制 // if reservation continue looking enabled, check to see if could we // potentially use this node instead of a reserved node if the application @@ -1078,6 +1097,7 @@ boolean canAssignToThisQueue(Resource clusterResource, if (this.reservationsContinueLooking && Resources.greaterThan(resourceCalculator, clusterResource, resourceCouldBeUnreserved, Resources.none())) { + // 开启了 reservationsContinueLooking,并且存在可以取消预留(unreserve)的资源。 // resource-without-reserved = used - reserved Resource newTotalWithoutReservedResource = Resources.subtract( usedExceptKillable, resourceCouldBeUnreserved); @@ -1086,6 +1106,7 @@ boolean canAssignToThisQueue(Resource clusterResource, // have chance to allocate on this node by unreserving some containers if (Resources.lessThan(resourceCalculator, clusterResource, newTotalWithoutReservedResource, currentLimitResource)) { + // 扣除可取消预留的资源后,使用量低于当前队列的限制。 if (LOG.isDebugEnabled()) { LOG.debug("try to use reserved: " + getQueuePath() + " usedResources: " + queueUsage.getUsed() @@ -1111,6 +1132,7 @@ boolean canAssignToThisQueue(Resource clusterResource, } return false; } + // 资源足够,可以给当前队列分配。 if (LOG.isDebugEnabled()) { LOG.debug("Check assign to queue: " + getQueuePath() + " nodePartition: " + nodePartition diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 148861f604..2cf58e2fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1696,7 +1696,7 @@ private void allocateFromReservedContainer(FiCaSchedulerNode node, private CSAssignment allocateOrReserveNewContainers( CandidateNodeSet candidates, boolean withNodeHeartbeat) { - // 优先按照资源池标签从根队列开始分配。 + // 首先按候选节点所在分区(资源池标签)的资源,从根队列开始分配。 CSAssignment assignment = getRootQueue().assignContainers( getClusterResource(), candidates, new ResourceLimits(labelManager .getResourceByLabel(candidates.getPartition(), @@ -1738,6 +1738,7 @@ private CSAssignment allocateOrReserveNewContainers( return null; } + // 再尝试以 NON_EXCLUSIVE 方式,从带资源标签的节点分配。 // Try to use NON_EXCLUSIVE assignment = getRootQueue().assignContainers(getClusterResource(), candidates, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index c5dc31504c..061e25dd0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -575,7 +575,7 @@ public CSAssignment assignContainers(Resource clusterResource, // queue doesn't need more resources. if (!super.hasPendingResourceRequest(candidates.getPartition(), clusterResource, schedulingMode)) { - // 没分配请求 + // 没有请求可以分配。 if (LOG.isDebugEnabled()) { long now = System.currentTimeMillis(); // Do logging every 1 sec to avoid excessive logging. 
@@ -720,6 +720,12 @@ public CSAssignment assignContainers(Resource clusterResource, return assignment; } + /** + * 计算是不是可以分配,有资源可以分配,没资源不可以分配。 + * @param clusterResource 集群资源 + * @param node 节点信息 + * @return + */ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { // When node == null means global scheduling is enabled, always return true if (null == node) { @@ -782,11 +788,24 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, return new ResourceLimits(childLimit); } + /** + * 子队列排序。 + * @param partition + * @return + */ private Iterator sortAndGetChildrenAllocationIterator( String partition) { return queueOrderingPolicy.getAssignmentIterator(partition); } + /** + * 分配资源到子队列。 + * @param cluster + * @param candidates + * @param limits + * @param schedulingMode + * @return + */ private CSAssignment assignContainersToChildQueues(Resource cluster, CandidateNodeSet candidates, ResourceLimits limits, SchedulingMode schedulingMode) {