From 475f933b41276b1bdeeec09e30369120f7eccdb8 Mon Sep 17 00:00:00 2001
From: Robert Kanter
Date: Tue, 25 Apr 2017 16:25:56 -0700
Subject: [PATCH] YARN-6423. Queue metrics doesn't work for Fair Scheduler in SLS (yufeigu via rkanter)

---
 .../org/apache/hadoop/yarn/sls/SLSRunner.java | 404 +++++++++---------
 .../sls/scheduler/SLSCapacityScheduler.java   |   9 +
 .../yarn/sls/scheduler/SLSFairScheduler.java  |   9 +
 .../yarn/sls/scheduler/SchedulerWrapper.java  |   4 +
 .../hadoop/yarn/sls/utils/SLSUtils.java       |   2 +-
 5 files changed, 226 insertions(+), 202 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 523d22a90e..9d35d1b228 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -69,12 +69,15 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.*;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@@ -197,10 +200,6 @@ private void startRM() throws ClassNotFoundException, YarnException {
     Configuration rmConf = new YarnConfiguration(getConf());
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
 
-    // For CapacityScheduler we use a sub-classing instead of wrapping
-    // to allow scheduler-specific invocations from monitors to work
-    // this can be used for other schedulers as well if we care to
-    // exercise/track behaviors that are not common to the scheduler api
     if (Class.forName(schedulerClass) == CapacityScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSCapacityScheduler.class.getName());
@@ -300,226 +299,207 @@ private void waitForNodesRunning() throws InterruptedException {
 
   @SuppressWarnings("unchecked")
   private void startAM() throws YarnException, IOException {
-    // application/container configuration
-    int heartbeatInterval =
-        getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
-            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    int containerMemoryMB =
-        getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
-            SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
-    int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
-        SLSConfiguration.CONTAINER_VCORES_DEFAULT);
-    Resource containerResource =
-        BuilderUtils.newResource(containerMemoryMB, containerVCores);
-
-    // application workload
     switch (inputType) {
     case SLS:
-      startAMFromSLSTraces(containerResource, heartbeatInterval);
+      for (String inputTrace : inputTraces) {
+        startAMFromSLSTrace(inputTrace);
+      }
       break;
     case RUMEN:
-      startAMFromRumenTraces(containerResource, heartbeatInterval);
+      long baselineTimeMS = 0;
+      for (String inputTrace : inputTraces) {
+        startAMFromRumenTrace(inputTrace, baselineTimeMS);
+      }
       break;
     case SYNTH:
-      startAMFromSynthGenerator(heartbeatInterval);
+      startAMFromSynthGenerator();
       break;
     default:
       throw new YarnException("Input configuration not recognized, "
          + "trace type should be SLS, RUMEN, or SYNTH");
     }
+
     numAMs = amMap.size();
     remainingApps = numAMs;
   }
 
   /**
-   * parse workload information from sls trace files
+   * Parse workload from a SLS trace file.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromSLSTraces(Resource containerResource,
-      int heartbeatInterval) throws IOException {
-    // parse from sls traces
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
-    for (String inputTrace : inputTraces) {
-      Reader input =
-          new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
-      try {
-        Iterator i =
-            mapper.readValues(jsonF.createParser(input), Map.class);
-        while (i.hasNext()) {
-          Map jsonJob = i.next();
-          // load job information
-          long jobStartTime =
-              Long.parseLong(jsonJob.get("job.start.ms").toString());
-          long jobFinishTime =
-              Long.parseLong(jsonJob.get("job.end.ms").toString());
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), "UTF-8")) {
+      Iterator jobIter = mapper.readValues(
+          jsonF.createParser(input), Map.class);
 
-          String user = (String) jsonJob.get("job.user");
-          if (user == null) {
-            user = "default";
-          }
-          String queue = jsonJob.get("job.queue.name").toString();
-
-          String oldAppId = jsonJob.get("job.id").toString();
-          boolean isTracked = trackedApps.contains(oldAppId);
-          int queueSize =
-              queueAppNumMap.containsKey(queue) ? queueAppNumMap.get(queue) : 0;
-          queueSize++;
-          queueAppNumMap.put(queue, queueSize);
-          // tasks
-          List tasks = (List) jsonJob.get("job.tasks");
-          if (tasks == null || tasks.size() == 0) {
-            continue;
-          }
-          List containerList =
-              new ArrayList();
-          for (Object o : tasks) {
-            Map jsonTask = (Map) o;
-            String hostname = jsonTask.get("container.host").toString();
-            long taskStart =
-                Long.parseLong(jsonTask.get("container.start.ms").toString());
-            long taskFinish =
-                Long.parseLong(jsonTask.get("container.end.ms").toString());
-            long lifeTime = taskFinish - taskStart;
-
-            // Set memory and vcores from job trace file
-            Resource res = Resources.clone(containerResource);
-            if (jsonTask.containsKey("container.memory")) {
-              int containerMemory =
-                  Integer.parseInt(jsonTask.get("container.memory").toString());
-              res.setMemorySize(containerMemory);
-            }
-
-            if (jsonTask.containsKey("container.vcores")) {
-              int containerVCores =
-                  Integer.parseInt(jsonTask.get("container.vcores").toString());
-              res.setVirtualCores(containerVCores);
-            }
-
-            int priority =
-                Integer.parseInt(jsonTask.get("container.priority").toString());
-            String type = jsonTask.get("container.type").toString();
-            containerList.add(new ContainerSimulator(res, lifeTime, hostname,
-                priority, type));
-          }
-
-          // create a new AM
-          String amType = jsonJob.get("am.type").toString();
-          AMSimulator amSim = (AMSimulator) ReflectionUtils
-              .newInstance(amClassMap.get(amType), new Configuration());
-          if (amSim != null) {
-            amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
-                jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId,
-                null, runner.getStartTimeMS());
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTime);
-            numTasks += containerList.size();
-            amMap.put(oldAppId, amSim);
-          }
+      while (jobIter.hasNext()) {
+        try {
+          createAMForJob(jobIter.next());
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: " + e.getMessage());
         }
-      } finally {
-        input.close();
       }
     }
   }
 
+  private void createAMForJob(Map jsonJob) throws YarnException {
+    long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
+    long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+
+    String user = (String) jsonJob.get("job.user");
+    if (user == null) {
+      user = "default";
+    }
+
+    String queue = jsonJob.get("job.queue.name").toString();
+    increaseQueueAppNum(queue);
+
+    String oldAppId = jsonJob.get("job.id").toString();
+
+    // tasks
+    List tasks = (List) jsonJob.get("job.tasks");
+    if (tasks == null || tasks.size() == 0) {
+      throw new YarnException("No task for the job!");
+    }
+
+    List containerList = new ArrayList<>();
+    for (Object o : tasks) {
+      Map jsonTask = (Map) o;
+      String hostname = jsonTask.get("container.host").toString();
+      long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+          .toString());
+      long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+          .toString());
+      long lifeTime = taskFinish - taskStart;
+
+      // Set memory and vcores from job trace file
+      Resource res = getDefaultContainerResource();
+      if (jsonTask.containsKey("container.memory")) {
+        int containerMemory =
+            Integer.parseInt(jsonTask.get("container.memory").toString());
+        res.setMemorySize(containerMemory);
+      }
+
+      if (jsonTask.containsKey("container.vcores")) {
+        int containerVCores =
+            Integer.parseInt(jsonTask.get("container.vcores").toString());
+        res.setVirtualCores(containerVCores);
+      }
+
+      int priority = Integer.parseInt(jsonTask.get("container.priority")
+          .toString());
+      String type = jsonTask.get("container.type").toString();
+      containerList.add(
+          new ContainerSimulator(res, lifeTime, hostname, priority, type));
+    }
+
+    // create a new AM
+    String amType = jsonJob.get("am.type").toString();
+    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+        containerList, null);
+  }
+
   /**
-   * parse workload information from rumen trace files
+   * Parse workload from a rumen trace file.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromRumenTraces(Resource containerResource,
-      int heartbeatInterval) throws IOException {
+  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+      throws IOException {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "file:///");
-    long baselineTimeMS = 0;
-    for (String inputTrace : inputTraces) {
-      File fin = new File(inputTrace);
-      JobTraceReader reader =
-          new JobTraceReader(new Path(fin.getAbsolutePath()), conf);
-      try {
-        LoggedJob job = null;
-        while ((job = reader.getNext()) != null) {
-          // only support MapReduce currently
-          String jobType = "mapreduce";
-          String user =
-              job.getUser() == null ? "default" : job.getUser().getValue();
-          String jobQueue = job.getQueue().getValue();
-          String oldJobId = job.getJobID().toString();
-          long jobStartTimeMS = job.getSubmitTime();
-          long jobFinishTimeMS = job.getFinishTime();
-          if (baselineTimeMS == 0) {
-            baselineTimeMS = jobStartTimeMS;
-          }
-          jobStartTimeMS -= baselineTimeMS;
-          jobFinishTimeMS -= baselineTimeMS;
-          if (jobStartTimeMS < 0) {
-            LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
-            jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-            jobStartTimeMS = 0;
-          }
+    File fin = new File(inputTrace);
 
-          boolean isTracked = trackedApps.contains(oldJobId);
-          int queueSize = queueAppNumMap.containsKey(jobQueue)
-              ? queueAppNumMap.get(jobQueue) : 0;
-          queueSize++;
-          queueAppNumMap.put(jobQueue, queueSize);
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
 
-          List containerList =
-              new ArrayList();
-          // map tasks
-          for (LoggedTask mapTask : job.getMapTasks()) {
-            if (mapTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt =
-                mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime =
-                taskAttempt.getFinishTime() - taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                containerLifeTime, hostname, 10, "map"));
-          }
-
-          // reduce tasks
-          for (LoggedTask reduceTask : job.getReduceTasks()) {
-            if (reduceTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
-                .get(reduceTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime =
-                taskAttempt.getFinishTime() - taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                containerLifeTime, hostname, 20, "reduce"));
-          }
-
-          // create a new AM
-          AMSimulator amSim = (AMSimulator) ReflectionUtils
-              .newInstance(amClassMap.get(jobType), conf);
-          if (amSim != null) {
-            amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
-                jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
-                oldJobId, null, runner.getStartTimeMS());
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
-            numTasks += containerList.size();
-            amMap.put(oldJobId, amSim);
-          }
+      while (job != null) {
+        try {
+          createAMForJob(job, baselineTimeMS);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: " + e.getMessage());
         }
-      } finally {
-        reader.close();
+
+        job = reader.getNext();
       }
     }
   }
 
+  private void createAMForJob(LoggedJob job, long baselineTimeMs)
+      throws YarnException {
+    String user = job.getUser() == null ? "default" :
"default" : + job.getUser().getValue(); + String jobQueue = job.getQueue().getValue(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmitTime(); + long jobFinishTimeMS = job.getFinishTime(); + if (baselineTimeMs == 0) { + baselineTimeMs = job.getSubmitTime(); + } + jobStartTimeMS -= baselineTimeMs; + jobFinishTimeMS -= baselineTimeMs; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job " + oldJobId + " start time to 0."); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + increaseQueueAppNum(jobQueue); + + List containerList = new ArrayList<>(); + // mapper + for (LoggedTask mapTask : job.getMapTasks()) { + if (mapTask.getAttempts().size() == 0) { + throw new YarnException("Invalid map task, no attempt for a mapper!"); + } + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, hostname, 10, "map")); + } + + // reducer + for (LoggedTask reduceTask : job.getReduceTasks()) { + if (reduceTask.getAttempts().size() == 0) { + throw new YarnException( + "Invalid reduce task, no attempt for a reducer!"); + } + LoggedTaskAttempt taskAttempt = + reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, hostname, 20, "reduce")); + } + + // Only supports the default job type currently + runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, + jobStartTimeMS, jobFinishTimeMS, containerList, null); + } + + private Resource getDefaultContainerResource() { + int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, + SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); + int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, + SLSConfiguration.CONTAINER_VCORES_DEFAULT); + return Resources.createResource(containerMemory, containerVCores); + } + /** * parse workload information from synth-generator trace files. */ @SuppressWarnings("unchecked") - private void startAMFromSynthGenerator(int heartbeatInterval) - throws IOException { + private void startAMFromSynthGenerator() throws YarnException, IOException { Configuration localConf = new Configuration(); localConf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; @@ -540,7 +520,6 @@ private void startAMFromSynthGenerator(int heartbeatInterval) // creation while ((job = (SynthJob) stjp.getNextJob()) != null) { // only support MapReduce currently - String jobType = "mapreduce"; String user = job.getUser(); String jobQueue = job.getQueueName(); String oldJobId = job.getJobID().toString(); @@ -560,11 +539,7 @@ private void startAMFromSynthGenerator(int heartbeatInterval) jobStartTimeMS = 0; } - boolean isTracked = trackedApps.contains(oldJobId); - int queueSize = queueAppNumMap.containsKey(jobQueue) - ? 
-        queueSize++;
-        queueAppNumMap.put(jobQueue, queueSize);
+        increaseQueueAppNum(jobQueue);
 
         List containerList =
             new ArrayList();
@@ -625,18 +600,9 @@ private void startAMFromSynthGenerator(int heartbeatInterval)
               job.getQueueName());
         }
 
-        // create a new AM
-        AMSimulator amSim = (AMSimulator) ReflectionUtils
-            .newInstance(amClassMap.get(jobType), localConf);
-        if (amSim != null) {
-          amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
-              jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
-              oldJobId, rr, runner.getStartTimeMS());
-          runner.schedule(amSim);
-          maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
-          numTasks += containerList.size();
-          amMap.put(oldJobId, amSim);
-        }
+
+        runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
+            jobStartTimeMS, jobFinishTimeMS, containerList, rr);
       }
     } finally {
       stjp.close();
@@ -644,6 +610,42 @@
 
   }
 
+  private void increaseQueueAppNum(String queue) throws YarnException {
+    SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
+    String queueName = wrapper.getRealQueueName(queue);
+    Integer appNum = queueAppNumMap.get(queueName);
+    if (appNum == null) {
+      appNum = 1;
+    } else {
+      appNum++;
+    }
+
+    queueAppNumMap.put(queueName, appNum);
+  }
+
+  private void runNewAM(String jobType, String user,
+      String jobQueue, String oldJobId, long jobStartTimeMS,
+      long jobFinishTimeMS, List containerList,
+      ReservationSubmissionRequest rr) {
+
+    AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
+        amClassMap.get(jobType), new Configuration());
+
+    if (amSim != null) {
+      int heartbeatInterval = getConf().getInt(
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      boolean isTracked = trackedApps.contains(oldJobId);
+      amSim.init(AM_ID++, heartbeatInterval, containerList,
+          rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
+          isTracked, oldJobId, rr, runner.getStartTimeMS());
+      runner.schedule(amSim);
+      maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
+      numTasks += containerList.size();
+      amMap.put(oldJobId, amSim);
+    }
+  }
+
   private void printSimulationInfo() {
     if (printSimulation) {
       // node
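The SLSRunner changes above funnel all queue bookkeeping through the new increaseQueueAppNum(), which asks the wrapped scheduler for its real queue name instead of keying queueAppNumMap by the raw string from the trace. This appears to matter most for the Fair Scheduler, whose queues are registered under fully qualified names (for example "root.sls_queue_1"), so per-queue metrics keyed by the short trace name would not line up. A minimal, self-contained sketch of that normalization idea follows; the resolver map and the queue names are illustrative stand-ins, not part of the patch:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the HashMap below stands in for
// SchedulerWrapper#getRealQueueName, which in the patch delegates to
// CapacityScheduler#getQueue or FairScheduler's QueueManager.
public class QueueAppCounter {

  private final Map<String, String> realNames = new HashMap<>();
  private final Map<String, Integer> queueAppNumMap = new HashMap<>();

  public QueueAppCounter() {
    // Hypothetical queue layout: the scheduler knows queues by canonical
    // names, while traces may refer to them by short names.
    realNames.put("sls_queue_1", "root.sls_queue_1");
    realNames.put("root.sls_queue_1", "root.sls_queue_1");
  }

  // Mirrors SLSRunner#increaseQueueAppNum: resolve the trace's queue string
  // to the scheduler's real queue name before counting.
  public void increaseQueueAppNum(String queue) {
    String queueName = realNames.get(queue);
    if (queueName == null) {
      throw new IllegalArgumentException("Can't find the queue: " + queue);
    }
    Integer appNum = queueAppNumMap.get(queueName);
    queueAppNumMap.put(queueName, appNum == null ? 1 : appNum + 1);
  }

  public static void main(String[] args) {
    QueueAppCounter counter = new QueueAppCounter();
    counter.increaseQueueAppNum("sls_queue_1");       // short name from a trace
    counter.increaseQueueAppNum("root.sls_queue_1");  // full name from a trace
    // Both submissions land under the same canonical key.
    System.out.println(counter.queueAppNumMap);       // {root.sls_queue_1=2}
  }
}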
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 56190df432..108c2bc8b1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -360,4 +361,12 @@ public SchedulerMetrics getSchedulerMetrics() {
   public Configuration getConf() {
     return conf;
   }
+
+  public String getRealQueueName(String queue) throws YarnException {
+    if (getQueue(queue) == null) {
+      throw new YarnException("Can't find the queue by the given name: " + queue +
+          "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueue(queue).getQueueName();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index f740f5a170..81f66488cd 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -333,5 +334,13 @@ public void serviceStop() throws Exception {
     }
     super.serviceStop();
   }
+
+  public String getRealQueueName(String queue) throws YarnException {
+    if (!getQueueManager().exists(queue)) {
+      throw new YarnException("Can't find the queue by the given name: " + queue +
+          "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueueManager().getQueue(queue).getQueueName();
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
index 406f0508ec..7112b1a6fd 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
@@ -19,10 +19,14 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 @Private
 @Unstable
 public interface SchedulerWrapper {
 
   SchedulerMetrics getSchedulerMetrics();
+
   Tracker getTracker();
+
+  String getRealQueueName(String queue) throws YarnException;
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index eaa59ddfd0..e27b36ffdc 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -43,7 +43,7 @@
 @Private
 @Unstable
 public class SLSUtils {
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  public final static String DEFAULT_JOB_TYPE = "mapreduce";
 
   // hostname includes the network path and the host name. for example
   // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
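For context on the trace format that startAMFromSLSTrace()/createAMForJob() consume: each job is read as a plain Map via Jackson's readValues(), and the code then pulls the job.* and container.* keys shown in the diff above. A small standalone sketch of that parsing loop follows; the inline one-job trace and its values are made up for illustration, and the imports assume the Jackson 2 (com.fasterxml) artifacts rather than whichever Jackson version a given Hadoop branch actually bundles:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.Reader;
import java.io.StringReader;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SlsTraceParseSketch {
  public static void main(String[] args) throws Exception {
    // One SLS job record with the keys createAMForJob() reads; all values
    // here are hypothetical.
    String trace = "{\"am.type\": \"mapreduce\", \"job.start.ms\": 0,"
        + " \"job.end.ms\": 95375, \"job.queue.name\": \"sls_queue_1\","
        + " \"job.id\": \"job_1\", \"job.user\": \"default\","
        + " \"job.tasks\": [{\"container.host\": \"/default-rack/node1\","
        + " \"container.start.ms\": 6664, \"container.end.ms\": 23707,"
        + " \"container.priority\": 20, \"container.type\": \"map\"}]}";

    JsonFactory jsonF = new JsonFactory();
    ObjectMapper mapper = new ObjectMapper();
    try (Reader input = new StringReader(trace)) {
      // Same pattern as startAMFromSLSTrace(): stream root-level values as Maps.
      Iterator<Map> jobIter =
          mapper.readValues(jsonF.createParser(input), Map.class);
      while (jobIter.hasNext()) {
        Map jsonJob = jobIter.next();
        String queue = jsonJob.get("job.queue.name").toString();
        List tasks = (List) jsonJob.get("job.tasks");
        for (Object o : tasks) {
          Map jsonTask = (Map) o;
          // Container lifetime is end - start, as in createAMForJob().
          long lifeTime =
              Long.parseLong(jsonTask.get("container.end.ms").toString())
                  - Long.parseLong(jsonTask.get("container.start.ms").toString());
          System.out.println(queue + " task on "
              + jsonTask.get("container.host") + " runs " + lifeTime + " ms");
        }
      }
    }
  }
}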