添加Capacity调度器的源码注释

This commit is contained in:
LingZhaoHui 2024-12-15 16:24:44 +08:00
parent dce4387b3d
commit f94a3d9ca5
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
3 changed files with 18 additions and 5 deletions

View File

@ -1123,6 +1123,16 @@ private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
} }
} }
/**
* 分配资源到子队列的逻辑
* @param clusterResource the resource of the cluster.
* @param candidates {@link CandidateNodeSet} the nodes that are considered
* for the current placement.
* @param currentResourceLimits how much overall resource of this queue can use.
* @param schedulingMode Type of exclusive check when assign container on a
* NodeManager, see {@link SchedulingMode}.
* @return
*/
@Override @Override
public CSAssignment assignContainers(Resource clusterResource, public CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates, CandidateNodeSet<FiCaSchedulerNode> candidates,
@ -1135,6 +1145,7 @@ public CSAssignment assignContainers(Resource clusterResource,
+ " #applications=" + orderingPolicy.getNumSchedulableEntities()); + " #applications=" + orderingPolicy.getNumSchedulableEntities());
} }
// 是否允许抢占
setPreemptionAllowed(currentResourceLimits, candidates.getPartition()); setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
// Check for reserved resources, try to allocate reserved container first. // Check for reserved resources, try to allocate reserved container first.
@ -1228,6 +1239,7 @@ public CSAssignment assignContainers(Resource clusterResource,
if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) { if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
userAssignable = false; userAssignable = false;
} else { } else {
// 资源分配到用户
userAssignable = canAssignToUser(clusterResource, application.getUser(), userAssignable = canAssignToUser(clusterResource, application.getUser(),
userLimit, application, candidates.getPartition(), userLimit, application, candidates.getPartition(),
currentResourceLimits); currentResourceLimits);

View File

@ -626,7 +626,7 @@ public CSAssignment assignContainers(Resource clusterResource,
break; break;
} }
// Schedule // Schedule , 开始给子队列分配资源
CSAssignment assignedToChild = assignContainersToChildQueues( CSAssignment assignedToChild = assignContainersToChildQueues(
clusterResource, candidates, resourceLimits, schedulingMode); clusterResource, candidates, resourceLimits, schedulingMode);
assignment.setType(assignedToChild.getType()); assignment.setType(assignedToChild.getType());
@ -834,6 +834,7 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(), getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
candidates.getPartition()); candidates.getPartition());
// 分配资源到队列
CSAssignment childAssignment = childQueue.assignContainers(cluster, CSAssignment childAssignment = childQueue.assignContainers(cluster,
candidates, childLimits, schedulingMode); candidates, childLimits, schedulingMode);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {

View File

@ -423,7 +423,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
ContainerAllocation allocation; ContainerAllocation allocation;
NodeType requestLocalityType = null; NodeType requestLocalityType = null;
// Data-local // Data-local 优先分配local
PendingAsk nodeLocalAsk = PendingAsk nodeLocalAsk =
application.getPendingAsk(schedulerKey, node.getNodeName()); application.getPendingAsk(schedulerKey, node.getNodeName());
if (nodeLocalAsk.getCount() > 0) { if (nodeLocalAsk.getCount() > 0) {
@ -439,7 +439,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
} }
} }
// Rack-local // Rack-local 在Rack-local 分配
PendingAsk rackLocalAsk = PendingAsk rackLocalAsk =
application.getPendingAsk(schedulerKey, node.getRackName()); application.getPendingAsk(schedulerKey, node.getRackName());
if (rackLocalAsk.getCount() > 0) { if (rackLocalAsk.getCount() > 0) {
@ -466,7 +466,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
} }
} }
// Off-switch // Off-switch 检查不限制
PendingAsk offSwitchAsk = PendingAsk offSwitchAsk =
application.getPendingAsk(schedulerKey, ResourceRequest.ANY); application.getPendingAsk(schedulerKey, ResourceRequest.ANY);
if (offSwitchAsk.getCount() > 0) { if (offSwitchAsk.getCount() > 0) {
@ -922,7 +922,7 @@ public CSAssignment assignContainers(Resource clusterResource,
return CSAssignment.SKIP_ASSIGNMENT; return CSAssignment.SKIP_ASSIGNMENT;
} }
// Schedule in priority order // Schedule in priority order 按照优先级顺序分配资源
for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
ContainerAllocation result = allocate(clusterResource, candidates, ContainerAllocation result = allocate(clusterResource, candidates,
schedulingMode, resourceLimits, schedulerKey, null); schedulingMode, resourceLimits, schedulerKey, null);