From cc938e99ec0904824c8072184eff75619fcaf040 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Tue, 21 Mar 2017 15:21:11 -0700 Subject: [PATCH] YARN-6326. Shouldn't use AppAttemptIds to fetch applications while AM Simulator tracks app in SLS (yufeigu via rkanter) --- .../yarn/sls/appmaster/AMSimulator.java | 9 +- .../sls/scheduler/FairSchedulerMetrics.java | 314 ++++++++---------- .../scheduler/ResourceSchedulerWrapper.java | 11 +- .../sls/scheduler/SLSCapacityScheduler.java | 9 +- .../yarn/sls/scheduler/SchedulerMetrics.java | 71 ++-- .../yarn/sls/scheduler/SchedulerWrapper.java | 25 +- .../yarn/sls/appmaster/TestAMSimulator.java | 56 +++- 7 files changed, 253 insertions(+), 242 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5b03d51425..0573bae0ec 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -62,10 +62,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -335,13 +331,12 @@ public abstract class AMSimulator extends TaskRunner.Task { private void trackApp() { if (isTracked) { ((SchedulerWrapper) rm.getResourceScheduler()) - .addTrackedApp(appAttemptId, oldAppId); + .addTrackedApp(appId, oldAppId); } } public void untrackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .removeTrackedApp(appAttemptId, oldAppId); + ((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java index 3b539fa6be..08362b1c60 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java @@ -18,16 +18,17 @@ package org.apache.hadoop.yarn.sls.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair - .FSAppAttempt; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.sls.SLSRunner; import com.codahale.metrics.Gauge; -import org.apache.hadoop.yarn.sls.SLSRunner; @Private @Unstable @@ -37,114 +38,131 @@ public class FairSchedulerMetrics extends SchedulerMetrics { private int totalVCores = Integer.MAX_VALUE; private boolean maxReset = false; + @VisibleForTesting + public enum Metric { + DEMAND("demand"), + USAGE("usage"), + MINSHARE("minshare"), + MAXSHARE("maxshare"), + FAIRSHARE("fairshare"); + + private String value; + + Metric(String value) { + this.value = value; + } + + @VisibleForTesting + public String getValue() { + return value; + } + } + public FairSchedulerMetrics() { super(); - appTrackedMetrics.add("demand.memory"); - appTrackedMetrics.add("demand.vcores"); - appTrackedMetrics.add("usage.memory"); - appTrackedMetrics.add("usage.vcores"); - appTrackedMetrics.add("minshare.memory"); - appTrackedMetrics.add("minshare.vcores"); - appTrackedMetrics.add("maxshare.memory"); - appTrackedMetrics.add("maxshare.vcores"); - appTrackedMetrics.add("fairshare.memory"); - appTrackedMetrics.add("fairshare.vcores"); - queueTrackedMetrics.add("demand.memory"); - queueTrackedMetrics.add("demand.vcores"); - queueTrackedMetrics.add("usage.memory"); - queueTrackedMetrics.add("usage.vcores"); - queueTrackedMetrics.add("minshare.memory"); - queueTrackedMetrics.add("minshare.vcores"); - queueTrackedMetrics.add("maxshare.memory"); - queueTrackedMetrics.add("maxshare.vcores"); - queueTrackedMetrics.add("fairshare.memory"); - queueTrackedMetrics.add("fairshare.vcores"); + + for (Metric metric: Metric.values()) { + appTrackedMetrics.add(metric.value + ".memory"); + appTrackedMetrics.add(metric.value + ".vcores"); + queueTrackedMetrics.add(metric.value + ".memory"); + queueTrackedMetrics.add(metric.value + ".vcores"); + } } - + + private long getMemorySize(Schedulable schedulable, Metric metric) { + if (schedulable != null) { + switch (metric) { + case DEMAND: + return schedulable.getDemand().getMemorySize(); + case USAGE: + return schedulable.getResourceUsage().getMemorySize(); + case MINSHARE: + return schedulable.getMinShare().getMemorySize(); + case MAXSHARE: + return schedulable.getMaxShare().getMemorySize(); + case FAIRSHARE: + return schedulable.getFairShare().getMemorySize(); + default: + return 0L; + } + } + + return 0L; + } + + private int getVirtualCores(Schedulable schedulable, Metric metric) { + if (schedulable != null) { + switch (metric) { + case DEMAND: + return schedulable.getDemand().getVirtualCores(); + case USAGE: + return schedulable.getResourceUsage().getVirtualCores(); + case MINSHARE: + return schedulable.getMinShare().getVirtualCores(); + case MAXSHARE: + return schedulable.getMaxShare().getVirtualCores(); + case FAIRSHARE: + return schedulable.getFairShare().getVirtualCores(); + default: + return 0; + } + } + + return 0; + } + + private void registerAppMetrics(ApplicationId appId, String oldAppId, + Metric metric) { + metrics.register( + "variable.app." + oldAppId + "." + metric.value + ".memory", + new Gauge() { + @Override + public Long getValue() { + return getMemorySize((FSAppAttempt)getSchedulerAppAttempt(appId), + metric); + } + } + ); + + metrics.register( + "variable.app." + oldAppId + "." + metric.value + ".vcores", + new Gauge() { + @Override + public Integer getValue() { + return getVirtualCores((FSAppAttempt)getSchedulerAppAttempt(appId), + metric); + } + } + ); + } + @Override - public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) { - super.trackApp(appAttemptId, oldAppId); - FairScheduler fair = (FairScheduler) scheduler; - final FSAppAttempt app = fair.getSchedulerApp(appAttemptId); - metrics.register("variable.app." + oldAppId + ".demand.memory", - new Gauge() { - @Override - public Long getValue() { - return app.getDemand().getMemorySize(); + public void trackApp(ApplicationId appId, String oldAppId) { + super.trackApp(appId, oldAppId); + + for (Metric metric: Metric.values()) { + registerAppMetrics(appId, oldAppId, metric); + } + } + + private void registerQueueMetrics(FSQueue queue, Metric metric) { + metrics.register( + "variable.queue." + queue.getName() + "." + metric.value + ".memory", + new Gauge() { + @Override + public Long getValue() { + return getMemorySize(queue, metric); + } } - } ); - metrics.register("variable.app." + oldAppId + ".demand.vcores", - new Gauge() { - @Override - public Integer getValue() { - return app.getDemand().getVirtualCores(); + metrics.register( + "variable.queue." + queue.getName() + "." + metric.value + ".vcores", + new Gauge() { + @Override + public Integer getValue() { + return getVirtualCores(queue, metric); + } } - } - ); - metrics.register("variable.app." + oldAppId + ".usage.memory", - new Gauge() { - @Override - public Long getValue() { - return app.getResourceUsage().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".usage.vcores", - new Gauge() { - @Override - public Integer getValue() { - return app.getResourceUsage().getVirtualCores(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".minshare.memory", - new Gauge() { - @Override - public Long getValue() { - return app.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".minshare.vcores", - new Gauge() { - @Override - public Long getValue() { - return app.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".maxshare.memory", - new Gauge() { - @Override - public Long getValue() { - return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB); - } - } - ); - metrics.register("variable.app." + oldAppId + ".maxshare.vcores", - new Gauge() { - @Override - public Integer getValue() { - return Math.min(app.getMaxShare().getVirtualCores(), totalVCores); - } - } - ); - metrics.register("variable.app." + oldAppId + ".fairshare.memory", - new Gauge() { - @Override - public Integer getValue() { - return app.getFairShare().getVirtualCores(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".fairshare.vcores", - new Gauge() { - @Override - public Integer getValue() { - return app.getFairShare().getVirtualCores(); - } - } ); } @@ -153,54 +171,11 @@ public class FairSchedulerMetrics extends SchedulerMetrics { trackedQueues.add(queueName); FairScheduler fair = (FairScheduler) scheduler; final FSQueue queue = fair.getQueueManager().getQueue(queueName); - metrics.register("variable.queue." + queueName + ".demand.memory", - new Gauge() { - @Override - public Long getValue() { - return queue.getDemand().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".demand.vcores", - new Gauge() { - @Override - public Integer getValue() { - return queue.getDemand().getVirtualCores(); - } - } - ); - metrics.register("variable.queue." + queueName + ".usage.memory", - new Gauge() { - @Override - public Long getValue() { - return queue.getResourceUsage().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".usage.vcores", - new Gauge() { - @Override - public Integer getValue() { - return queue.getResourceUsage().getVirtualCores(); - } - } - ); - metrics.register("variable.queue." + queueName + ".minshare.memory", - new Gauge() { - @Override - public Long getValue() { - return queue.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".minshare.vcores", - new Gauge() { - @Override - public Integer getValue() { - return queue.getMinShare().getVirtualCores(); - } - } - ); + registerQueueMetrics(queue, Metric.DEMAND); + registerQueueMetrics(queue, Metric.USAGE); + registerQueueMetrics(queue, Metric.MINSHARE); + registerQueueMetrics(queue, Metric.FAIRSHARE); + metrics.register("variable.queue." + queueName + ".maxshare.memory", new Gauge() { @Override @@ -233,36 +208,17 @@ public class FairSchedulerMetrics extends SchedulerMetrics { } } ); - metrics.register("variable.queue." + queueName + ".fairshare.memory", - new Gauge() { - @Override - public Long getValue() { - return queue.getFairShare().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".fairshare.vcores", - new Gauge() { - @Override - public Integer getValue() { - return queue.getFairShare().getVirtualCores(); - } - } - ); } @Override public void untrackQueue(String queueName) { trackedQueues.remove(queueName); - metrics.remove("variable.queue." + queueName + ".demand.memory"); - metrics.remove("variable.queue." + queueName + ".demand.vcores"); - metrics.remove("variable.queue." + queueName + ".usage.memory"); - metrics.remove("variable.queue." + queueName + ".usage.vcores"); - metrics.remove("variable.queue." + queueName + ".minshare.memory"); - metrics.remove("variable.queue." + queueName + ".minshare.vcores"); - metrics.remove("variable.queue." + queueName + ".maxshare.memory"); - metrics.remove("variable.queue." + queueName + ".maxshare.vcores"); - metrics.remove("variable.queue." + queueName + ".fairshare.memory"); - metrics.remove("variable.queue." + queueName + ".fairshare.vcores"); + + for (Metric metric: Metric.values()) { + metrics.remove("variable.queue." + queueName + "." + + metric.value + ".memory"); + metrics.remove("variable.queue." + queueName + "." + + metric.value + ".vcores"); + } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index df8323a081..a4b8e642ba 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -793,17 +792,15 @@ final public class ResourceSchedulerWrapper } // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void addTrackedApp(ApplicationId appId, String oldAppId) { if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); + schedulerMetrics.trackApp(appId, oldAppId); } } - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void removeTrackedApp(String oldAppId) { if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); + schedulerMetrics.untrackApp(oldAppId); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index cd4377e1ee..6ea2ab0f1e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -839,17 +839,16 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, + public void addTrackedApp(ApplicationId appId, String oldAppId) { if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); + schedulerMetrics.trackApp(appId, oldAppId); } } - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void removeTrackedApp(String oldAppId) { if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); + schedulerMetrics.untrackApp(oldAppId); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index ecf516d7c9..8645a697c1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -23,11 +23,11 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerAppReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; @@ -42,42 +42,61 @@ public abstract class SchedulerMetrics { protected Set queueTrackedMetrics; public SchedulerMetrics() { - appTrackedMetrics = new HashSet(); + appTrackedMetrics = new HashSet<>(); appTrackedMetrics.add("live.containers"); appTrackedMetrics.add("reserved.containers"); - queueTrackedMetrics = new HashSet(); + queueTrackedMetrics = new HashSet<>(); } public void init(ResourceScheduler scheduler, MetricRegistry metrics) { this.scheduler = scheduler; - this.trackedQueues = new HashSet(); + this.trackedQueues = new HashSet<>(); this.metrics = metrics; } - - public void trackApp(final ApplicationAttemptId appAttemptId, - String oldAppId) { + + protected SchedulerApplicationAttempt getSchedulerAppAttempt( + ApplicationId appId) { + AbstractYarnScheduler yarnScheduler = (AbstractYarnScheduler)scheduler; + SchedulerApplication app = (SchedulerApplication)yarnScheduler + .getSchedulerApplications().get(appId); + if (app == null) { + return null; + } + return app.getCurrentAppAttempt(); + } + + public void trackApp(final ApplicationId appId, String oldAppId) { metrics.register("variable.app." + oldAppId + ".live.containers", - new Gauge() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getLiveContainers().size(); + new Gauge() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = + getSchedulerAppAttempt(appId); + if (appAttempt != null) { + return appAttempt.getLiveContainers().size(); + } else { + return 0; + } + } } - } ); metrics.register("variable.app." + oldAppId + ".reserved.containers", - new Gauge() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getReservedContainers().size(); + new Gauge() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = + getSchedulerAppAttempt(appId); + if (appAttempt != null) { + return appAttempt.getReservedContainers().size(); + } else { + return 0; + } + } } - } ); } - - public void untrackApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + + public void untrackApp(String oldAppId) { for (String m : appTrackedMetrics) { metrics.remove("variable.app." + oldAppId + "." + m); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 524b8bf23e..962b137a7a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -21,7 +21,6 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import com.codahale.metrics.MetricRegistry; @@ -30,18 +29,16 @@ import com.codahale.metrics.MetricRegistry; @Unstable public interface SchedulerWrapper { - public MetricRegistry getMetrics(); - public SchedulerMetrics getSchedulerMetrics(); - public Set getQueueSet(); - public void setQueueSet(Set queues); - public Set getTrackedAppSet(); - public void setTrackedAppSet(Set apps); - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS); + MetricRegistry getMetrics(); + SchedulerMetrics getSchedulerMetrics(); + Set getQueueSet(); + void setQueueSet(Set queues); + Set getTrackedAppSet(); + void setTrackedAppSet(Set apps); + void addTrackedApp(ApplicationId appId, String oldAppId); + void removeTrackedApp(String oldAppId); + void addAMRuntime(ApplicationId appId, + long traceStartTimeMS, long traceEndTimeMS, + long simulateStartTimeMS, long simulateEndTimeMS); } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 83482c3368..f0d8e6f507 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -17,32 +17,43 @@ */ package org.apache.hadoop.yarn.sls.appmaster; +import com.codahale.metrics.MetricRegistry; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; public class TestAMSimulator { private ResourceManager rm; private YarnConfiguration conf; + private Path metricOutputDir; @Before public void setup() { + createMetricOutputDir(); + conf = new YarnConfiguration(); + conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString()); conf.set(YarnConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper"); conf.set(SLSConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); - conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false); + conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true); rm = new ResourceManager(); rm.init(conf); rm.start(); @@ -64,14 +75,49 @@ public class TestAMSimulator { } } + private void verifySchedulerMetrics(String appId) { + SchedulerWrapper schedulerWrapper = (SchedulerWrapper) + rm.getResourceScheduler(); + MetricRegistry metricRegistry = schedulerWrapper.getMetrics(); + for (FairSchedulerMetrics.Metric metric : + FairSchedulerMetrics.Metric.values()) { + String key = "variable.app." + appId + "." + metric.getValue() + + ".memory"; + Assert.assertTrue(metricRegistry.getGauges().containsKey(key)); + Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue()); + } + } + + private void createMetricOutputDir() { + Path testDir = Paths.get(System.getProperty("test.build.data")); + try { + metricOutputDir = Files.createTempDirectory(testDir, "output"); + } catch (IOException e) { + Assert.fail(e.toString()); + } + } + + private void deleteMetricOutputDir() { + try { + FileUtils.deleteDirectory(metricOutputDir.toFile()); + } catch (IOException e) { + Assert.fail(e.toString()); + } + } + @Test public void testAMSimulator() throws Exception { // Register one app MockAMSimulator app = new MockAMSimulator(); - List containers = new ArrayList(); - app.init(1, 1000, containers, rm, null, 0, 1000000l, "user1", "default", - false, "app1"); + String appId = "app1"; + String queue = "default"; + List containers = new ArrayList<>(); + app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue, + true, appId); app.firstStep(); + + verifySchedulerMetrics(appId); + Assert.assertEquals(1, rm.getRMContext().getRMApps().size()); Assert.assertNotNull(rm.getRMContext().getRMApps().get(app.appId)); @@ -82,5 +128,7 @@ public class TestAMSimulator { @After public void tearDown() { rm.stop(); + + deleteMetricOutputDir(); } } \ No newline at end of file