添加Capacity调度器的源码注释

This commit is contained in:
LingZhaoHui 2024-12-08 23:43:57 +08:00
parent fbd6e04709
commit dce4387b3d
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
2 changed files with 29 additions and 1 deletions

View File

@ -22,7 +22,12 @@
* Resource classification.
*/
public enum NodeType {
NODE_LOCAL(0), RACK_LOCAL(1), OFF_SWITCH(2);
//请求规定了必须运行在某个的服务器节点
NODE_LOCAL(0),
// 请求规定了必须运行在某个机架上
RACK_LOCAL(1),
// 这个请求对本地化没有要求
OFF_SWITCH(2);
private final int index;

View File

@ -1363,6 +1363,7 @@ protected void nodeUpdate(RMNode rmNode) {
readLock.lock();
try {
setLastNodeUpdateTime(Time.now());
// 优先处理心跳逻辑
super.nodeUpdate(rmNode);
} finally {
readLock.unlock();
@ -1370,6 +1371,7 @@ protected void nodeUpdate(RMNode rmNode) {
// Try to do scheduling
if (!scheduleAsynchronously) {
// 在不开启异步调度的时候需要借助节点的心跳进行调度默认建议开启异步调度
writeLock.lock();
try {
// reset allocation and reservation stats before we start doing any
@ -1484,6 +1486,13 @@ private void updateSchedulerHealth(long now, NodeId nodeId,
.getAssignmentInformation().getReserved());
}
/**
* 是否需要继续分配资源
* @param assignment 已经分配的资源信息
* @param offswitchCount 不限制节点和机架的个数
* @param assignedContainers 已经分配的Container个数
* @return
*/
private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
int assignedContainers) {
// Current assignment shouldn't be empty
@ -1529,6 +1538,7 @@ private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
/**
* We need to make sure when doing allocation, Node should be existed
* And we will construct a {@link CandidateNodeSet} before proceeding
* @param withNodeHeartbeat 如果是未开启异步调度则需要通过心跳来实现当前值为true,否则为false
*/
private void allocateContainersToNode(NodeId nodeId,
boolean withNodeHeartbeat) {
@ -1544,6 +1554,7 @@ private void allocateContainersToNode(NodeId nodeId,
// Only check if we can allocate more container on the same node when
// scheduling is triggered by node heartbeat
if (null != assignment && withNodeHeartbeat) {
// 如果是由心跳触发的分配会尝试再次分配
if (assignment.getType() == NodeType.OFF_SWITCH) {
offswitchCount++;
}
@ -1693,6 +1704,12 @@ private void allocateFromReservedContainer(FiCaSchedulerNode node,
submitResourceCommitRequest(getClusterResource(), assignment);
}
/**
* 为Container申请或者预留资源
* @param candidates
* @param withNodeHeartbeat
* @return
*/
private CSAssignment allocateOrReserveNewContainers(
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
@ -1787,6 +1804,12 @@ private CSAssignment allocateContainersOnMultiNodes(
return allocateOrReserveNewContainers(candidates, false);
}
/**
* 将资源分派给指定的节点
* @param candidates
* @param withNodeHeartbeat
* @return
*/
@VisibleForTesting
CSAssignment allocateContainersToNode(
CandidateNodeSet<FiCaSchedulerNode> candidates,