diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 61b7f36ee9..ba43816de1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -59,16 +59,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; -import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; -import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.*; import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -152,9 +150,9 @@ public class SLSRunner { // start application masters startAM(); // set queue & tracked apps information - ((SchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() .setQueueSet(this.queueAppNumMap.keySet()); - ((SchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); @@ -164,7 +162,7 @@ public class SLSRunner { runner.start(); } - private void startRM() throws IOException, ClassNotFoundException { + private void startRM() throws Exception { Configuration rmConf = new YarnConfiguration(); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); @@ -175,10 +173,12 @@ public class SLSRunner { if(Class.forName(schedulerClass) == CapacityScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSCapacityScheduler.class.getName()); - } else { + } else if (Class.forName(schedulerClass) == FairScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, - ResourceSchedulerWrapper.class.getName()); - rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass); + SLSFairScheduler.class.getName()); + } else if (Class.forName(schedulerClass) == FifoScheduler.class){ + // TODO add support for FifoScheduler + throw new Exception("Fifo Scheduler is not supported yet."); } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 0573bae0ec..a62f2b6024 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -219,10 +220,12 @@ public abstract class AMSimulator extends TaskRunner.Task { simulateFinishTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); // record job running information - ((SchedulerWrapper)rm.getResourceScheduler()) - .addAMRuntime(appId, - traceStartTimeMS, traceFinishTimeMS, - simulateStartTimeMS, simulateFinishTimeMS); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, + simulateStartTimeMS, simulateFinishTimeMS); + } } protected ResourceRequest createResourceRequest( @@ -330,13 +333,20 @@ public abstract class AMSimulator extends TaskRunner.Task { private void trackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .addTrackedApp(appId, oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addTrackedApp(appId, oldAppId); + } } } public void untrackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.removeTrackedApp(oldAppId); + } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java deleted file mode 100644 index a4b8e642ba..0000000000 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ /dev/null @@ -1,969 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.scheduler; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.sls.SLSRunner; -import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Logger; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; -import com.codahale.metrics.Timer; - -@Private -@Unstable -final public class ResourceSchedulerWrapper - extends AbstractYarnScheduler - implements SchedulerWrapper, ResourceScheduler, Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map schedulerHandleTimerMap; - private List schedulerHistogramList; - private Map histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; - - private Configuration conf; - private ResourceScheduler scheduler; - private Map appQueueMap = - new ConcurrentHashMap(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; - - private Map preemptionContainerMap = - new ConcurrentHashMap(); - - // metrics - private MetricRegistry metrics; - private SchedulerMetrics schedulerMetrics; - private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map defaultSchedulerMetricsMap = - new HashMap(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set queueSet; - private Set trackedAppSet; - - public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); - - public ResourceSchedulerWrapper() { - super(ResourceSchedulerWrapper.class.getName()); - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - // set scheduler - Class klass = conf.getClass( - SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class); - - scheduler = ReflectionUtils.newInstance(klass, conf); - // start metrics - metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); - if (metricsON) { - try { - initMetrics(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); - } - if (web != null) { - web.stop(); - } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SHUTDOWN_HOOK_PRIORITY); - } - - @Override - public Allocation allocate(ApplicationAttemptId attemptId, - List resourceRequests, List containerIds, - List strings, List strings2, - ContainerUpdates updateRequests) { - if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); - Allocation allocation = null; - try { - allocation = scheduler.allocate(attemptId, resourceRequests, - containerIds, strings, strings2, updateRequests); - return allocation; - } finally { - context.stop(); - schedulerAllocateCounter.inc(); - try { - updateQueueWithAllocateRequest(allocation, attemptId, - resourceRequests, containerIds); - } catch (IOException e) { - e.printStackTrace(); - } - } - } else { - return scheduler.allocate(attemptId, - resourceRequests, containerIds, strings, strings2, updateRequests); - } - } - - @Override - public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! metricsON) { - scheduler.handle(schedulerEvent); - return; - } - if(!running) running = true; - - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId.getApplicationId()); - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - if (! app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemorySize(), - rmc.getContainer().getResource().getVirtualCores()); - } - } - - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); - - scheduler.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); - - if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED - && schedulerEvent instanceof AppRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppRemovedSchedulerEvent appRemoveEvent = - (AppRemovedSchedulerEvent) schedulerEvent; - appQueueMap.remove(appRemoveEvent.getApplicationID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED - && schedulerEvent instanceof AppAddedSchedulerEvent) { - AppAddedSchedulerEvent appAddEvent = - (AppAddedSchedulerEvent) schedulerEvent; - String queueName = appAddEvent.getQueue(); - appQueueMap.put(appAddEvent.getApplicationId(), queueName); - } - } - } - - private void updateQueueWithNodeUpdate( - NodeUpdateSchedulerEventWrapper eventWrapper) { - RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); - List containerList = node.getContainerUpdates(); - for (UpdatedContainerInfo info : containerList) { - for (ContainerStatus status : info.getCompletedContainers()) { - ContainerId containerId = status.getContainerId(); - SchedulerAppReport app = scheduler.getSchedulerAppInfo( - containerId.getApplicationAttemptId()); - - if (app == null) { - // this happens for the AM container - // The app have already removed when the NM sends the release - // information. - continue; - } - - String queue = - appQueueMap.get(containerId.getApplicationAttemptId() - .getApplicationId()); - int releasedMemory = 0, releasedVCores = 0; - if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { - for (RMContainer rmc : app.getLiveContainers()) { - if (rmc.getContainerId() == containerId) { - releasedMemory += rmc.getContainer().getResource().getMemorySize(); - releasedVCores += rmc.getContainer() - .getResource().getVirtualCores(); - break; - } - } - } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { - if (preemptionContainerMap.containsKey(containerId)) { - Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemorySize(); - releasedVCores += preResource.getVirtualCores(); - preemptionContainerMap.remove(containerId); - } - } - // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); - } - } - } - - private void updateQueueWithAllocateRequest(Allocation allocation, - ApplicationAttemptId attemptId, - List resourceRequests, - List containerIds) throws IOException { - // update queue information - Resource pendingResource = Resources.createResource(0, 0); - Resource allocatedResource = Resources.createResource(0, 0); - String queueName = appQueueMap.get(attemptId.getApplicationId()); - // container requested - for (ResourceRequest request : resourceRequests) { - if (request.getResourceName().equals(ResourceRequest.ANY)) { - Resources.addTo(pendingResource, - Resources.multiply(request.getCapability(), - request.getNumContainers())); - } - } - // container allocated - for (Container container : allocation.getContainers()) { - Resources.addTo(allocatedResource, container.getResource()); - Resources.subtractFrom(pendingResource, container.getResource()); - } - // container released from AM - SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId); - for (ContainerId containerId : containerIds) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released allocated containers - Resources.subtractFrom(allocatedResource, container.getResource()); - } else { - for (RMContainer c : report.getReservedContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released reserved containers - Resources.subtractFrom(pendingResource, container.getResource()); - } - } - } - // containers released/preemption from scheduler - Set preemptionContainers = new HashSet(); - if (allocation.getContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getContainerPreemptions()); - } - if (allocation.getStrictContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); - } - if (! preemptionContainers.isEmpty()) { - for (ContainerId containerId : preemptionContainers) { - if (! preemptionContainerMap.containsKey(containerId)) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - preemptionContainerMap.put(containerId, container.getResource()); - } - } - - } - } - - // update metrics - SortedMap counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." + queueName + ".allocated.cores"}; - long values[] = new long[]{pendingResource.getMemorySize(), - pendingResource.getVirtualCores(), - allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings("unchecked") - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(scheduler.getClass().getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? - defaultSchedulerMetricsMap.get(scheduler.getClass()) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(scheduler, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), "UTF-8")); - jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge() { - @Override - public Long getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge() { - @Override - public Long getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); - } - - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge() { - @Override - public Integer getValue() { - if (scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new HashMap(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." + e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList(); - histogramTimerMap = new HashMap(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." + e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); - } - } - - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory " + dir.getAbsoluteFile()); - } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); - } - - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), "UTF-8")); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - if (metricsON) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void updateQueueMetrics(String queue, - long releasedMemory, int releasedVCores) { - // update queue counters - SortedMap counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } - } - - public void setQueueSet(Set queues) { - this.queueSet = queues; - } - - public Set getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set apps) { - this.trackedAppSet = apps; - } - - public Set getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; - } - - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - // API open to out classes - public void addTrackedApp(ApplicationId appId, String oldAppId) { - if (metricsON) { - schedulerMetrics.trackApp(appId, oldAppId); - } - } - - public void removeTrackedApp(String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(oldAppId); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @SuppressWarnings("unchecked") - @Override - public void serviceInit(Configuration conf) throws Exception { - ((AbstractYarnScheduler) - scheduler).init(conf); - super.serviceInit(conf); - initScheduler(conf); - } - - private synchronized void initScheduler(Configuration configuration) throws - IOException { - this.applications = - new ConcurrentHashMap>(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStart() throws Exception { - ((AbstractYarnScheduler) - scheduler).start(); - super.serviceStart(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStop() throws Exception { - ((AbstractYarnScheduler) - scheduler).stop(); - super.serviceStop(); - } - - @Override - public void setRMContext(RMContext rmContext) { - scheduler.setRMContext(rmContext); - } - - @Override - public void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - scheduler.reinitialize(conf, rmContext); - } - - @Override - public void recover(RMStateStore.RMState rmState) throws Exception { - scheduler.recover(rmState); - } - - @Override - public QueueInfo getQueueInfo(String s, boolean b, boolean b2) - throws IOException { - return scheduler.getQueueInfo(s, b, b2); - } - - @Override - public List getQueueUserAclInfo() { - return scheduler.getQueueUserAclInfo(); - } - - @Override - public Resource getMinimumResourceCapability() { - return scheduler.getMinimumResourceCapability(); - } - - @Override - public Resource getMaximumResourceCapability() { - return scheduler.getMaximumResourceCapability(); - } - - @Override - public ResourceCalculator getResourceCalculator() { - return scheduler.getResourceCalculator(); - } - - @Override - public int getNumClusterNodes() { - return scheduler.getNumClusterNodes(); - } - - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - return scheduler.getNodeReport(nodeId); - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId attemptId) { - return scheduler.getSchedulerAppInfo(attemptId); - } - - @Override - public QueueMetrics getRootQueueMetrics() { - return scheduler.getRootQueueMetrics(); - } - - @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, - QueueACL acl, String queueName) { - return scheduler.checkAccess(callerUGI, acl, queueName); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - return scheduler.getAppResourceUsageReport(appAttemptId); - } - - @Override - public List getAppsInQueue(String queue) { - return scheduler.getAppsInQueue(queue); - } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - return null; - } - - @Override - public String moveApplication(ApplicationId appId, String newQueue) - throws YarnException { - return scheduler.moveApplication(appId, newQueue); - } - - @Override - @LimitedPrivate("yarn") - @Unstable - public Resource getClusterResource() { - return super.getClusterResource(); - } - - @Override - public synchronized List getTransferredContainers( - ApplicationAttemptId currentAttempt) { - return new ArrayList(); - } - - @Override - public Map> - getSchedulerApplications() { - return new HashMap>(); - } - - @Override - protected void completedContainerInternal(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - // do nothing - } - - @Override - public Priority checkAndGetApplicationPriority(Priority priority, - UserGroupInformation user, String queueName, ApplicationId applicationId) - throws YarnException { - // TODO Dummy implementation. - return Priority.newInstance(0); - } - -} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 6ea2ab0f1e..7c37465d7b 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -17,34 +17,19 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -65,117 +50,63 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Logger; -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; @Private @Unstable public class SLSCapacityScheduler extends CapacityScheduler implements SchedulerWrapper,Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final String QUEUE_COUNTER_PREFIX = "counter.queue."; - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map schedulerHandleTimerMap; - private List schedulerHistogramList; - private Map histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; private Configuration conf; private Map appQueueMap = new ConcurrentHashMap(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; private Map preemptionContainerMap = new ConcurrentHashMap(); // metrics - private MetricRegistry metrics; private SchedulerMetrics schedulerMetrics; private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map defaultSchedulerMetricsMap = - new HashMap(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set queueSet; - private Set trackedAppSet; + private Tracker tracker; - public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class); + public Tracker getTracker() { + return tracker; + } public SLSCapacityScheduler() { - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); + tracker = new Tracker(); } @Override public void setConf(Configuration conf) { this.conf = conf; super.setConf(conf); - // start metrics metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); if (metricsON) { try { - initMetrics(); + schedulerMetrics = SchedulerMetrics.getInstance(conf, + CapacityScheduler.class); + schedulerMetrics.init(this, conf); } catch (Exception e) { e.printStackTrace(); } - } - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override public void run() { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); } - if (web != null) { - web.stop(); - } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); } - } - }, SHUTDOWN_HOOK_PRIORITY); + }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); + } } @Override @@ -184,7 +115,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements List strings, List strings2, ContainerUpdates updateRequests) { if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); Allocation allocation = null; try { allocation = super @@ -193,7 +125,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements return allocation; } finally { context.stop(); - schedulerAllocateCounter.inc(); + schedulerMetrics.increaseSchedulerAllocationCounter(); try { updateQueueWithAllocateRequest(allocation, attemptId, resourceRequests, containerIds); @@ -209,74 +141,76 @@ public class SLSCapacityScheduler extends CapacityScheduler implements @Override public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! metricsON) { - super.handle(schedulerEvent); - return; - } - if(!running) running = true; + if (!metricsON) { + super.handle(schedulerEvent); + return; + } - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; + if (!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId); - SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); - if (! app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemorySize(), - rmc.getContainer().getResource().getVirtualCores()); - } - } + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queue = appQueueMap.get(appAttemptId); + SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queue); + } + } - super.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); - if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED - && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { - AppAttemptAddedSchedulerEvent appAddEvent = - (AppAttemptAddedSchedulerEvent) schedulerEvent; - SchedulerApplication app = - applications.get(appAddEvent.getApplicationAttemptId() + super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_ADDED + && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { + AppAttemptAddedSchedulerEvent appAddEvent = + (AppAttemptAddedSchedulerEvent) schedulerEvent; + SchedulerApplication app = + applications.get(appAddEvent.getApplicationAttemptId() .getApplicationId()); - appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() - .getQueueName()); - } - } + appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() + .getQueueName()); + } + } } private void updateQueueWithNodeUpdate( @@ -316,7 +250,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); } } } @@ -395,410 +330,13 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } // update metrics - SortedMap counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." + queueName + ".allocated.cores"}; - long values[] = new long[]{pendingResource.getMemorySize(), - pendingResource.getVirtualCores(), - allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(CapacityScheduler.class.getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? - defaultSchedulerMetricsMap.get(CapacityScheduler.class) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(this, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), "UTF-8")); - jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge() { - @Override - public Long getValue() { - if( getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge() { - @Override - public Long getValue() { - if(getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.reserved.memory", - new Gauge() { - @Override - public Long getValue() { - if(getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getReservedMB(); - } - } - } - ); - metrics.register("variable.cluster.reserved.vcores", - new Gauge() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getReservedVirtualCores(); - } - } - } - ); - } - - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new HashMap(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." + e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList(); - histogramTimerMap = new HashMap(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." + e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); - } - } - - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory " + dir.getAbsoluteFile()); - } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); - } - - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), "UTF-8")); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - - if (metricsON) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void updateQueueMetrics(String queue, - long releasedMemory, int releasedVCores) { - // update queue counters - SortedMap counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); } private void initQueueMetrics(CSQueue queue) { if (queue instanceof LeafQueue) { - SortedMap counterMap = metrics.getCounters(); - String queueName = queue.getQueueName(); - String[] names = new String[]{ - QUEUE_COUNTER_PREFIX + queueName + ".pending.memory", - QUEUE_COUNTER_PREFIX + queueName + ".pending.cores", - QUEUE_COUNTER_PREFIX + queueName + ".allocated.memory", - QUEUE_COUNTER_PREFIX + queueName + ".allocated.cores" }; - - for (int i = names.length - 1; i >= 0; i--) { - if (!counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - } - - queueLock.lock(); - try { - if (!schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - + schedulerMetrics.initQueueMetric(queue.getQueueName()); return; } @@ -811,54 +349,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements public void serviceInit(Configuration configuration) throws Exception { super.serviceInit(configuration); - initQueueMetrics(getRootQueue()); - } - - public void setQueueSet(Set queues) { - this.queueSet = queues; - } - - public Set getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set apps) { - this.trackedAppSet = apps; - } - - public Set getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; + if (metricsON) { + initQueueMetrics(getRootQueue()); + } } public SchedulerMetrics getSchedulerMetrics() { return schedulerMetrics; } - // API open to out classes - public void addTrackedApp(ApplicationId appId, - String oldAppId) { - if (metricsON) { - schedulerMetrics.trackApp(appId, oldAppId); - } - } - - public void removeTrackedApp(String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(oldAppId); - } - } - @Override public Configuration getConf() { return conf; } - - - - -} - +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java new file mode 100644 index 0000000000..572dacfc55 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Private +@Unstable +public class SLSFairScheduler extends FairScheduler + implements SchedulerWrapper, Configurable { + private SchedulerMetrics schedulerMetrics; + private boolean metricsON; + private Tracker tracker; + + private Map preemptionContainerMap = + new ConcurrentHashMap<>(); + + public SchedulerMetrics getSchedulerMetrics() { + return schedulerMetrics; + } + + public Tracker getTracker() { + return tracker; + } + + public SLSFairScheduler() { + tracker = new Tracker(); + } + + @Override + public void setConf(Configuration conf) { + super.setConfig(conf); + + metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); + if (metricsON) { + try { + schedulerMetrics = SchedulerMetrics.getInstance(conf, + FairScheduler.class); + schedulerMetrics.init(this, conf); + } catch (Exception e) { + e.printStackTrace(); + } + + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override public void run() { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); + } + } + + @Override + public Allocation allocate(ApplicationAttemptId attemptId, + List resourceRequests, List containerIds, + List blacklistAdditions, List blacklistRemovals, + ContainerUpdates updateRequests) { + if (metricsON) { + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); + Allocation allocation = null; + try { + allocation = super.allocate(attemptId, resourceRequests, containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + return allocation; + } finally { + context.stop(); + schedulerMetrics.increaseSchedulerAllocationCounter(); + try { + updateQueueWithAllocateRequest(allocation, attemptId, + resourceRequests, containerIds); + } catch (IOException e) { + e.printStackTrace(); + } + } + } else { + return super.allocate(attemptId, resourceRequests, containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + } + } + + @Override + public void handle(SchedulerEvent schedulerEvent) { + // metrics off + if (!metricsON) { + super.handle(schedulerEvent); + return; + } + + // metrics on + if(!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } + + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; + + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if ( + schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queueName = getSchedulerApp(appAttemptId).getQueue().getName(); + SchedulerAppReport app = getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queueName); + } + } + + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); + + super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + } + } + } + + private void updateQueueWithNodeUpdate( + NodeUpdateSchedulerEventWrapper eventWrapper) { + RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); + List containerList = node.getContainerUpdates(); + for (UpdatedContainerInfo info : containerList) { + for (ContainerStatus status : info.getCompletedContainers()) { + ContainerId containerId = status.getContainerId(); + SchedulerAppReport app = super.getSchedulerAppInfo( + containerId.getApplicationAttemptId()); + + if (app == null) { + // this happens for the AM container + // The app have already removed when the NM sends the release + // information. + continue; + } + + int releasedMemory = 0, releasedVCores = 0; + if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { + for (RMContainer rmc : app.getLiveContainers()) { + if (rmc.getContainerId() == containerId) { + Resource resource = rmc.getContainer().getResource(); + releasedMemory += resource.getMemorySize(); + releasedVCores += resource.getVirtualCores(); + break; + } + } + } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { + if (preemptionContainerMap.containsKey(containerId)) { + Resource preResource = preemptionContainerMap.get(containerId); + releasedMemory += preResource.getMemorySize(); + releasedVCores += preResource.getVirtualCores(); + preemptionContainerMap.remove(containerId); + } + } + // update queue counters + String queue = getSchedulerApp(containerId.getApplicationAttemptId()). + getQueueName(); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); + } + } + } + + private void updateQueueWithAllocateRequest(Allocation allocation, + ApplicationAttemptId attemptId, + List resourceRequests, + List containerIds) throws IOException { + // update queue information + Resource pendingResource = Resources.createResource(0, 0); + Resource allocatedResource = Resources.createResource(0, 0); + // container requested + for (ResourceRequest request : resourceRequests) { + if (request.getResourceName().equals(ResourceRequest.ANY)) { + Resources.addTo(pendingResource, + Resources.multiply(request.getCapability(), + request.getNumContainers())); + } + } + // container allocated + for (Container container : allocation.getContainers()) { + Resources.addTo(allocatedResource, container.getResource()); + Resources.subtractFrom(pendingResource, container.getResource()); + } + // container released from AM + SchedulerAppReport report = super.getSchedulerAppInfo(attemptId); + for (ContainerId containerId : containerIds) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released allocated containers + Resources.subtractFrom(allocatedResource, container.getResource()); + } else { + for (RMContainer c : report.getReservedContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released reserved containers + Resources.subtractFrom(pendingResource, container.getResource()); + } + } + } + // containers released/preemption from scheduler + Set preemptionContainers = new HashSet(); + if (allocation.getContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getContainerPreemptions()); + } + if (allocation.getStrictContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); + } + if (!preemptionContainers.isEmpty()) { + for (ContainerId containerId : preemptionContainers) { + if (!preemptionContainerMap.containsKey(containerId)) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + preemptionContainerMap.put(containerId, container.getResource()); + } + } + + } + } + + // update metrics + String queueName = getSchedulerApp(attemptId).getQueueName(); + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); + } + + private void initQueueMetrics(FSQueue queue) { + if (queue instanceof FSLeafQueue) { + schedulerMetrics.initQueueMetric(queue.getQueueName()); + return; + } + + for (FSQueue child : queue.getChildQueues()) { + initQueueMetrics(child); + } + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + if (metricsON) { + initQueueMetrics(getQueueManager().getRootQueue()); + } + } +} + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index 8645a697c1..a8792e81ec 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -18,40 +18,170 @@ package org.apache.hadoop.yarn.sls.scheduler; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.Locale; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; +import com.codahale.metrics.Counter; +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Timer; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.apache.log4j.Logger; @Private @Unstable public abstract class SchedulerMetrics { + private static final String EOL = System.getProperty("line.separator"); + private static final int SAMPLING_SIZE = 60; + private static final Logger LOG = Logger.getLogger(SchedulerMetrics.class); + protected ResourceScheduler scheduler; protected Set trackedQueues; protected MetricRegistry metrics; protected Set appTrackedMetrics; protected Set queueTrackedMetrics; - + + private Configuration conf; + private ScheduledExecutorService pool; + private SLSWebApp web; + + // metrics + private String metricsOutputDir; + private BufferedWriter metricsLogBW; + private BufferedWriter jobRuntimeLogBW; + private boolean running = false; + + // counters for scheduler allocate/handle operations + private Counter schedulerAllocateCounter; + private Counter schedulerHandleCounter; + private Map schedulerHandleCounterMap; + + // Timers for scheduler allocate/handle operations + private Timer schedulerAllocateTimer; + private Timer schedulerHandleTimer; + private Map schedulerHandleTimerMap; + private List schedulerHistogramList; + private Map histogramTimerMap; + private Lock samplerLock; + private Lock queueLock; + + static Class getSchedulerMetricsClass(Configuration conf, + Class schedulerClass) throws ClassNotFoundException { + Class metricClass = null; + String schedulerMetricsType = conf.get(schedulerClass.getName()); + if (schedulerMetricsType != null) { + metricClass = Class.forName(schedulerMetricsType); + } + + if (schedulerClass.equals(FairScheduler.class)) { + metricClass = FairSchedulerMetrics.class; + } else if (schedulerClass.equals(CapacityScheduler.class)) { + metricClass = CapacitySchedulerMetrics.class; + } else if (schedulerClass.equals(FifoScheduler.class)) { + metricClass = FifoSchedulerMetrics.class; + } + + return metricClass; + } + + static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass) + throws ClassNotFoundException { + Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass); + return (SchedulerMetrics) ReflectionUtils + .newInstance(schedulerMetricClass, new Configuration()); + } + public SchedulerMetrics() { + metrics = new MetricRegistry(); + appTrackedMetrics = new HashSet<>(); appTrackedMetrics.add("live.containers"); appTrackedMetrics.add("reserved.containers"); + queueTrackedMetrics = new HashSet<>(); + trackedQueues = new HashSet<>(); + + samplerLock = new ReentrantLock(); + queueLock = new ReentrantLock(); } - - public void init(ResourceScheduler scheduler, MetricRegistry metrics) { - this.scheduler = scheduler; - this.trackedQueues = new HashSet<>(); - this.metrics = metrics; + + void init(ResourceScheduler resourceScheduler, Configuration config) + throws Exception { + this.scheduler = resourceScheduler; + this.conf = config; + + metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); + + // register various metrics + registerJvmMetrics(); + registerClusterResourceMetrics(); + registerContainerAppNumMetrics(); + registerSchedulerMetrics(); + + // .csv output + initMetricsCSVOutput(); + + // start web app to provide real-time tracking + int metricsWebAddressPort = conf.getInt( + SLSConfiguration.METRICS_WEB_ADDRESS_PORT, + SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); + web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort); + web.start(); + + // a thread to update histogram timer + pool = new ScheduledThreadPoolExecutor(2); + pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // a thread to output metrics for real-tiem tracking + pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // application running information + jobRuntimeLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/jobruntime.csv"), "UTF-8")); + jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + + "simulate_start_time,simulate_end_time" + EOL); + jobRuntimeLogBW.flush(); + } + + public MetricRegistry getMetrics() { + return metrics; } protected SchedulerApplicationAttempt getSchedulerAppAttempt( @@ -117,7 +247,392 @@ public abstract class SchedulerMetrics { public Set getAppTrackedMetrics() { return appTrackedMetrics; } + public Set getQueueTrackedMetrics() { return queueTrackedMetrics; } + + private void registerJvmMetrics() { + // add JVM gauges + metrics.register("variable.jvm.free.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().freeMemory(); + } + } + ); + metrics.register("variable.jvm.max.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().maxMemory(); + } + } + ); + metrics.register("variable.jvm.total.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().totalMemory(); + } + } + ); + } + + private void registerClusterResourceMetrics() { + metrics.register("variable.cluster.allocated.memory", + new Gauge() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAllocatedMB(); + } + } + } + ); + metrics.register("variable.cluster.allocated.vcores", + new Gauge() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); + } + } + } + ); + metrics.register("variable.cluster.available.memory", + new Gauge() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAvailableMB(); + } + } + } + ); + metrics.register("variable.cluster.available.vcores", + new Gauge() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); + } + } + } + ); + } + + private void registerContainerAppNumMetrics() { + metrics.register("variable.running.application", + new Gauge() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAppsRunning(); + } + } + } + ); + metrics.register("variable.running.container", + new Gauge() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedContainers(); + } + } + } + ); + } + + private void registerSchedulerMetrics() { + samplerLock.lock(); + try { + // counters for scheduler operations + schedulerAllocateCounter = metrics.counter( + "counter.scheduler.operation.allocate"); + schedulerHandleCounter = metrics.counter( + "counter.scheduler.operation.handle"); + schedulerHandleCounterMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Counter counter = metrics.counter( + "counter.scheduler.operation.handle." + e); + schedulerHandleCounterMap.put(e, counter); + } + // timers for scheduler operations + int timeWindowSize = conf.getInt( + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); + schedulerAllocateTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap.put(e, timer); + } + // histogram for scheduler operations (Samplers) + schedulerHistogramList = new ArrayList<>(); + histogramTimerMap = new HashMap<>(); + Histogram schedulerAllocateHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.allocate.timecost", + schedulerAllocateHistogram); + schedulerHistogramList.add(schedulerAllocateHistogram); + histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); + Histogram schedulerHandleHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.handle.timecost", + schedulerHandleHistogram); + schedulerHistogramList.add(schedulerHandleHistogram); + histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); + for (SchedulerEventType e : SchedulerEventType.values()) { + Histogram histogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register( + "sampler.scheduler.operation.handle." + e + ".timecost", + histogram); + schedulerHistogramList.add(histogram); + histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); + } + } finally { + samplerLock.unlock(); + } + } + + private void initMetricsCSVOutput() { + int timeIntervalMS = conf.getInt( + SLSConfiguration.METRICS_RECORD_INTERVAL_MS, + SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); + File dir = new File(metricsOutputDir + "/metrics"); + if(!dir.exists() && !dir.mkdirs()) { + LOG.error("Cannot create directory " + dir.getAbsoluteFile()); + } + final CsvReporter reporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(metricsOutputDir + "/metrics")); + reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); + } + + boolean isRunning() { + return running; + } + + void setRunning(boolean running) { + this.running = running; + } + + class HistogramsRunnable implements Runnable { + @Override + public void run() { + samplerLock.lock(); + try { + for (Histogram histogram : schedulerHistogramList) { + Timer timer = histogramTimerMap.get(histogram); + histogram.update((int) timer.getSnapshot().getMean()); + } + } finally { + samplerLock.unlock(); + } + } + } + + class MetricsLogRunnable implements Runnable { + private boolean firstLine = true; + + MetricsLogRunnable() { + try { + metricsLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/realtimetrack.json"), "UTF-8")); + metricsLogBW.write("["); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + + @Override + public void run() { + if(running) { + // all WebApp to get real tracking json + String trackingMetrics = web.generateRealTimeTrackingMetrics(); + // output + try { + if(firstLine) { + metricsLogBW.write(trackingMetrics + EOL); + firstLine = false; + } else { + metricsLogBW.write("," + trackingMetrics + EOL); + } + metricsLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + } + } + + void tearDown() throws Exception { + if (metricsLogBW != null) { + metricsLogBW.write("]"); + metricsLogBW.close(); + } + + if (web != null) { + web.stop(); + } + + if (jobRuntimeLogBW != null) { + jobRuntimeLogBW.close(); + } + + if (pool != null) { + pool.shutdown(); + } + } + + void increaseSchedulerAllocationCounter() { + schedulerAllocateCounter.inc(); + } + + void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) { + schedulerHandleCounter.inc(); + schedulerHandleCounterMap.get(schedulerEventType).inc(); + } + + Timer getSchedulerAllocateTimer() { + return schedulerAllocateTimer; + } + + Timer getSchedulerHandleTimer() { + return schedulerHandleTimer; + } + + Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) { + return schedulerHandleTimerMap.get(schedulerEventType); + } + + private enum QueueMetric { + PENDING_MEMORY("pending.memory"), + PENDING_VCORES("pending.cores"), + ALLOCATED_MEMORY("allocated.memory"), + ALLOCATED_VCORES("allocated.cores"); + + private String value; + + QueueMetric(String value) { + this.value = value; + } + } + + private String getQueueMetricName(String queue, QueueMetric metric) { + return "counter.queue." + queue + "." + metric.value; + } + + private void traceQueueIfNotTraced(String queue) { + queueLock.lock(); + try { + if (!isTracked(queue)) { + trackQueue(queue); + } + } finally { + queueLock.unlock(); + } + } + + void initQueueMetric(String queueName){ + SortedMap counterMap = metrics.getCounters(); + + for (QueueMetric queueMetric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, queueMetric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetrics(Resource pendingResource, Resource allocatedResource, + String queueName) { + SortedMap counterMap = metrics.getCounters(); + for(QueueMetric metric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, metric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + + if (metric == QueueMetric.PENDING_MEMORY) { + counterMap.get(metricName).inc(pendingResource.getMemorySize()); + } else if (metric == QueueMetric.PENDING_VCORES) { + counterMap.get(metricName).inc(pendingResource.getVirtualCores()); + } else if (metric == QueueMetric.ALLOCATED_MEMORY) { + counterMap.get(metricName).inc(allocatedResource.getMemorySize()); + } else if (metric == QueueMetric.ALLOCATED_VCORES){ + counterMap.get(metricName).inc(allocatedResource.getVirtualCores()); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetricsByRelease(Resource releaseResource, String queue) { + SortedMap counterMap = metrics.getCounters(); + String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY); + if (!counterMap.containsKey(name)) { + metrics.counter(name); + counterMap = metrics.getCounters(); + } + counterMap.get(name).inc(-releaseResource.getMemorySize()); + + String vcoreMetric = + getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES); + if (!counterMap.containsKey(vcoreMetric)) { + metrics.counter(vcoreMetric); + counterMap = metrics.getCounters(); + } + counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores()); + } + + public void addTrackedApp(ApplicationId appId, + String oldAppId) { + trackApp(appId, oldAppId); + } + + public void removeTrackedApp(String oldAppId) { + untrackApp(oldAppId); + } + + public void addAMRuntime(ApplicationId appId, long traceStartTimeMS, + long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) { + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 962b137a7a..406f0508ec 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -17,28 +17,12 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.util.Set; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import com.codahale.metrics.MetricRegistry; @Private @Unstable public interface SchedulerWrapper { - - MetricRegistry getMetrics(); SchedulerMetrics getSchedulerMetrics(); - Set getQueueSet(); - void setQueueSet(Set queues); - Set getTrackedAppSet(); - void setTrackedAppSet(Set apps); - void addTrackedApp(ApplicationId appId, String oldAppId); - void removeTrackedApp(String oldAppId); - void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS); - + Tracker getTracker(); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java new file mode 100644 index 0000000000..42a5c3c894 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import java.util.Set; + +@Private +@Unstable +public class Tracker { + private Set queueSet; + private Set trackedAppSet; + + public void setQueueSet(Set queues) { + queueSet = queues; + } + + public Set getQueueSet() { + return queueSet; + } + + public void setTrackedAppSet(Set apps) { + trackedAppSet = apps; + } + + public Set getTrackedAppSet() { + return trackedAppSet; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index e5f7cd067b..085edc0085 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; @Private @Unstable public class SLSUtils { + public static final int SHUTDOWN_HOOK_PRIORITY = 30; // hostname includes the network path and the host name. for example // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar". diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java index 33d48466c2..2d2ffc5506 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java @@ -107,12 +107,12 @@ public class SLSWebApp extends HttpServlet { public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) { this.wrapper = wrapper; - metrics = wrapper.getMetrics(); handleOperTimecostHistogramMap = new HashMap(); queueAllocatedMemoryCounterMap = new HashMap(); queueAllocatedVCoresCounterMap = new HashMap(); schedulerMetrics = wrapper.getSchedulerMetrics(); + metrics = schedulerMetrics.getMetrics(); port = metricsAddressPort; } @@ -226,7 +226,7 @@ public class SLSWebApp extends HttpServlet { response.setStatus(HttpServletResponse.SC_OK); // queues {0} - Set queues = wrapper.getQueueSet(); + Set queues = wrapper.getTracker().getQueueSet(); StringBuilder queueInfo = new StringBuilder(); int i = 0; @@ -265,7 +265,7 @@ public class SLSWebApp extends HttpServlet { // tracked queues {0} StringBuilder trackedQueueInfo = new StringBuilder(); - Set trackedQueues = wrapper.getQueueSet(); + Set trackedQueues = wrapper.getTracker().getQueueSet(); for(String queue : trackedQueues) { trackedQueueInfo.append(""); @@ -273,7 +273,7 @@ public class SLSWebApp extends HttpServlet { // tracked apps {1} StringBuilder trackedAppInfo = new StringBuilder(); - Set trackedApps = wrapper.getTrackedAppSet(); + Set trackedApps = wrapper.getTracker().getTrackedAppSet(); for(String job : trackedApps) { trackedAppInfo.append(""); @@ -422,7 +422,7 @@ public class SLSWebApp extends HttpServlet { // allocated resource for each queue Map queueAllocatedMemoryMap = new HashMap(); Map queueAllocatedVCoresMap = new HashMap(); - for (String queue : wrapper.getQueueSet()) { + for (String queue : wrapper.getTracker().getQueueSet()) { // memory String key = "counter.queue." + queue + ".allocated.memory"; if (! queueAllocatedMemoryCounterMap.containsKey(queue) && @@ -462,7 +462,7 @@ public class SLSWebApp extends HttpServlet { .append(",\"cluster.available.memory\":").append(availableMemoryGB) .append(",\"cluster.available.vcores\":").append(availableVCoresGB); - for (String queue : wrapper.getQueueSet()) { + for (String queue : wrapper.getTracker().getQueueSet()) { sb.append(",\"queue.").append(queue).append(".allocated.memory\":") .append(queueAllocatedMemoryMap.get(queue)); sb.append(",\"queue.").append(queue).append(".allocated.vcores\":") diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index f0d8e6f507..ca3d1958a3 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -22,37 +22,56 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; +import org.apache.hadoop.yarn.sls.scheduler.*; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +@RunWith(Parameterized.class) public class TestAMSimulator { private ResourceManager rm; private YarnConfiguration conf; private Path metricOutputDir; + private Class slsScheduler; + private Class scheduler; + + @Parameterized.Parameters + public static Collection params() { + return Arrays.asList(new Object[][] { + {SLSFairScheduler.class, FairScheduler.class}, + {SLSCapacityScheduler.class, CapacityScheduler.class} + }); + } + + public TestAMSimulator(Class slsScheduler, Class scheduler) { + this.slsScheduler = slsScheduler; + this.scheduler = scheduler; + } + @Before public void setup() { createMetricOutputDir(); conf = new YarnConfiguration(); conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString()); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper"); - conf.set(SLSConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); + conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true); rm = new ResourceManager(); rm.init(conf); @@ -76,15 +95,17 @@ public class TestAMSimulator { } private void verifySchedulerMetrics(String appId) { - SchedulerWrapper schedulerWrapper = (SchedulerWrapper) - rm.getResourceScheduler(); - MetricRegistry metricRegistry = schedulerWrapper.getMetrics(); - for (FairSchedulerMetrics.Metric metric : - FairSchedulerMetrics.Metric.values()) { - String key = "variable.app." + appId + "." + metric.getValue() - + ".memory"; - Assert.assertTrue(metricRegistry.getGauges().containsKey(key)); - Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue()); + if (scheduler.equals(FairScheduler.class)) { + SchedulerMetrics schedulerMetrics = ((SchedulerWrapper) + rm.getResourceScheduler()).getSchedulerMetrics(); + MetricRegistry metricRegistry = schedulerMetrics.getMetrics(); + for (FairSchedulerMetrics.Metric metric : + FairSchedulerMetrics.Metric.values()) { + String key = "variable.app." + appId + "." + metric.getValue() + + ".memory"; + Assert.assertTrue(metricRegistry.getGauges().containsKey(key)); + Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue()); + } } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java index f9a393298e..2f10f7dbc2 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java @@ -21,26 +21,50 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; +import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) public class TestNMSimulator { private final int GB = 1024; private ResourceManager rm; private YarnConfiguration conf; + private Class slsScheduler; + private Class scheduler; + + @Parameterized.Parameters + public static Collection params() { + return Arrays.asList(new Object[][] { + {SLSFairScheduler.class, FairScheduler.class}, + {SLSCapacityScheduler.class, CapacityScheduler.class} + }); + } + + public TestNMSimulator(Class slsScheduler, Class scheduler) { + this.slsScheduler = slsScheduler; + this.scheduler = scheduler; + } + @Before public void setup() { conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper"); - conf.set(SLSConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); + conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false); rm = new ResourceManager(); rm.init(conf);