From 526142447abdee02b86820d884b577b23b769663 Mon Sep 17 00:00:00 2001 From: 9uapaw Date: Thu, 24 Mar 2022 16:24:33 +0100 Subject: [PATCH] YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth. --- .../sls/scheduler/SLSCapacityScheduler.java | 332 ++--------------- .../yarn/sls/scheduler/SLSFairScheduler.java | 291 +-------------- .../sls/scheduler/SLSSchedulerCommons.java | 343 ++++++++++++++++++ 3 files changed, 388 insertions(+), 578 deletions(-) create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index e0cb15138f..b6fe5c0f96 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -17,12 +17,7 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.IOException; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -30,119 +25,51 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.sls.SLSRunner; -import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.codahale.metrics.Timer; @Private @Unstable public class SLSCapacityScheduler extends CapacityScheduler implements SchedulerWrapper,Configurable { + + private final SLSSchedulerCommons schedulerCommons; private Configuration conf; - - private Map appQueueMap = - new ConcurrentHashMap(); - - private Map preemptionContainerMap = - new ConcurrentHashMap(); - - // metrics - private SchedulerMetrics schedulerMetrics; - private boolean metricsON; - private Tracker tracker; - - // logger - private static final Logger LOG = LoggerFactory.getLogger(SLSCapacityScheduler.class); - - public Tracker getTracker() { - return tracker; - } public SLSCapacityScheduler() { - tracker = new Tracker(); + schedulerCommons = new SLSSchedulerCommons(this); } @Override public void setConf(Configuration conf) { this.conf = conf; super.setConf(conf); - metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); - if (metricsON) { - try { - schedulerMetrics = SchedulerMetrics.getInstance(conf, - CapacityScheduler.class); - schedulerMetrics.init(this, conf); - } catch (Exception e) { - LOG.error("Caught exception while initializing schedulerMetrics", e); - } - } + schedulerCommons.initMetrics(CapacityScheduler.class, conf); } @Override public Allocation allocate(ApplicationAttemptId attemptId, List resourceRequests, List schedulingRequests, List containerIds, - List strings, List strings2, ContainerUpdates updateRequests) { - if (metricsON) { - final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() - .time(); - Allocation allocation = null; - try { - allocation = super - .allocate(attemptId, resourceRequests, schedulingRequests, - containerIds, strings, - strings2, updateRequests); - return allocation; - } catch (Exception e) { - LOG.error("Caught exception from allocate", e); - throw e; - } finally { - context.stop(); - schedulerMetrics.increaseSchedulerAllocationCounter(); - try { - updateQueueWithAllocateRequest(allocation, attemptId, - resourceRequests, containerIds); - } catch (IOException e) { - LOG.error("Caught exception while executing finally block", e); - } - } - } else { - return super.allocate(attemptId, resourceRequests, schedulingRequests, - containerIds, strings, - strings2, updateRequests); - } + List blacklistAdditions, List blacklistRemovals, + ContainerUpdates updateRequests) { + return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } @Override public boolean tryCommit(Resource cluster, ResourceCommitRequest r, boolean updatePending) { - if (metricsON) { + if (schedulerCommons.isMetricsON()) { boolean isSuccess = false; long startTimeNs = System.nanoTime(); try { @@ -151,13 +78,13 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r, } finally { long elapsedNs = System.nanoTime() - startTimeNs; if (isSuccess) { - schedulerMetrics.getSchedulerCommitSuccessTimer() + getSchedulerMetrics().getSchedulerCommitSuccessTimer() .update(elapsedNs, TimeUnit.NANOSECONDS); - schedulerMetrics.increaseSchedulerCommitSuccessCounter(); + getSchedulerMetrics().increaseSchedulerCommitSuccessCounter(); } else { - schedulerMetrics.getSchedulerCommitFailureTimer() + getSchedulerMetrics().getSchedulerCommitFailureTimer() .update(elapsedNs, TimeUnit.NANOSECONDS); - schedulerMetrics.increaseSchedulerCommitFailureCounter(); + getSchedulerMetrics().increaseSchedulerCommitFailureCounter(); } } } else { @@ -167,229 +94,16 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r, @Override public void handle(SchedulerEvent schedulerEvent) { - if (!metricsON) { - super.handle(schedulerEvent); - return; - } - - if (!schedulerMetrics.isRunning()) { - schedulerMetrics.setRunning(true); - } - - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == - SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId); - SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); - if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - schedulerMetrics.updateQueueMetricsByRelease( - rmc.getContainer().getResource(), queue); - } - } - - handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); - operationTimer = schedulerMetrics.getSchedulerHandleTimer( - schedulerEvent.getType()).time(); - - super.handle(schedulerEvent); - } finally { - if (handlerTimer != null) { - handlerTimer.stop(); - } - if (operationTimer != null) { - operationTimer.stop(); - } - schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); - - if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); - if (SLSRunner.getRemainingApps() == 0) { - try { - getSchedulerMetrics().tearDown(); - SLSRunner.exitSLSRunner(); - } catch (Exception e) { - LOG.error("Scheduler Metrics failed to tear down.", e); - } - } - } else if (schedulerEvent.getType() == - SchedulerEventType.APP_ATTEMPT_ADDED - && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { - AppAttemptAddedSchedulerEvent appAddEvent = - (AppAttemptAddedSchedulerEvent) schedulerEvent; - SchedulerApplication app = - applications.get(appAddEvent.getApplicationAttemptId() - .getApplicationId()); - appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() - .getQueueName()); - } - } - } - - private void updateQueueWithNodeUpdate( - NodeUpdateSchedulerEventWrapper eventWrapper) { - RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); - List containerList = node.getContainerUpdates(); - for (UpdatedContainerInfo info : containerList) { - for (ContainerStatus status : info.getCompletedContainers()) { - ContainerId containerId = status.getContainerId(); - SchedulerAppReport app = super.getSchedulerAppInfo( - containerId.getApplicationAttemptId()); - - if (app == null) { - // this happens for the AM container - // The app have already removed when the NM sends the release - // information. - continue; - } - - String queue = appQueueMap.get(containerId.getApplicationAttemptId()); - int releasedMemory = 0, releasedVCores = 0; - if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { - for (RMContainer rmc : app.getLiveContainers()) { - if (rmc.getContainerId() == containerId) { - releasedMemory += rmc.getContainer().getResource().getMemorySize(); - releasedVCores += rmc.getContainer() - .getResource().getVirtualCores(); - break; - } - } - } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { - if (preemptionContainerMap.containsKey(containerId)) { - Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemorySize(); - releasedVCores += preResource.getVirtualCores(); - preemptionContainerMap.remove(containerId); - } - } - // update queue counters - schedulerMetrics.updateQueueMetricsByRelease( - Resource.newInstance(releasedMemory, releasedVCores), queue); - } - } - } - - private void updateQueueWithAllocateRequest(Allocation allocation, - ApplicationAttemptId attemptId, - List resourceRequests, - List containerIds) throws IOException { - // update queue information - Resource pendingResource = Resources.createResource(0, 0); - Resource allocatedResource = Resources.createResource(0, 0); - String queueName = appQueueMap.get(attemptId); - // container requested - for (ResourceRequest request : resourceRequests) { - if (request.getResourceName().equals(ResourceRequest.ANY)) { - Resources.addTo(pendingResource, - Resources.multiply(request.getCapability(), - request.getNumContainers())); - } - } - // container allocated - for (Container container : allocation.getContainers()) { - Resources.addTo(allocatedResource, container.getResource()); - Resources.subtractFrom(pendingResource, container.getResource()); - } - // container released from AM - SchedulerAppReport report = super.getSchedulerAppInfo(attemptId); - for (ContainerId containerId : containerIds) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released allocated containers - Resources.subtractFrom(allocatedResource, container.getResource()); - } else { - for (RMContainer c : report.getReservedContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released reserved containers - Resources.subtractFrom(pendingResource, container.getResource()); - } - } - } - // containers released/preemption from scheduler - Set preemptionContainers = new HashSet(); - if (allocation.getContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getContainerPreemptions()); - } - if (allocation.getStrictContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); - } - if (! preemptionContainers.isEmpty()) { - for (ContainerId containerId : preemptionContainers) { - if (! preemptionContainerMap.containsKey(containerId)) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - preemptionContainerMap.put(containerId, container.getResource()); - } - } - - } - } - - // update metrics - schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, - queueName); + schedulerCommons.handle(schedulerEvent); } @Override public void serviceStop() throws Exception { - try { - if (metricsON) { - schedulerMetrics.tearDown(); - } - } catch (Exception e) { - LOG.error("Caught exception while stopping service", e); - } + schedulerCommons.stopMetrics(); super.serviceStop(); } - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - @Override - public Configuration getConf() { - return conf; - } - public String getRealQueueName(String queue) throws YarnException { if (getQueue(queue) == null) { throw new YarnException("Can't find the queue by the given name: " + queue @@ -397,4 +111,18 @@ public String getRealQueueName(String queue) throws YarnException { } return getQueue(queue).getQueuePath(); } -} \ No newline at end of file + + public SchedulerMetrics getSchedulerMetrics() { + return schedulerCommons.getSchedulerMetrics(); + } + + @Override + public Configuration getConf() { + return conf; + } + + public Tracker getTracker() { + return schedulerCommons.getTracker(); + } +} + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index 84549bc492..b164316486 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -17,84 +17,35 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import com.codahale.metrics.Timer; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.sls.SLSRunner; -import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; @Private @Unstable public class SLSFairScheduler extends FairScheduler implements SchedulerWrapper, Configurable { - private SchedulerMetrics schedulerMetrics; - private boolean metricsON; - private Tracker tracker; - - private Map preemptionContainerMap = - new ConcurrentHashMap<>(); - - // logger - private static final Logger LOG = - LoggerFactory.getLogger(SLSFairScheduler.class); - - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - public Tracker getTracker() { - return tracker; - } + private final SLSSchedulerCommons schedulerCommons; public SLSFairScheduler() { - tracker = new Tracker(); + schedulerCommons = new SLSSchedulerCommons(this); } @Override public void setConf(Configuration conf) { super.setConfig(conf); - - metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); - if (metricsON) { - try { - schedulerMetrics = SchedulerMetrics.getInstance(conf, - FairScheduler.class); - schedulerMetrics.init(this, conf); - } catch (Exception e) { - LOG.error("Caught exception while initializing schedulerMetrics", e); - } - } + schedulerCommons.initMetrics(FairScheduler.class, conf); } @Override @@ -103,237 +54,18 @@ public Allocation allocate(ApplicationAttemptId attemptId, List schedulingRequests, List containerIds, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { - if (metricsON) { - final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() - .time(); - Allocation allocation = null; - try { - allocation = super.allocate(attemptId, resourceRequests, - schedulingRequests, containerIds, - blacklistAdditions, blacklistRemovals, updateRequests); - return allocation; - } catch (Exception e) { - LOG.error("Caught exception from allocate", e); - throw e; - } finally { - context.stop(); - schedulerMetrics.increaseSchedulerAllocationCounter(); - try { - updateQueueWithAllocateRequest(allocation, attemptId, - resourceRequests, containerIds); - } catch (IOException e) { - LOG.error("Caught exception while executing finally block", e); - } - } - } else { - return super.allocate(attemptId, resourceRequests, schedulingRequests, - containerIds, - blacklistAdditions, blacklistRemovals, updateRequests); - } + return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } @Override public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (!metricsON) { - super.handle(schedulerEvent); - return; - } - - // metrics on - if(!schedulerMetrics.isRunning()) { - schedulerMetrics.setRunning(true); - } - - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if ( - schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queueName = getSchedulerApp(appAttemptId).getQueue().getName(); - SchedulerAppReport app = getSchedulerAppInfo(appAttemptId); - if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - schedulerMetrics.updateQueueMetricsByRelease( - rmc.getContainer().getResource(), queueName); - } - } - - handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); - operationTimer = schedulerMetrics.getSchedulerHandleTimer( - schedulerEvent.getType()).time(); - - super.handle(schedulerEvent); - } finally { - if (handlerTimer != null) { - handlerTimer.stop(); - } - if (operationTimer != null) { - operationTimer.stop(); - } - schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); - - if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - if (SLSRunner.getRemainingApps() == 0) { - try { - getSchedulerMetrics().tearDown(); - SLSRunner.exitSLSRunner(); - } catch (Exception e) { - LOG.error("Scheduler Metrics failed to tear down.", e); - } - } - } - } - } - - private void updateQueueWithNodeUpdate( - NodeUpdateSchedulerEventWrapper eventWrapper) { - RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); - List containerList = node.getContainerUpdates(); - for (UpdatedContainerInfo info : containerList) { - for (ContainerStatus status : info.getCompletedContainers()) { - ContainerId containerId = status.getContainerId(); - SchedulerAppReport app = super.getSchedulerAppInfo( - containerId.getApplicationAttemptId()); - - if (app == null) { - // this happens for the AM container - // The app have already removed when the NM sends the release - // information. - continue; - } - - int releasedMemory = 0, releasedVCores = 0; - if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { - for (RMContainer rmc : app.getLiveContainers()) { - if (rmc.getContainerId() == containerId) { - Resource resource = rmc.getContainer().getResource(); - releasedMemory += resource.getMemorySize(); - releasedVCores += resource.getVirtualCores(); - break; - } - } - } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { - if (preemptionContainerMap.containsKey(containerId)) { - Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemorySize(); - releasedVCores += preResource.getVirtualCores(); - preemptionContainerMap.remove(containerId); - } - } - // update queue counters - String queue = getSchedulerApp(containerId.getApplicationAttemptId()). - getQueueName(); - schedulerMetrics.updateQueueMetricsByRelease( - Resource.newInstance(releasedMemory, releasedVCores), queue); - } - } - } - - private void updateQueueWithAllocateRequest(Allocation allocation, - ApplicationAttemptId attemptId, - List resourceRequests, - List containerIds) throws IOException { - // update queue information - Resource pendingResource = Resources.createResource(0, 0); - Resource allocatedResource = Resources.createResource(0, 0); - // container requested - for (ResourceRequest request : resourceRequests) { - if (request.getResourceName().equals(ResourceRequest.ANY)) { - Resources.addTo(pendingResource, - Resources.multiply(request.getCapability(), - request.getNumContainers())); - } - } - // container allocated - for (Container container : allocation.getContainers()) { - Resources.addTo(allocatedResource, container.getResource()); - Resources.subtractFrom(pendingResource, container.getResource()); - } - // container released from AM - SchedulerAppReport report = super.getSchedulerAppInfo(attemptId); - for (ContainerId containerId : containerIds) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released allocated containers - Resources.subtractFrom(allocatedResource, container.getResource()); - } else { - for (RMContainer c : report.getReservedContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released reserved containers - Resources.subtractFrom(pendingResource, container.getResource()); - } - } - } - // containers released/preemption from scheduler - Set preemptionContainers = new HashSet(); - if (allocation.getContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getContainerPreemptions()); - } - if (allocation.getStrictContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); - } - if (!preemptionContainers.isEmpty()) { - for (ContainerId containerId : preemptionContainers) { - if (!preemptionContainerMap.containsKey(containerId)) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - preemptionContainerMap.put(containerId, container.getResource()); - } - } - - } - } - - // update metrics - String queueName = getSchedulerApp(attemptId).getQueueName(); - schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, - queueName); + schedulerCommons.handle(schedulerEvent); } @Override public void serviceStop() throws Exception { - try { - if (metricsON) { - schedulerMetrics.tearDown(); - } - } catch (Exception e) { - LOG.error("Caught exception while stopping service", e); - } + schedulerCommons.stopMetrics(); super.serviceStop(); } @@ -344,5 +76,12 @@ public String getRealQueueName(String queue) throws YarnException { } return getQueueManager().getQueue(queue).getQueueName(); } -} + public SchedulerMetrics getSchedulerMetrics() { + return schedulerCommons.getSchedulerMetrics(); + } + + public Tracker getTracker() { + return schedulerCommons.getTracker(); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java new file mode 100644 index 0000000000..92aa96069c --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls.scheduler; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class SLSSchedulerCommons { + private static final Logger LOG = LoggerFactory.getLogger(SLSSchedulerCommons.class); + + private AbstractYarnScheduler scheduler; + private boolean metricsON; + private SchedulerMetrics schedulerMetrics; + private Map preemptionContainerMap = + new ConcurrentHashMap<>(); + + private Map appQueueMap = + new ConcurrentHashMap<>(); + private Tracker tracker; + + public SLSSchedulerCommons(AbstractYarnScheduler scheduler) { + this.scheduler = scheduler; + this.tracker = new Tracker(); + } + + public void initMetrics(Class schedulerClass, Configuration conf) { + metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); + if (metricsON) { + try { + schedulerMetrics = SchedulerMetrics.getInstance(conf, schedulerClass); + schedulerMetrics.init(scheduler, conf); + } catch (Exception e) { + LOG.error("Caught exception while initializing schedulerMetrics", e); + } + } + } + + void stopMetrics() { + try { + if (metricsON) { + schedulerMetrics.tearDown(); + } + } catch (Exception e) { + LOG.error("Caught exception while stopping service", e); + } + } + + public Allocation allocate(ApplicationAttemptId attemptId, + List resourceRequests, + List schedulingRequests, + List containerIds, + List blacklistAdditions, + List blacklistRemovals, + ContainerUpdates updateRequests) { + if (metricsON) { + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); + Allocation allocation = null; + try { + allocation = scheduler.allocate(attemptId, resourceRequests, + schedulingRequests, containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + return allocation; + } catch (Exception e) { + LOG.error("Caught exception from allocate", e); + throw e; + } finally { + context.stop(); + schedulerMetrics.increaseSchedulerAllocationCounter(); + try { + updateQueueWithAllocateRequest(allocation, attemptId, + resourceRequests, containerIds); + } catch (IOException e) { + LOG.error("Caught exception while executing finally block", e); + } + } + } else { + return scheduler.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + } + } + + private void updateQueueWithAllocateRequest(Allocation allocation, + ApplicationAttemptId attemptId, + List resourceRequests, + List containerIds) throws IOException { + // update queue information + Resource pendingResource = Resources.createResource(0, 0); + Resource allocatedResource = Resources.createResource(0, 0); + String queueName = appQueueMap.get(attemptId); + // container requested + for (ResourceRequest request : resourceRequests) { + if (request.getResourceName().equals(ResourceRequest.ANY)) { + Resources.addTo(pendingResource, + Resources.multiply(request.getCapability(), + request.getNumContainers())); + } + } + // container allocated + for (Container container : allocation.getContainers()) { + Resources.addTo(allocatedResource, container.getResource()); + Resources.subtractFrom(pendingResource, container.getResource()); + } + // container released from AM + SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId); + for (ContainerId containerId : containerIds) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released allocated containers + Resources.subtractFrom(allocatedResource, container.getResource()); + } else { + for (RMContainer c : report.getReservedContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released reserved containers + Resources.subtractFrom(pendingResource, container.getResource()); + } + } + } + // containers released/preemption from scheduler + Set preemptionContainers = new HashSet(); + if (allocation.getContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getContainerPreemptions()); + } + if (allocation.getStrictContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); + } + if (!preemptionContainers.isEmpty()) { + for (ContainerId containerId : preemptionContainers) { + if (!preemptionContainerMap.containsKey(containerId)) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + preemptionContainerMap.put(containerId, container.getResource()); + } + } + + } + } + + // update metrics + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); + } + + public void handle(SchedulerEvent schedulerEvent) { + if (!metricsON) { + scheduler.handle(schedulerEvent); + return; + } + + if (!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } + + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; + + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent) schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if ( + schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queue = appQueueMap.get(appAttemptId); + SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queue); + } + } + + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); + + scheduler.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); + if (SLSRunner.getRemainingApps() == 0) { + try { + schedulerMetrics.tearDown(); + SLSRunner.exitSLSRunner(); + } catch (Exception e) { + LOG.error("Scheduler Metrics failed to tear down.", e); + } + } + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_ADDED + && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { + AppAttemptAddedSchedulerEvent appAddEvent = + (AppAttemptAddedSchedulerEvent) schedulerEvent; + SchedulerApplication app = + (SchedulerApplication) scheduler.getSchedulerApplications() + .get(appAddEvent.getApplicationAttemptId().getApplicationId()); + appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() + .getQueueName()); + } + } + } + + private void updateQueueWithNodeUpdate( + NodeUpdateSchedulerEventWrapper eventWrapper) { + RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); + List containerList = node.getContainerUpdates(); + for (UpdatedContainerInfo info : containerList) { + for (ContainerStatus status : info.getCompletedContainers()) { + ContainerId containerId = status.getContainerId(); + SchedulerAppReport app = scheduler.getSchedulerAppInfo( + containerId.getApplicationAttemptId()); + + if (app == null) { + // this happens for the AM container + // The app have already removed when the NM sends the release + // information. + continue; + } + + int releasedMemory = 0, releasedVCores = 0; + if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { + for (RMContainer rmc : app.getLiveContainers()) { + if (rmc.getContainerId() == containerId) { + Resource resource = rmc.getContainer().getResource(); + releasedMemory += resource.getMemorySize(); + releasedVCores += resource.getVirtualCores(); + break; + } + } + } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { + if (preemptionContainerMap.containsKey(containerId)) { + Resource preResource = preemptionContainerMap.get(containerId); + releasedMemory += preResource.getMemorySize(); + releasedVCores += preResource.getVirtualCores(); + preemptionContainerMap.remove(containerId); + } + } + // update queue counters + String queue = appQueueMap.get(containerId.getApplicationAttemptId()); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); + } + } + } + + public SchedulerMetrics getSchedulerMetrics() { + return schedulerMetrics; + } + + public boolean isMetricsON() { + return metricsON; + } + + public Tracker getTracker() { + return tracker; + } +}