diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 48ad610310..2110e3c196 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -175,11 +175,12 @@ public void setConf(Configuration conf) { } private void init(Configuration tempConf) throws ClassNotFoundException { + // runner configuration + setConf(tempConf); + nmMap = new ConcurrentHashMap<>(); queueAppNumMap = new HashMap<>(); amRunner = new AMRunner(runner, this); - // runner configuration - setConf(tempConf); // runner poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index b6fe5c0f96..39170b1872 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -39,7 +39,7 @@ @Private @Unstable public class SLSCapacityScheduler extends CapacityScheduler implements - SchedulerWrapper,Configurable { + SchedulerWrapper, Configurable { private final SLSSchedulerCommons schedulerCommons; private Configuration conf; @@ -65,6 +65,15 @@ public Allocation allocate(ApplicationAttemptId attemptId, containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } + @Override + public Allocation allocatePropagated(ApplicationAttemptId attemptId, + List resourceRequests, + List schedulingRequests, + List containerIds, List blacklistAdditions, + List blacklistRemovals, ContainerUpdates updateRequests) { + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); + } @Override public boolean tryCommit(Resource cluster, ResourceCommitRequest r, @@ -97,6 +106,11 @@ public void handle(SchedulerEvent schedulerEvent) { schedulerCommons.handle(schedulerEvent); } + @Override + public void propagatedHandle(SchedulerEvent schedulerEvent) { + super.handle(schedulerEvent); + } + @Override public void serviceStop() throws Exception { schedulerCommons.stopMetrics(); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index b164316486..2835b413ca 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -63,6 +63,21 @@ public void handle(SchedulerEvent schedulerEvent) { schedulerCommons.handle(schedulerEvent); } + @Override + public void propagatedHandle(SchedulerEvent schedulerEvent) { + super.handle(schedulerEvent); + } + + @Override + public Allocation allocatePropagated(ApplicationAttemptId attemptId, + List resourceRequests, + List schedulingRequests, + List containerIds, List blacklistAdditions, + List blacklistRemovals, ContainerUpdates updateRequests) { + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); + } + @Override public void serviceStop() throws Exception { schedulerCommons.stopMetrics(); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java index 7132fc9e62..d83fe5c6d0 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java @@ -100,7 +100,8 @@ public Allocation allocate(ApplicationAttemptId attemptId, .time(); Allocation allocation = null; try { - allocation = scheduler.allocate(attemptId, resourceRequests, + allocation = ((SchedulerWrapper)scheduler).allocatePropagated( + attemptId, resourceRequests, schedulingRequests, containerIds, blacklistAdditions, blacklistRemovals, updateRequests); return allocation; @@ -118,7 +119,8 @@ public Allocation allocate(ApplicationAttemptId attemptId, } } } else { - return scheduler.allocate(attemptId, resourceRequests, schedulingRequests, + return ((SchedulerWrapper)scheduler).allocatePropagated( + attemptId, resourceRequests, schedulingRequests, containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } @@ -204,7 +206,7 @@ private void updateQueueWithAllocateRequest(Allocation allocation, public void handle(SchedulerEvent schedulerEvent) { if (!metricsON) { - scheduler.handle(schedulerEvent); + ((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent); return; } @@ -245,7 +247,7 @@ public void handle(SchedulerEvent schedulerEvent) { operationTimer = schedulerMetrics.getSchedulerHandleTimer( schedulerEvent.getType()).time(); - scheduler.handle(schedulerEvent); + ((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent); } finally { if (handlerTimer != null) { handlerTimer.stop(); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 7112b1a6fd..5ee088d792 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -19,7 +19,16 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +import java.util.List; @Private @Unstable @@ -29,4 +38,14 @@ public interface SchedulerWrapper { Tracker getTracker(); String getRealQueueName(String queue) throws YarnException; + + void propagatedHandle(SchedulerEvent schedulerEvent); + + Allocation allocatePropagated(ApplicationAttemptId attemptId, + List resourceRequests, + List schedulingRequests, + List containerIds, + List blacklistAdditions, + List blacklistRemovals, + ContainerUpdates updateRequests); }