添加Capacity调度器的源码注释

This commit is contained in:
LingZhaoHui 2024-11-06 00:37:42 +08:00
parent 4e06ce7167
commit 8b59d85733
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
2 changed files with 6 additions and 0 deletions

View File

@ -1606,6 +1606,7 @@ private CSAssignment allocateContainerOnSingleNode(
// 2. Schedule if there are no reservations // 2. Schedule if there are no reservations
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) { if (reservedContainer != null) {
// 存在预留资源优先使用预留资源
allocateFromReservedContainer(node, withNodeHeartbeat, reservedContainer); allocateFromReservedContainer(node, withNodeHeartbeat, reservedContainer);
// Do not schedule if there are any reservations to fulfill on the node // Do not schedule if there are any reservations to fulfill on the node
LOG.debug("Skipping scheduling since node {} is reserved by" LOG.debug("Skipping scheduling since node {} is reserved by"
@ -1620,6 +1621,7 @@ private CSAssignment allocateContainerOnSingleNode(
if (calculator.computeAvailableContainers(Resources if (calculator.computeAvailableContainers(Resources
.add(node.getUnallocatedResource(), node.getTotalKillableResources()), .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
minimumAllocation) <= 0) { minimumAllocation) <= 0) {
// 对于剩余资源不够的情况无须继续分配资源
LOG.debug("This node " + node.getNodeID() + " doesn't have sufficient " LOG.debug("This node " + node.getNodeID() + " doesn't have sufficient "
+ "available or preemptible resource for minimum allocation"); + "available or preemptible resource for minimum allocation");
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@ -1631,6 +1633,7 @@ private CSAssignment allocateContainerOnSingleNode(
return null; return null;
} }
// 开始分配或者预留资源
return allocateOrReserveNewContainers(candidates, withNodeHeartbeat); return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
} }
@ -1693,6 +1696,7 @@ private void allocateFromReservedContainer(FiCaSchedulerNode node,
private CSAssignment allocateOrReserveNewContainers( private CSAssignment allocateOrReserveNewContainers(
CandidateNodeSet<FiCaSchedulerNode> candidates, CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) { boolean withNodeHeartbeat) {
// 优先按照资源池标签从根队列开始分配
CSAssignment assignment = getRootQueue().assignContainers( CSAssignment assignment = getRootQueue().assignContainers(
getClusterResource(), candidates, new ResourceLimits(labelManager getClusterResource(), candidates, new ResourceLimits(labelManager
.getResourceByLabel(candidates.getPartition(), .getResourceByLabel(candidates.getPartition(),

View File

@ -548,6 +548,7 @@ public CSAssignment assignContainers(Resource clusterResource,
// if our queue cannot access this node, just return // if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(candidates.getPartition())) { && !accessibleToPartition(candidates.getPartition())) {
// 没权限
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
// Do logging every 1 sec to avoid excessive logging. // Do logging every 1 sec to avoid excessive logging.
@ -574,6 +575,7 @@ public CSAssignment assignContainers(Resource clusterResource,
// queue doesn't need more resources. // queue doesn't need more resources.
if (!super.hasPendingResourceRequest(candidates.getPartition(), if (!super.hasPendingResourceRequest(candidates.getPartition(),
clusterResource, schedulingMode)) { clusterResource, schedulingMode)) {
// 没分配请求
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
// Do logging every 1 sec to avoid excessive logging. // Do logging every 1 sec to avoid excessive logging.