From 78860372bd8048168c6aa27a9526c40f5869cf2c Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Tue, 17 Apr 2018 13:11:34 -0700 Subject: [PATCH] YARN-8134. Support specifying node resources in SLS. Contributed by Abhishek Modi. --- hadoop-tools/hadoop-sls/pom.xml | 1 + .../org/apache/hadoop/yarn/sls/SLSRunner.java | 35 +++++++++++++------ .../hadoop/yarn/sls/utils/SLSUtils.java | 24 ++++++++++--- .../hadoop/yarn/sls/utils/TestSLSUtils.java | 25 +++++++++++++ .../test/resources/nodes-with-resources.json | 19 ++++++++++ 5 files changed, 89 insertions(+), 15 deletions(-) create mode 100644 hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml index 10f6294aa4..a42f8decd8 100644 --- a/hadoop-tools/hadoop-sls/pom.xml +++ b/hadoop-tools/hadoop-sls/pom.xml @@ -137,6 +137,7 @@ src/test/resources/syn_stream.json src/test/resources/inputsls.json src/test/resources/nodes.json + src/test/resources/nodes-with-resources.json src/test/resources/exit-invariants.txt src/test/resources/ongoing-invariants.txt diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 8a522fed82..8504b9d9a6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -292,21 +292,30 @@ private void startNM() throws YarnException, IOException { SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); // nm information (fetch from topology file, or from sls/rumen json file) - Set nodeSet = new HashSet(); + Map nodeResourceMap = new HashMap<>(); + Set nodeSet; if (nodeFile.isEmpty()) { for (String inputTrace : inputTraces) { - switch (inputType) { case SLS: - nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace)); + nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); + for (String node : nodeSet) { + nodeResourceMap.put(node, null); + } break; case RUMEN: - nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace)); + nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); + for (String node : nodeSet) { + nodeResourceMap.put(node, null); + } break; case SYNTH: stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(), - stjp.getNumNodes()/stjp.getNodesPerRack())); + nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), + stjp.getNumNodes()/stjp.getNodesPerRack()); + for (String node : nodeSet) { + nodeResourceMap.put(node, null); + } break; default: throw new YarnException("Input configuration not recognized, " @@ -314,20 +323,26 @@ private void startNM() throws YarnException, IOException { } } } else { - nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); + nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile, + nodeManagerResource); } - if (nodeSet.size() == 0) { + if (nodeResourceMap.size() == 0) { throw new YarnException("No node! Please configure nodes."); } // create NM simulators Random random = new Random(); Set rackSet = new HashSet(); - for (String hostName : nodeSet) { + for (Map.Entry entry : nodeResourceMap.entrySet()) { // we randomize the heartbeat start time from zero to 1 interval NMSimulator nm = new NMSimulator(); - nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval), + Resource nmResource = nodeManagerResource; + String hostName = entry.getKey(); + if (entry.getValue() != null) { + nmResource = entry.getValue(); + } + nm.init(hostName, nmResource, random.nextInt(heartbeatInterval), heartbeatInterval, rm, resourceUtilizationRatio); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index e914fe733a..f2129d0141 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; + +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -39,7 +41,11 @@ import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -145,9 +151,9 @@ private static void addNodes(Set nodeSet, Map jsonEntry) { /** * parse the input node file, return each host name */ - public static Set parseNodesFromNodeFile(String nodeFile) - throws IOException { - Set nodeSet = new HashSet(); + public static Map parseNodesFromNodeFile(String nodeFile, + Resource nmDefaultResource) throws IOException { + Map nodeResourceMap = new HashMap<>(); JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); Reader input = @@ -160,13 +166,21 @@ public static Set parseNodesFromNodeFile(String nodeFile) List tasks = (List) jsonE.get("nodes"); for (Object o : tasks) { Map jsonNode = (Map) o; - nodeSet.add(rack + "/" + jsonNode.get("node")); + Resource nodeResource = Resources.clone(nmDefaultResource); + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + if (jsonNode.get(info.getName()) != null) { + nodeResource.setResourceValue(info.getName(), + Integer.parseInt(jsonNode.get(info.getName()).toString())); + } + } + nodeResourceMap.put(rack + "/" + jsonNode.get("node"), nodeResource); } } } finally { input.close(); } - return nodeSet; + return nodeResourceMap; } public static Set generateNodes(int numNodes, diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java index 30964a1bce..5e586b13be 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.sls.utils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class TestSLSUtils { @@ -39,6 +42,28 @@ public void testGetRackHostname() { Assert.assertEquals(rackHostname[1], "node1"); } + @Test + public void testParseNodesFromNodeFile() throws Exception { + String nodeFile = "src/test/resources/nodes.json"; + Map nodeResourceMap = SLSUtils.parseNodesFromNodeFile( + nodeFile, Resources.createResource(1024, 2)); + Assert.assertEquals(20, nodeResourceMap.size()); + + nodeFile = "src/test/resources/nodes-with-resources.json"; + nodeResourceMap = SLSUtils.parseNodesFromNodeFile( + nodeFile, Resources.createResource(1024, 2)); + Assert.assertEquals(4, + nodeResourceMap.size()); + Assert.assertEquals(2048, + nodeResourceMap.get("/rack1/node1").getMemorySize()); + Assert.assertEquals(6, + nodeResourceMap.get("/rack1/node1").getVirtualCores()); + Assert.assertEquals(1024, + nodeResourceMap.get("/rack1/node2").getMemorySize()); + Assert.assertEquals(2, + nodeResourceMap.get("/rack1/node2").getVirtualCores()); + } + @Test public void testGenerateNodes() { Set nodes = SLSUtils.generateNodes(3, 3); diff --git a/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json new file mode 100644 index 0000000000..003918114a --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json @@ -0,0 +1,19 @@ +{ + "rack": "rack1", + "nodes": [ + { + "node": "node1", + "memory-mb" : 2048, + "vcores" : 6 + }, + { + "node": "node2" + }, + { + "node": "node3" + }, + { + "node": "node4" + } + ] +}