增加容量调度器的代码注释。

This commit is contained in:
LingZhaoHui 2024-03-17 22:12:04 +08:00
parent aa9f3e1da4
commit b0e5465244
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
2 changed files with 13 additions and 2 deletions

View File

@ -213,6 +213,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
/**
* Assign containers to applications in the queue or it's children (if any).
* 分配资源包括父队列以及子队列
*
* @param clusterResource the resource of the cluster.
* @param candidates {@link CandidateNodeSet} the nodes that are considered
* for the current placement.

View File

@ -515,6 +515,13 @@ long getAsyncScheduleInterval() {
private final static Random random = new Random(System.currentTimeMillis());
/**
* 跳过已经由2次心跳周期没有上报的节点
* @param node
* @param cs
* @param printVerboseLog
* @return
*/
private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
CapacityScheduler cs, boolean printVerboseLog) {
// Skip node which missed 2 heartbeats since the node might be dead and
@ -557,7 +564,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
printedVerboseLoggingForAsyncScheduling = false;
}
// Allocate containers of node [start, end)
// Allocate containers of node [start, end) 从随机的节点开始分配可以保证分配均衡
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
@ -569,7 +576,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
current = 0;
// Allocate containers of node [0, start)
// Allocate containers of node [0, start) 从0到随机的节点
for (FiCaSchedulerNode node : nodes) {
if (current++ > start) {
break;
@ -1796,6 +1803,7 @@ CSAssignment allocateContainersToNode(
if (!multiNodePlacementEnabled) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
node.getNodeID());
// 申请资源到单个节点老版本逻辑
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
@ -1803,6 +1811,7 @@ CSAssignment allocateContainersToNode(
} else{
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
// 申请资源到多个节点新逻辑
assignment = allocateContainersOnMultiNodes(candidates);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID, candidates.getPartition());