From 08a77a765ba635da1cc44f36b103116605a517ee Mon Sep 17 00:00:00 2001 From: 9uapaw Date: Fri, 25 Mar 2022 18:48:56 +0100 Subject: [PATCH] YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth. --- .../org/apache/hadoop/yarn/sls/AMRunner.java | 297 ++++++++++++++++++ .../org/apache/hadoop/yarn/sls/SLSRunner.java | 250 +++------------ .../sls/resourcemanager/MockAMLauncher.java | 17 +- 3 files changed, 354 insertions(+), 210 deletions(-) create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java new file mode 100644 index 0000000000..da95c687ee --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.JobTraceReader; +import org.apache.hadoop.tools.rumen.LoggedJob; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.SLSRunner.TraceType; +import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; +import org.apache.hadoop.yarn.util.UTCClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +public class AMRunner { + private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class); + static int REMAINING_APPS = 0; + + private final Configuration conf; + private int AM_ID; + private Map amMap; + private Map appIdAMSim; + private Set trackedApps; + private Map amClassMap; + private TraceType inputType; + private String[] inputTraces; + private SynthTraceJobProducer stjp; + private TaskRunner runner; + private SLSRunner slsRunner; + private int numAMs, numTasks; + private long maxRuntime; + private ResourceManager rm; + + public AMRunner(TaskRunner runner, SLSRunner slsRunner) { + this.runner = runner; + this.slsRunner = slsRunner; + this.conf = slsRunner.getConf(); + } + + + public void init(Configuration conf) throws ClassNotFoundException { + amMap = new ConcurrentHashMap<>(); + amClassMap = new HashMap<>(); + appIdAMSim = new ConcurrentHashMap<>(); + // map + for (Map.Entry e : conf) { + String key = e.getKey().toString(); + if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) { + String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length()); + amClassMap.put(amType, Class.forName(conf.get(key))); + } + } + } + + public void startAM() throws YarnException, IOException { + switch (inputType) { + case SLS: + for (String inputTrace : inputTraces) { + startAMFromSLSTrace(inputTrace); + } + break; + case RUMEN: + long baselineTimeMS = 0; + for (String inputTrace : inputTraces) { + startAMFromRumenTrace(inputTrace, baselineTimeMS); + } + break; + case SYNTH: + startAMFromSynthGenerator(); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); + } + + numAMs = amMap.size(); + REMAINING_APPS = numAMs; + } + + /** + * Parse workload from a SLS trace file. + */ + private void startAMFromSLSTrace(String inputTrace) throws IOException { + JsonFactory jsonF = new JsonFactory(); + ObjectMapper mapper = new ObjectMapper(); + + try (Reader input = new InputStreamReader( + new FileInputStream(inputTrace), StandardCharsets.UTF_8)) { + JavaType type = mapper.getTypeFactory(). + constructMapType(Map.class, String.class, String.class); + Iterator> jobIter = mapper.readValues( + jsonF.createParser(input), type); + + while (jobIter.hasNext()) { + try { + Map jsonJob = jobIter.next(); + AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, slsRunner); + startAMs(amDef); + } catch (Exception e) { + LOG.error("Failed to create an AM: {}", e.getMessage()); + } + } + } + } + + /** + * parse workload information from synth-generator trace files. + */ + private void startAMFromSynthGenerator() throws YarnException, IOException { + Configuration localConf = new Configuration(); + localConf.set("fs.defaultFS", "file:///"); + // if we use the nodeFile this could have been not initialized yet. + if (stjp == null) { + stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0])); + } + + SynthJob job; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + ReservationId reservationId = null; + if (job.hasDeadline()) { + reservationId = ReservationId + .newInstance(rm.getStartTime(), AM_ID); + } + AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, slsRunner); + startAMs(amDef, reservationId, job.getParams(), job.getDeadline()); + } + } + + /** + * Parse workload from a rumen trace file. + */ + private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) + throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + File fin = new File(inputTrace); + + try (JobTraceReader reader = new JobTraceReader( + new Path(fin.getAbsolutePath()), conf)) { + LoggedJob job = reader.getNext(); + + while (job != null) { + try { + AMDefinitionRumen amDef = + AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS, + slsRunner); + startAMs(amDef); + } catch (Exception e) { + LOG.error("Failed to create an AM", e); + } + job = reader.getNext(); + } + } + } + + private void startAMs(AMDefinition amDef) { + for (int i = 0; i < amDef.getJobCount(); i++) { + JobDefinition jobDef = JobDefinition.Builder.create() + .withAmDefinition(amDef) + .withDeadline(-1) + .withReservationId(null) + .withParams(null) + .build(); + runNewAM(jobDef); + } + } + + private void startAMs(AMDefinition amDef, + ReservationId reservationId, + Map params, long deadline) { + for (int i = 0; i < amDef.getJobCount(); i++) { + JobDefinition jobDef = JobDefinition.Builder.create() + .withAmDefinition(amDef) + .withReservationId(reservationId) + .withParams(params) + .withDeadline(deadline) + .build(); + runNewAM(jobDef); + } + } + + private void runNewAM(JobDefinition jobDef) { + AMDefinition amDef = jobDef.getAmDefinition(); + String oldJobId = amDef.getOldAppId(); + AMSimulator amSim = + createAmSimulator(amDef.getAmType()); + + if (amSim != null) { + int heartbeatInterval = conf.getInt( + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); + boolean isTracked = trackedApps.contains(oldJobId); + + if (oldJobId == null) { + oldJobId = Integer.toString(AM_ID); + } + AM_ID++; + amSim.init(amDef, rm, slsRunner, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim); + if (jobDef.getReservationId() != null) { + // if we have a ReservationId, delegate reservation creation to + // AMSim (reservation shape is impl specific) + UTCClock clock = new UTCClock(); + amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(), clock.getTime()); + } + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime()); + numTasks += amDef.getTaskContainers().size(); + amMap.put(oldJobId, amSim); + } + } + + private AMSimulator createAmSimulator(String jobType) { + return (AMSimulator) ReflectionUtils.newInstance( + amClassMap.get(jobType), new Configuration()); + } + + public AMSimulator getAMSimulator(ApplicationId appId) { + return appIdAMSim.get(appId); + } + + public void setInputType(TraceType inputType) { + this.inputType = inputType; + } + + public void setInputTraces(String[] inputTraces) { + this.inputTraces = inputTraces; + } + + public void setResourceManager(ResourceManager rm) { + this.rm = rm; + } + + public Set getTrackedApps() { + return trackedApps; + } + + public void setTrackedApps(Set trackApps) { + this.trackedApps = trackApps; + } + + public int getNumAMs() { + return numAMs; + } + + public int getNumTasks() { + return numTasks; + } + + public long getMaxRuntime() { + return maxRuntime; + } + + public Map getAmMap() { + return amMap; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 260a600137..48ad610310 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -58,16 +58,12 @@ import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.TableMapping; -import org.apache.hadoop.tools.rumen.JobTraceReader; -import org.apache.hadoop.tools.rumen.LoggedJob; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -84,19 +80,32 @@ import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; -import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; -import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; -import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.security.Security; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + @Private @Unstable public class SLSRunner extends Configured implements Tool { @@ -112,21 +121,12 @@ public class SLSRunner extends Configured implements Tool { private Resource nodeManagerResource; private String nodeFile; - // AM simulator - private int AM_ID; - private Map amMap; - private Map appIdAMSim; - private Set trackedApps; - private Map amClassMap; - private static int remainingApps = 0; - // metrics private String metricsOutputDir; private boolean printSimulation; // other simulation information - private int numNMs, numRacks, numAMs, numTasks; - private long maxRuntime; + private int numNMs, numRacks; private String tableMapping; private final static Map simulateInfoMap = new HashMap<>(); @@ -135,6 +135,7 @@ public class SLSRunner extends Configured implements Tool { public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class); private static boolean exitAtTheFinish = false; + private AMRunner amRunner; /** * The type of trace in input. @@ -151,7 +152,7 @@ public enum TraceType { private SynthTraceJobProducer stjp; public static int getRemainingApps() { - return remainingApps; + return AMRunner.REMAINING_APPS; } public SLSRunner() throws ClassNotFoundException { @@ -176,9 +177,7 @@ public void setConf(Configuration conf) { private void init(Configuration tempConf) throws ClassNotFoundException { nmMap = new ConcurrentHashMap<>(); queueAppNumMap = new HashMap<>(); - amMap = new ConcurrentHashMap<>(); - amClassMap = new HashMap<>(); - appIdAMSim = new ConcurrentHashMap<>(); + amRunner = new AMRunner(runner, this); // runner configuration setConf(tempConf); @@ -186,15 +185,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException { poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); - // map - for (Map.Entry e : tempConf) { - String key = e.getKey().toString(); - if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) { - String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length()); - amClassMap.put(amType, Class.forName(tempConf.get(key))); - } - } + amRunner.init(tempConf); nodeManagerResource = getNodeManagerResource(); } @@ -227,14 +219,25 @@ public static Map getSimulateInfoMap() { return Collections.unmodifiableMap(simulateInfoMap); } + /** + * This is invoked before start. + * @param inType + * @param inTraces + * @param nodes + * @param outDir + * @param trackApps + * @param printsimulation + */ public void setSimulationParams(TraceType inType, String[] inTraces, String nodes, String outDir, Set trackApps, boolean printsimulation) { this.inputType = inType; this.inputTraces = inTraces.clone(); + this.amRunner.setInputType(this.inputType); + this.amRunner.setInputTraces(this.inputTraces); + this.amRunner.setTrackedApps(trackApps); this.nodeFile = nodes; - this.trackedApps = trackApps; this.printSimulation = printsimulation; metricsOutputDir = outDir; tableMapping = outDir + "/tableMapping.csv"; @@ -247,15 +250,16 @@ public void start() throws IOException, ClassNotFoundException, YarnException, // start resource manager startRM(); + amRunner.setResourceManager(rm); // start node managers startNM(); // start application masters - startAM(); + amRunner.startAM(); // set queue & tracked apps information ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() .setQueueSet(this.queueAppNumMap.keySet()); ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setTrackedAppSet(this.trackedApps); + .setTrackedAppSet(amRunner.getTrackedApps()); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -310,7 +314,7 @@ private void startRM() throws ClassNotFoundException, YarnException { rm = new ResourceManager() { @Override protected ApplicationMasterLauncher createAMLauncher() { - return new MockAMLauncher(se, this.rmContext, appIdAMSim); + return new MockAMLauncher(se, this.rmContext); } }; @@ -422,109 +426,6 @@ private void waitForNodesRunning() throws InterruptedException { System.currentTimeMillis() - startTimeMS); } - private void startAM() throws YarnException, IOException { - switch (inputType) { - case SLS: - for (String inputTrace : inputTraces) { - startAMFromSLSTrace(inputTrace); - } - break; - case RUMEN: - long baselineTimeMS = 0; - for (String inputTrace : inputTraces) { - startAMFromRumenTrace(inputTrace, baselineTimeMS); - } - break; - case SYNTH: - startAMFromSynthGenerator(); - break; - default: - throw new YarnException("Input configuration not recognized, " - + "trace type should be SLS, RUMEN, or SYNTH"); - } - - numAMs = amMap.size(); - remainingApps = numAMs; - } - - /** - * Parse workload from a SLS trace file. - */ - private void startAMFromSLSTrace(String inputTrace) throws IOException { - JsonFactory jsonF = new JsonFactory(); - ObjectMapper mapper = new ObjectMapper(); - - try (Reader input = new InputStreamReader( - new FileInputStream(inputTrace), StandardCharsets.UTF_8)) { - JavaType type = mapper.getTypeFactory(). - constructMapType(Map.class, String.class, String.class); - Iterator> jobIter = mapper.readValues( - jsonF.createParser(input), type); - - while (jobIter.hasNext()) { - try { - Map jsonJob = jobIter.next(); - AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this); - startAMs(amDef); - } catch (Exception e) { - LOG.error("Failed to create an AM: {}", e.getMessage()); - } - } - } - } - - private void startAMs(AMDefinition amDef) { - for (int i = 0; i < amDef.getJobCount(); i++) { - JobDefinition jobDef = JobDefinition.Builder.create() - .withAmDefinition(amDef) - .withDeadline(-1) - .withReservationId(null) - .withParams(null) - .build(); - runNewAM(jobDef); - } - } - - private void startAMs(AMDefinition amDef, ReservationId reservationId, - Map params, long deadline) { - for (int i = 0; i < amDef.getJobCount(); i++) { - JobDefinition jobDef = JobDefinition.Builder.create() - .withAmDefinition(amDef) - .withReservationId(reservationId) - .withParams(params) - .withDeadline(deadline) - .build(); - runNewAM(jobDef); - } - } - - /** - * Parse workload from a rumen trace file. - */ - private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) - throws IOException { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", "file:///"); - File fin = new File(inputTrace); - - try (JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf)) { - LoggedJob job = reader.getNext(); - - while (job != null) { - try { - AMDefinitionRumen amDef = - AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS, - this); - startAMs(amDef); - } catch (Exception e) { - LOG.error("Failed to create an AM", e); - } - job = reader.getNext(); - } - } - } - Resource getDefaultContainerResource() { int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); @@ -533,31 +434,6 @@ Resource getDefaultContainerResource() { return Resources.createResource(containerMemory, containerVCores); } - /** - * parse workload information from synth-generator trace files. - */ - private void startAMFromSynthGenerator() throws YarnException, IOException { - Configuration localConf = new Configuration(); - localConf.set("fs.defaultFS", "file:///"); - // if we use the nodeFile this could have been not initialized yet. - if (stjp == null) { - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - } - - SynthJob job; - // we use stjp, a reference to the job producer instantiated during node - // creation - while ((job = (SynthJob) stjp.getNextJob()) != null) { - ReservationId reservationId = null; - if (job.hasDeadline()) { - reservationId = ReservationId - .newInstance(rm.getStartTime(), AM_ID); - } - AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this); - startAMs(amDef, reservationId, job.getParams(), job.getDeadline()); - } - } - void increaseQueueAppNum(String queue) throws YarnException { SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); String queueName = wrapper.getRealQueueName(queue); @@ -575,43 +451,12 @@ void increaseQueueAppNum(String queue) throws YarnException { } } - private AMSimulator createAmSimulator(String jobType) { - return (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), new Configuration()); - } - - private void runNewAM(JobDefinition jobDef) { - AMDefinition amDef = jobDef.getAmDefinition(); - String oldJobId = amDef.getOldAppId(); - AMSimulator amSim = - createAmSimulator(amDef.getAmType()); - - if (amSim != null) { - int heartbeatInterval = getConf().getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - boolean isTracked = trackedApps.contains(oldJobId); - - if (oldJobId == null) { - oldJobId = Integer.toString(AM_ID); - } - AM_ID++; - amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim); - if (jobDef.getReservationId() != null) { - // if we have a ReservationId, delegate reservation creation to - // AMSim (reservation shape is impl specific) - UTCClock clock = new UTCClock(); - amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(), - clock.getTime()); - } - runner.schedule(amSim); - maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime()); - numTasks += amDef.getTaskContainers().size(); - amMap.put(oldJobId, amSim); - } - } - private void printSimulationInfo() { + final int numAMs = amRunner.getNumAMs(); + final int numTasks = amRunner.getNumTasks(); + final long maxRuntime = amRunner.getMaxRuntime(); + Map amMap = amRunner.getAmMap(); + if (printSimulation) { // node LOG.info("------------------------------------"); @@ -663,7 +508,10 @@ public Map getNmMap() { } public static void decreaseRemainingApps() { - remainingApps--; + AMRunner.REMAINING_APPS--; + if (AMRunner.REMAINING_APPS == 0) { + exitSLSRunner(); + } } public static void exitSLSRunner() { @@ -854,4 +702,8 @@ public ResourceManager getRm() { public SynthTraceJobProducer getStjp() { return stjp; } + + public AMSimulator getAMSimulatorByAppId(ApplicationId appId) { + return amRunner.getAMSimulator(appId); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index d28407669c..e46dea521c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -44,15 +44,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher private static final Logger LOG = LoggerFactory.getLogger( MockAMLauncher.class); - private Map appIdAMSim; + private SLSRunner slsRunner; - SLSRunner se; - - public MockAMLauncher(SLSRunner se, RMContext rmContext, - Map appIdAMSim) { + public MockAMLauncher(SLSRunner slsRunner, RMContext rmContext) { super(rmContext); - this.appIdAMSim = appIdAMSim; - this.se = se; + this.slsRunner = slsRunner; } @Override @@ -79,12 +75,11 @@ private void setupAMRMToken(RMAppAttempt appAttempt) { } @Override - @SuppressWarnings("unchecked") public void handle(AMLauncherEvent event) { ApplicationId appId = event.getAppAttempt().getAppAttemptId().getApplicationId(); // find AMSimulator - AMSimulator ams = appIdAMSim.get(appId); + AMSimulator ams = slsRunner.getAMSimulatorByAppId(appId); if (ams == null) { throw new YarnRuntimeException( "Didn't find any AMSimulator for applicationId=" + appId); @@ -103,7 +98,7 @@ public void handle(AMLauncherEvent event) { event.getAppAttempt().getMasterContainer()); LOG.info("Notify AM launcher launched:" + amContainer.getId()); - se.getNmMap().get(amContainer.getNodeId()) + slsRunner.getNmMap().get(amContainer.getNodeId()) .addNewContainer(amContainer, -1, appId); ams.getRanNodes().add(amContainer.getNodeId()); return; @@ -111,7 +106,7 @@ public void handle(AMLauncherEvent event) { throw new YarnRuntimeException(e); } case CLEANUP: - se.getNmMap().get(amContainer.getNodeId()) + slsRunner.getNmMap().get(amContainer.getNodeId()) .cleanupContainer(amContainer.getId()); break; default: