添加Capacity调度器的源码注释

This commit is contained in:
LingZhaoHui 2024-11-08 23:21:45 +08:00
parent 8b59d85733
commit b43901c271
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
4 changed files with 56 additions and 2 deletions

View File

@ -251,6 +251,12 @@ public static Resource add(Resource lhs, Resource rhs) {
return addTo(clone(lhs), rhs);
}
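/**
 * Subtracts {@code rhs} from {@code lhs} and returns {@code lhs} itself
 * (no copy is made here; use {@code subtract} for the cloning variant).
 * @param lhs resource to subtract from; this same object is returned
 * @param rhs resource amount to subtract
 * @return {@code lhs}
 */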
public static Resource subtractFrom(Resource lhs, Resource rhs) {
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
for (int i = 0; i < maxLength; i++) {
@ -266,6 +272,12 @@ public static Resource subtractFrom(Resource lhs, Resource rhs) {
return lhs;
}
/**
 * Computes {@code lhs - rhs} on a copy: {@code lhs} is cloned first and the
 * subtraction is delegated to {@code subtractFrom} on that clone.
 *
 * @param lhs minuend; cloned before the subtraction is applied
 * @param rhs subtrahend
 * @return whatever {@code subtractFrom} returns for the clone of {@code lhs}
 */
public static Resource subtract(Resource lhs, Resource rhs) {
  Resource difference = clone(lhs);
  return subtractFrom(difference, rhs);
}

View File

@ -1004,6 +1004,15 @@ private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
}
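/**
 * Current resource limit of this queue.
 * For labeled resources: limit = queue-max-resource.
 * For non-labeled resources: limit = min(queue-max-resource, limit-set-by-parent).
 * @param nodePartition node partition (label) being considered
 * @param clusterResource total cluster resource
 * @param currentResourceLimits limits handed down for this queue
 * @param schedulingMode scheduling mode in effect
 * @return the limit that currently applies to this queue
 */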
private Resource getCurrentLimitResource(String nodePartition,
Resource clusterResource, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
@ -1037,6 +1046,15 @@ public boolean hasChildQueues() {
return childQueues != null && !childQueues.isEmpty();
}
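/**
 * Decides whether resources may be assigned to this queue.
 * NOTE(review): per the original note this returns true when resources can be
 * allocated, or when allocation is still possible by unreserving reserved
 * resources; confirm against the full method body.
 * @param clusterResource total cluster resource
 * @param nodePartition node partition (label) being considered
 * @param currentResourceLimits limits handed down for this queue
 * @param resourceCouldBeUnreserved amount of reserved resource that could be released
 * @param schedulingMode scheduling mode in effect
 * @return true if assignment to this queue may proceed
 */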
boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits,
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
@ -1071,6 +1089,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
usedExceptKillable, currentLimitResource)) {
// 真实已使用的 >= 当前队列的限制
// if reservation continue looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
@ -1078,6 +1097,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
if (this.reservationsContinueLooking
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// reservationsContinueLooking is enabled and a non-zero amount of resource could be unreserved
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource = Resources.subtract(
usedExceptKillable, resourceCouldBeUnreserved);
@ -1086,6 +1106,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
// Usage minus the unreservable amount is below the queue's current limit
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueuePath()
+ " usedResources: " + queueUsage.getUsed()
@ -1111,6 +1132,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
}
return false;
}
// 资源足够可以给当前队列分配
if (LOG.isDebugEnabled()) {
LOG.debug("Check assign to queue: " + getQueuePath()
+ " nodePartition: " + nodePartition

View File

@ -1696,7 +1696,7 @@ private void allocateFromReservedContainer(FiCaSchedulerNode node,
private CSAssignment allocateOrReserveNewContainers(
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
// 优先按照资源池标签从根队列开始分配
// 优先从没有资源池标签从根队列开始分配
CSAssignment assignment = getRootQueue().assignContainers(
getClusterResource(), candidates, new ResourceLimits(labelManager
.getResourceByLabel(candidates.getPartition(),
@ -1738,6 +1738,7 @@ private CSAssignment allocateOrReserveNewContainers(
return null;
}
// 从资源包含资源标签的节点分配
// Try to use NON_EXCLUSIVE
assignment = getRootQueue().assignContainers(getClusterResource(),
candidates,

View File

@ -575,7 +575,7 @@ public CSAssignment assignContainers(Resource clusterResource,
// queue doesn't need more resources.
if (!super.hasPendingResourceRequest(candidates.getPartition(),
clusterResource, schedulingMode)) {
// 分配请求
// No pending resource request (the condition is negated): the queue does not need more resources
if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
// Do logging every 1 sec to avoid excessive logging.
@ -720,6 +720,12 @@ public CSAssignment assignContainers(Resource clusterResource,
return assignment;
}
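/**
 * Determines whether allocation is possible: true when resources are
 * available to assign, false when they are not.
 * @param clusterResource cluster resource
 * @param node node information; may be null
 * @return true if allocation can proceed
 */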
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
// When node == null means global scheduling is enabled, always return true
if (null == node) {
@ -782,11 +788,24 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,
return new ResourceLimits(childLimit);
}
/**
 * Returns the iterator used to walk the child queues for allocation.
 * The ordering is delegated entirely to {@code queueOrderingPolicy}.
 *
 * @param partition partition name passed through to the ordering policy
 * @return the iterator produced by
 *         {@code queueOrderingPolicy.getAssignmentIterator(partition)}
 */
private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
    String partition) {
  final Iterator<CSQueue> orderedChildren =
      queueOrderingPolicy.getAssignmentIterator(partition);
  return orderedChildren;
}
/**
* 分配资源到子队列
* @param cluster
* @param candidates
* @param limits
* @param schedulingMode
* @return
*/
private CSAssignment assignContainersToChildQueues(Resource cluster,
CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
SchedulingMode schedulingMode) {