YARN-11100. Fix StackOverflowError in SLS scheduler event handling. Contributed by Szilard Nemeth.

This commit is contained in:
9uapaw 2022-03-26 21:43:10 +01:00
parent 61e809b245
commit adbaf48082
5 changed files with 58 additions and 7 deletions

View File

@ -175,11 +175,12 @@ public void setConf(Configuration conf) {
}
private void init(Configuration tempConf) throws ClassNotFoundException {
// runner configuration
setConf(tempConf);
nmMap = new ConcurrentHashMap<>();
queueAppNumMap = new HashMap<>();
amRunner = new AMRunner(runner, this);
// runner configuration
setConf(tempConf);
// runner
poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,

View File

@ -39,7 +39,7 @@
@Private
@Unstable
public class SLSCapacityScheduler extends CapacityScheduler implements
SchedulerWrapper,Configurable {
SchedulerWrapper, Configurable {
private final SLSSchedulerCommons schedulerCommons;
private Configuration conf;
@ -65,6 +65,15 @@ public Allocation allocate(ApplicationAttemptId attemptId,
containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
}
@Override
public Allocation allocatePropagated(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests,
List<ContainerId> containerIds, List<String> blacklistAdditions,
List<String> blacklistRemovals, ContainerUpdates updateRequests) {
return super.allocate(attemptId, resourceRequests, schedulingRequests,
containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
}
@Override
public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
@ -97,6 +106,11 @@ public void handle(SchedulerEvent schedulerEvent) {
schedulerCommons.handle(schedulerEvent);
}
@Override
public void propagatedHandle(SchedulerEvent schedulerEvent) {
super.handle(schedulerEvent);
}
@Override
public void serviceStop() throws Exception {
schedulerCommons.stopMetrics();

View File

@ -63,6 +63,21 @@ public void handle(SchedulerEvent schedulerEvent) {
schedulerCommons.handle(schedulerEvent);
}
@Override
public void propagatedHandle(SchedulerEvent schedulerEvent) {
super.handle(schedulerEvent);
}
@Override
public Allocation allocatePropagated(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests,
List<ContainerId> containerIds, List<String> blacklistAdditions,
List<String> blacklistRemovals, ContainerUpdates updateRequests) {
return super.allocate(attemptId, resourceRequests, schedulingRequests,
containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
}
@Override
public void serviceStop() throws Exception {
schedulerCommons.stopMetrics();

View File

@ -100,7 +100,8 @@ public Allocation allocate(ApplicationAttemptId attemptId,
.time();
Allocation allocation = null;
try {
allocation = scheduler.allocate(attemptId, resourceRequests,
allocation = ((SchedulerWrapper)scheduler).allocatePropagated(
attemptId, resourceRequests,
schedulingRequests, containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
return allocation;
@ -118,7 +119,8 @@ public Allocation allocate(ApplicationAttemptId attemptId,
}
}
} else {
return scheduler.allocate(attemptId, resourceRequests, schedulingRequests,
return ((SchedulerWrapper)scheduler).allocatePropagated(
attemptId, resourceRequests, schedulingRequests,
containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
}
@ -204,7 +206,7 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
public void handle(SchedulerEvent schedulerEvent) {
if (!metricsON) {
scheduler.handle(schedulerEvent);
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
return;
}
@ -245,7 +247,7 @@ public void handle(SchedulerEvent schedulerEvent) {
operationTimer = schedulerMetrics.getSchedulerHandleTimer(
schedulerEvent.getType()).time();
scheduler.handle(schedulerEvent);
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
} finally {
if (handlerTimer != null) {
handlerTimer.stop();

View File

@ -19,7 +19,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import java.util.List;
@Private
@Unstable
@ -29,4 +38,14 @@ public interface SchedulerWrapper {
Tracker getTracker();
String getRealQueueName(String queue) throws YarnException;
void propagatedHandle(SchedulerEvent schedulerEvent);
Allocation allocatePropagated(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests,
List<ContainerId> containerIds,
List<String> blacklistAdditions,
List<String> blacklistRemovals,
ContainerUpdates updateRequests);
}