From 3082552b3b991df846caf572b58e44308ddf8eeb Mon Sep 17 00:00:00 2001
From: Robert Kanter
Date: Thu, 4 May 2017 17:21:46 -0700
Subject: [PATCH] YARN-6522. Make SLS JSON input file format simple and
 scalable (yufeigu via rkanter)

---
 .../org/apache/hadoop/yarn/sls/SLSRunner.java | 100 +++++++++++++-----
 .../yarn/sls/appmaster/AMSimulator.java       |  42 ++++----
 .../sls/synthetic/SynthTraceJobProducer.java  |   2 +-
 .../hadoop/yarn/sls/utils/SLSUtils.java       |  49 ++++++---
 .../site/markdown/SchedulerLoadSimulator.md   |  28 +++--
 .../hadoop/yarn/sls/utils/TestSLSUtils.java   |  30 ++++++
 6 files changed, 181 insertions(+), 70 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 9d35d1b228..ddd35ef475 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -119,6 +119,9 @@ public class SLSRunner extends Configured implements Tool {
   // logger
   public final static Logger LOG = Logger.getLogger(SLSRunner.class);
 
+  private final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
   /**
    * The type of trace in input.
    */
@@ -247,8 +250,8 @@ private void startNM() throws YarnException, IOException {
       break;
     case SYNTH:
       stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-      nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
-          stjp.getNodesPerRack()));
+      nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
+          stjp.getNumNodes()/stjp.getNodesPerRack()));
       break;
     default:
       throw new YarnException("Input configuration not recognized, "
@@ -259,6 +262,10 @@ private void startNM() throws YarnException, IOException {
       nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
     }
 
+    if (nodeSet.size() == 0) {
+      throw new YarnException("No node! Please configure nodes.");
+    }
+
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = new HashSet<String>();
@@ -348,7 +355,11 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
   private void createAMForJob(Map jsonJob) throws YarnException {
     long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
-    long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+
+    long jobFinishTime = 0;
+    if (jsonJob.containsKey("job.end.ms")) {
+      jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+    }
 
     String user = (String) jsonJob.get("job.user");
     if (user == null) {
@@ -358,25 +369,49 @@ private void createAMForJob(Map jsonJob) throws YarnException {
     String queue = jsonJob.get("job.queue.name").toString();
     increaseQueueAppNum(queue);
 
-    String oldAppId = jsonJob.get("job.id").toString();
+    String oldAppId = (String)jsonJob.get("job.id");
+    if (oldAppId == null) {
+      oldAppId = Integer.toString(AM_ID);
+    }
 
-    // tasks
+    String amType = (String)jsonJob.get("am.type");
+    if (amType == null) {
+      amType = SLSUtils.DEFAULT_JOB_TYPE;
+    }
+
+    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+        getTaskContainers(jsonJob), null);
+  }
+
+  private List<ContainerSimulator> getTaskContainers(Map jsonJob)
+      throws YarnException {
+    List<ContainerSimulator> containers = new ArrayList<>();
     List tasks = (List) jsonJob.get("job.tasks");
     if (tasks == null || tasks.size() == 0) {
      throw new YarnException("No task for the job!");
     }
-    List containerList = new ArrayList<>();
 
     for (Object o : tasks) {
       Map jsonTask = (Map) o;
-      String hostname = jsonTask.get("container.host").toString();
-      long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
-          .toString());
-      long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
-          .toString());
-      long lifeTime = taskFinish - taskStart;
-      // Set memory and vcores from job trace file
+      String hostname = (String) jsonTask.get("container.host");
+
+      long duration = 0;
+      if (jsonTask.containsKey("duration.ms")) {
+        duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
+      } else if (jsonTask.containsKey("container.start.ms") &&
+          jsonTask.containsKey("container.end.ms")) {
+        long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+            .toString());
+        long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+            .toString());
+        duration = taskFinish - taskStart;
+      }
+      if (duration <= 0) {
+        throw new YarnException("Duration of a task shouldn't be less or equal"
+            + " to 0!");
+      }
+
       Resource res = getDefaultContainerResource();
       if (jsonTask.containsKey("container.memory")) {
         int containerMemory =
@@ -390,17 +425,30 @@ private void createAMForJob(Map jsonJob) throws YarnException {
         res.setVirtualCores(containerVCores);
       }
 
-      int priority = Integer.parseInt(jsonTask.get("container.priority")
-          .toString());
-      String type = jsonTask.get("container.type").toString();
-      containerList.add(
-          new ContainerSimulator(res, lifeTime, hostname, priority, type));
+      int priority = DEFAULT_MAPPER_PRIORITY;
+      if (jsonTask.containsKey("container.priority")) {
+        priority = Integer.parseInt(jsonTask.get("container.priority")
+            .toString());
+      }
+
+      String type = "map";
+      if (jsonTask.containsKey("container.type")) {
+        type = jsonTask.get("container.type").toString();
+      }
+
+      int count = 1;
+      if (jsonTask.containsKey("count")) {
+        count = Integer.parseInt(jsonTask.get("count").toString());
+      }
+      count = Math.max(count, 1);
+
+      for (int i = 0; i < count; i++) {
+        containers.add(
+            new ContainerSimulator(res, duration, hostname, priority, type));
+      }
     }
 
-    // create a new AM
-    String amType = jsonJob.get("am.type").toString();
-    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
-        containerList, null);
+    return containers;
   }
 
   /**
@@ -463,7 +511,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
             taskAttempt.getStartTime();
         containerList.add(
             new ContainerSimulator(getDefaultContainerResource(),
-                containerLifeTime, hostname, 10, "map"));
+                containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
       }
 
       // reducer
@@ -479,7 +527,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
             taskAttempt.getStartTime();
         containerList.add(
             new ContainerSimulator(getDefaultContainerResource(),
-                containerLifeTime, hostname, 20, "reduce"));
+                containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
       }
 
       // Only supports the default job type currently
@@ -559,7 +607,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
           Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
               (int) tai.getTaskInfo().getTaskVCores());
       containerList.add(new ContainerSimulator(containerResource,
-          containerLifeTime, hostname, 10, "map"));
+          containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
       maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
       maxMapDur =
           containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
@@ -579,7 +627,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
           Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
               (int) tai.getTaskInfo().getTaskVCores());
       containerList.add(new ContainerSimulator(containerResource,
-          containerLifeTime, hostname, 20, "reduce"));
+          containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
       maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
       maxRedDur =
           containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 45a3c072bc..70c557929e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -400,26 +400,28 @@ protected List<ResourceRequest> packageRequests(
     Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
     ResourceRequest anyRequest = null;
     for (ContainerSimulator cs : csList) {
-      String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
-      // check rack local
-      String rackname = "/" + rackHostNames[0];
-      if (rackLocalRequestMap.containsKey(rackname)) {
-        rackLocalRequestMap.get(rackname).setNumContainers(
-            rackLocalRequestMap.get(rackname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-            cs.getResource(), rackname, priority, 1);
-        rackLocalRequestMap.put(rackname, request);
-      }
-      // check node local
-      String hostname = rackHostNames[1];
-      if (nodeLocalRequestMap.containsKey(hostname)) {
-        nodeLocalRequestMap.get(hostname).setNumContainers(
-            nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-            cs.getResource(), hostname, priority, 1);
-        nodeLocalRequestMap.put(hostname, request);
+      if (cs.getHostname() != null) {
+        String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
+        // check rack local
+        String rackname = "/" + rackHostNames[0];
+        if (rackLocalRequestMap.containsKey(rackname)) {
+          rackLocalRequestMap.get(rackname).setNumContainers(
+              rackLocalRequestMap.get(rackname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), rackname, priority, 1);
+          rackLocalRequestMap.put(rackname, request);
+        }
+        // check node local
+        String hostname = rackHostNames[1];
+        if (nodeLocalRequestMap.containsKey(hostname)) {
+          nodeLocalRequestMap.get(hostname).setNumContainers(
+              nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), hostname, priority, 1);
+          nodeLocalRequestMap.put(hostname, request);
+        }
       }
       // any
       if (anyRequest == null) {
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index 3d2ec947fb..c89e4e26a5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -131,7 +131,7 @@ public long getSeed() {
   }
 
   public int getNodesPerRack() {
-    return trace.nodes_per_rack;
+    return trace.nodes_per_rack < 1 ? 1 : trace.nodes_per_rack;
   }
 
   public int getNumNodes() {
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index e27b36ffdc..dbc2dab4b5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -101,7 +101,7 @@ public static Set<String> parseNodesFromRumenTrace(String jobTrace)
    */
   public static Set<String> parseNodesFromSLSTrace(String jobTrace)
       throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+    Set<String> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -109,13 +109,7 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
     try {
       Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
       while (i.hasNext()) {
-        Map jsonE = i.next();
-        List tasks = (List) jsonE.get("job.tasks");
-        for (Object o : tasks) {
-          Map jsonTask = (Map) o;
-          String hostname = jsonTask.get("container.host").toString();
-          nodeSet.add(hostname);
-        }
+        addNodes(nodeSet, i.next());
       }
     } finally {
       input.close();
@@ -123,6 +117,29 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
     return nodeSet;
   }
 
+  private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+    if (jsonEntry.containsKey("num.nodes")) {
+      int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
+      int numRacks = 1;
+      if (jsonEntry.containsKey("num.racks")) {
+        numRacks = Integer.parseInt(
+            jsonEntry.get("num.racks").toString());
+      }
+      nodeSet.addAll(generateNodes(numNodes, numRacks));
+    }
+
+    if (jsonEntry.containsKey("job.tasks")) {
+      List tasks = (List) jsonEntry.get("job.tasks");
+      for (Object o : tasks) {
+        Map jsonTask = (Map) o;
+        String hostname = (String) jsonTask.get("container.host");
+        if (hostname != null) {
+          nodeSet.add(hostname);
+        }
+      }
+    }
+  }
+
   /**
    * parse the input node file, return each host name
    */
@@ -150,11 +167,19 @@ public static Set<String> parseNodesFromNodeFile(String nodeFile)
     return nodeSet;
   }
 
-  public static Set<String> generateNodesFromSynth(
-      int numNodes, int nodesPerRack) {
-    Set<String> nodeSet = new HashSet<String>();
+  public static Set<String> generateNodes(int numNodes,
+      int numRacks) {
+    Set<String> nodeSet = new HashSet<>();
+    if (numRacks < 1) {
+      numRacks = 1;
+    }
+
+    if (numRacks > numNodes) {
+      numRacks = numNodes;
+    }
+
     for (int i = 0; i < numNodes; i++) {
-      nodeSet.add("/rack" + i % nodesPerRack + "/node" + i);
+      nodeSet.add("/rack" + i % numRacks + "/node" + i);
     }
     return nodeSet;
   }
diff --git a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
index f0e3b8c433..6e00e9a470 100644
--- a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
+++ b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
@@ -328,18 +328,24 @@ Appendix
 
 Here we provide an example format of the sls json file, which contains 2 jobs.
 The first job has 3 map tasks and the second one has 2 map tasks.
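+Note that most fields are now optional. Only `job.start.ms`, `job.queue.name`,
+and a non-empty `job.tasks` list (each task carrying a positive duration,
+either as `duration.ms` or as `container.start.ms` plus `container.end.ms`)
+are required; everything else falls back to the defaults noted below. As a
+sketch, a minimal job entry could look like this (values are illustrative):
+
+    {
+      "job.start.ms" : 0,
+      "job.queue.name" : "sls_queue_1",
+      "job.tasks" : [ {
+        "duration.ms" : 10000
+      } ]
+    }
+
+The full annotated example: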
     {
-      "am.type" : "mapreduce",
-      "job.start.ms" : 0,
-      "job.end.ms" : 95375,
-      "job.queue.name" : "sls_queue_1",
-      "job.id" : "job_1",
-      "job.user" : "default",
+      "num.nodes": 3, // total number of nodes in the cluster
+      "num.racks": 1 // total number of racks in the cluster; num.nodes is divided evenly among the racks; optional, the default value is 1
+    }
+    {
+      "am.type" : "mapreduce", // type of AM, optional, the default value is "mapreduce"
+      "job.start.ms" : 0, // job start time
+      "job.end.ms" : 95375, // job finish time, optional, the default value is 0
+      "job.queue.name" : "sls_queue_1", // the queue the job will be submitted to
+      "job.id" : "job_1", // the job id used to track the job, optional, the default value is a zero-based integer that increases with the number of jobs
+      "job.user" : "default", // user, optional, the default value is "default"
       "job.tasks" : [ {
-        "container.host" : "/default-rack/node1",
-        "container.start.ms" : 6664,
-        "container.end.ms" : 23707,
-        "container.priority" : 20,
-        "container.type" : "map"
+        "count": 1, // number of tasks, optional, the default value is 1
+        "container.host" : "/default-rack/node1", // host the container asks for
+        "container.start.ms" : 6664, // container start time, optional
+        "container.end.ms" : 23707, // container finish time, optional
+        "duration.ms": 50000, // duration of the container, optional if start and end times are specified
+        "container.priority" : 20, // priority of the container, optional, the default value is 20
+        "container.type" : "map" // type of the container, either "map" or "reduce", optional, the default value is "map"
       }, {
         "container.host" : "/default-rack/node3",
         "container.start.ms" : 6665,
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
index f4eda67958..30964a1bce 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
@@ -21,6 +21,9 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestSLSUtils {
 
   @Test
@@ -36,4 +39,31 @@ public void testGetRackHostname() {
     Assert.assertEquals(rackHostname[1], "node1");
   }
 
+  @Test
+  public void testGenerateNodes() {
+    Set<String> nodes = SLSUtils.generateNodes(3, 3);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 1);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 4);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
+
+    nodes = SLSUtils.generateNodes(3, 0);
+    Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
+    Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
+  }
+
+  private int getNumRack(Set<String> nodes) {
+    Set<String> racks = new HashSet<>();
+    for (String node : nodes) {
+      String[] rackHostname = SLSUtils.getRackHostName(node);
+      racks.add(rackHostname[0]);
+    }
+    return racks.size();
+  }
 }
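A usage sketch of the new format (all field values below are illustrative): a
standalone node entry generates the cluster, "count" expands a single task
spec into identical containers, and tasks without "container.host" are
requested with ANY locality only, so a large simulation no longer needs one
task entry per container or a pinned host per task:

    {
      "num.nodes" : 100, // illustrative cluster size
      "num.racks" : 4
    }
    {
      "job.start.ms" : 0,
      "job.queue.name" : "sls_queue_1", // assumes this queue exists in the scheduler config
      "job.tasks" : [ {
        "count" : 200, // expanded into 200 identical containers
        "duration.ms" : 60000
      } ]
    }

For these inputs, SLSUtils.generateNodes(100, 4) places node i on rack i % 4,
i.e. /rack0/node0, /rack1/node1, /rack2/node2, /rack3/node3, /rack0/node4, and
so on; num.racks values below 1 are treated as 1 and values above num.nodes are
capped at num.nodes, as exercised by testGenerateNodes above. Each of the 200
containers gets the default resource, priority 20, and type "map".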