From 99880d0a16727c770da053464da87960c5b02065 Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Fri, 27 Oct 2017 14:41:37 -0700 Subject: [PATCH] YARN-6505. Define the strings used in SLS JSON input file format. (Contributed by Gergely Novak) --- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 85 ++++++++++--------- .../yarn/sls/conf/SLSConfiguration.java | 34 +++++++- .../hadoop/yarn/sls/utils/SLSUtils.java | 16 ++-- .../site/markdown/SchedulerLoadSimulator.md | 2 +- 4 files changed, 90 insertions(+), 47 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index dfdf7c95fc..9d6c3aa754 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -173,8 +173,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException { // map for (Map.Entry e : tempConf) { String key = e.getKey().toString(); - if (key.startsWith(SLSConfiguration.AM_TYPE)) { - String amType = key.substring(SLSConfiguration.AM_TYPE.length()); + if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) { + String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length()); amClassMap.put(amType, Class.forName(tempConf.get(key))); } } @@ -384,33 +384,36 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException { } private void createAMForJob(Map jsonJob) throws YarnException { - long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString()); + long jobStartTime = Long.parseLong( + jsonJob.get(SLSConfiguration.JOB_START_MS).toString()); long jobFinishTime = 0; - if (jsonJob.containsKey("job.end.ms")) { - jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString()); + if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) { + jobFinishTime = Long.parseLong( + jsonJob.get(SLSConfiguration.JOB_END_MS).toString()); } - String user = (String) jsonJob.get("job.user"); + String user = (String) jsonJob.get(SLSConfiguration.JOB_USER); if (user == null) { user = "default"; } - String queue = jsonJob.get("job.queue.name").toString(); + String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString(); increaseQueueAppNum(queue); - String amType = (String)jsonJob.get("am.type"); + String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE); if (amType == null) { amType = SLSUtils.DEFAULT_JOB_TYPE; } int jobCount = 1; - if (jsonJob.containsKey("job.count")) { - jobCount = Integer.parseInt(jsonJob.get("job.count").toString()); + if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) { + jobCount = Integer.parseInt( + jsonJob.get(SLSConfiguration.JOB_COUNT).toString()); } jobCount = Math.max(jobCount, 1); - String oldAppId = (String)jsonJob.get("job.id"); + String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID); // Job id is generated automatically if this job configuration allows // multiple job instances if(jobCount > 1) { @@ -426,7 +429,7 @@ private void createAMForJob(Map jsonJob) throws YarnException { private List getTaskContainers(Map jsonJob) throws YarnException { List containers = new ArrayList<>(); - List tasks = (List) jsonJob.get("job.tasks"); + List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS); if (tasks == null || tasks.size() == 0) { throw new YarnException("No task for the job!"); } @@ -434,17 +437,22 @@ private List getTaskContainers(Map jsonJob) for (Object o : tasks) { Map jsonTask = (Map) o; - String hostname = (String) jsonTask.get("container.host"); + String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); long duration = 0; - if (jsonTask.containsKey("duration.ms")) { - duration = Integer.parseInt(jsonTask.get("duration.ms").toString()); - } else if (jsonTask.containsKey("container.start.ms") && - jsonTask.containsKey("container.end.ms")) { - long taskStart = Long.parseLong(jsonTask.get("container.start.ms") - .toString()); - long taskFinish = Long.parseLong(jsonTask.get("container.end.ms") - .toString()); + if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) { + duration = Integer.parseInt( + jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString()); + } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) { + // Also support "duration.ms" for backward compatibility + duration = Integer.parseInt( + jsonTask.get(SLSConfiguration.DURATION_MS).toString()); + } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) && + jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) { + long taskStart = Long.parseLong( + jsonTask.get(SLSConfiguration.TASK_START_MS).toString()); + long taskFinish = Long.parseLong( + jsonTask.get(SLSConfiguration.TASK_END_MS).toString()); duration = taskFinish - taskStart; } if (duration <= 0) { @@ -453,32 +461,33 @@ private List getTaskContainers(Map jsonJob) } Resource res = getDefaultContainerResource(); - if (jsonTask.containsKey("container.memory")) { - int containerMemory = - Integer.parseInt(jsonTask.get("container.memory").toString()); + if (jsonTask.containsKey(SLSConfiguration.TASK_MEMORY)) { + int containerMemory = Integer.parseInt( + jsonTask.get(SLSConfiguration.TASK_MEMORY).toString()); res.setMemorySize(containerMemory); } - if (jsonTask.containsKey("container.vcores")) { - int containerVCores = - Integer.parseInt(jsonTask.get("container.vcores").toString()); + if (jsonTask.containsKey(SLSConfiguration.CONTAINER_VCORES)) { + int containerVCores = Integer.parseInt( + jsonTask.get(SLSConfiguration.CONTAINER_VCORES).toString()); res.setVirtualCores(containerVCores); } int priority = DEFAULT_MAPPER_PRIORITY; - if (jsonTask.containsKey("container.priority")) { - priority = Integer.parseInt(jsonTask.get("container.priority") - .toString()); + if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) { + priority = Integer.parseInt( + jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString()); } String type = "map"; - if (jsonTask.containsKey("container.type")) { - type = jsonTask.get("container.type").toString(); + if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) { + type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString(); } int count = 1; - if (jsonTask.containsKey("count")) { - count = Integer.parseInt(jsonTask.get("count").toString()); + if (jsonTask.containsKey(SLSConfiguration.COUNT)) { + count = Integer.parseInt( + jsonTask.get(SLSConfiguration.COUNT).toString()); } count = Math.max(count, 1); @@ -708,14 +717,14 @@ private Resource getAMContainerResource(Map jsonJob) { return amContainerResource; } - if (jsonJob.containsKey("am.memory")) { + if (jsonJob.containsKey(SLSConfiguration.AM_MEMORY)) { amContainerResource.setMemorySize( - Long.parseLong(jsonJob.get("am.memory").toString())); + Long.parseLong(jsonJob.get(SLSConfiguration.AM_MEMORY).toString())); } - if (jsonJob.containsKey("am.vcores")) { + if (jsonJob.containsKey(SLSConfiguration.AM_VCORES)) { amContainerResource.setVirtualCores( - Integer.parseInt(jsonJob.get("am.vcores").toString())); + Integer.parseInt(jsonJob.get(SLSConfiguration.AM_VCORES).toString())); } return amContainerResource; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 038f2021ee..58f0c8c849 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -62,12 +62,15 @@ public class SLSConfiguration { public static final String AM_HEARTBEAT_INTERVAL_MS = AM_PREFIX + "heartbeat.interval.ms"; public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000; - public static final String AM_TYPE = AM_PREFIX + "type."; + public static final String AM_TYPE = AM_PREFIX + "type"; + public static final String AM_TYPE_PREFIX = AM_TYPE + "."; + public static final String AM_MEMORY = AM_PREFIX + "memory"; public static final String AM_CONTAINER_MEMORY = AM_PREFIX + "container.memory"; public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024; + public static final String AM_VCORES = AM_PREFIX + "vcores"; public static final String AM_CONTAINER_VCORES = AM_PREFIX + "container.vcores"; public static final int AM_CONTAINER_VCORES_DEFAULT = 1; @@ -85,4 +88,33 @@ public static Resource getAMContainerResource(Configuration conf) { conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT), conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT)); } + + // input file + + // nodes + public static final String NUM_NODES = "num.nodes"; + public static final String NUM_RACKS = "num.racks"; + + // job + public static final String JOB_PREFIX = "job."; + public static final String JOB_ID = JOB_PREFIX + "id"; + public static final String JOB_START_MS = JOB_PREFIX + "start.ms"; + public static final String JOB_END_MS = JOB_PREFIX + "end.ms"; + public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name"; + public static final String JOB_USER = JOB_PREFIX + "user"; + public static final String JOB_COUNT = JOB_PREFIX + "count"; + public static final String JOB_TASKS = JOB_PREFIX + "tasks"; + + // task + public static final String COUNT = "count"; + public static final String TASK_CONTAINER = "container."; + public static final String TASK_HOST = TASK_CONTAINER + "host"; + public static final String TASK_START_MS = TASK_CONTAINER + "start.ms"; + public static final String TASK_END_MS = TASK_CONTAINER + "end.ms"; + public static final String DURATION_MS = "duration.ms"; + public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS; + public static final String TASK_PRIORITY = TASK_CONTAINER + "priority"; + public static final String TASK_TYPE = TASK_CONTAINER + "type"; + public static final String TASK_MEMORY = TASK_CONTAINER + "memory"; + } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index dbc2dab4b5..e914fe733a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; @Private @Unstable @@ -118,21 +119,22 @@ public static Set parseNodesFromSLSTrace(String jobTrace) } private static void addNodes(Set nodeSet, Map jsonEntry) { - if (jsonEntry.containsKey("num.nodes")) { - int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString()); + if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) { + int numNodes = Integer.parseInt( + jsonEntry.get(SLSConfiguration.NUM_NODES).toString()); int numRacks = 1; - if (jsonEntry.containsKey("num.racks")) { + if (jsonEntry.containsKey(SLSConfiguration.NUM_RACKS)) { numRacks = Integer.parseInt( - jsonEntry.get("num.racks").toString()); + jsonEntry.get(SLSConfiguration.NUM_RACKS).toString()); } nodeSet.addAll(generateNodes(numNodes, numRacks)); } - if (jsonEntry.containsKey("job.tasks")) { - List tasks = (List) jsonEntry.get("job.tasks"); + if (jsonEntry.containsKey(SLSConfiguration.JOB_TASKS)) { + List tasks = (List) jsonEntry.get(SLSConfiguration.JOB_TASKS); for (Object o : tasks) { Map jsonTask = (Map) o; - String hostname = (String) jsonTask.get("container.host"); + String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); if (hostname != null) { nodeSet.add(hostname); } diff --git a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md index d3f91f4af1..0bab9a6738 100644 --- a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md +++ b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md @@ -344,7 +344,7 @@ Here we provide an example format of the sls json file, which contains 2 jobs. T "container.host" : "/default-rack/node1", // host the container asks for "container.start.ms" : 6664, // container start time, optional "container.end.ms" : 23707, // container finish time, optional - "duration.ms": 50000, // duration of the container, optional if start and end time is specified + "container.duration.ms": 50000, // duration of the container, optional if start and end time is specified "container.priority" : 20, // priority of the container, optional, the default value is 20 "container.type" : "map" // type of the container, could be "map" or "reduce", optional, the default value is "map" }, {