diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index e85973258a..1e83e40559 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -298,30 +299,20 @@ private void startNM() throws YarnException, IOException,
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
-    Map<String, Resource> nodeResourceMap = new HashMap<>();
-    Set<String> nodeSet;
+    Set<NodeDetails> nodeSet = null;
     if (nodeFile.isEmpty()) {
       for (String inputTrace : inputTraces) {
         switch (inputType) {
         case SLS:
           nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         case RUMEN:
           nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         case SYNTH:
           stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
           nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
               stjp.getNumNodes()/stjp.getNodesPerRack());
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         default:
           throw new YarnException("Input configuration not recognized, "
@@ -329,11 +320,11 @@
         }
       }
     } else {
-      nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile,
+      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
           nodeManagerResource);
     }
 
-    if (nodeResourceMap.size() == 0) {
+    if (nodeSet == null || nodeSet.isEmpty()) {
       throw new YarnException("No node! Please configure nodes.");
     }
 
@@ -344,20 +335,21 @@
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     ExecutorService executorService = Executors.
         newFixedThreadPool(threadPoolSize);
-    for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
+    for (NodeDetails nodeDetails : nodeSet) {
       executorService.submit(new Runnable() {
         @Override public void run() {
           try {
             // we randomize the heartbeat start time from zero to 1 interval
             NMSimulator nm = new NMSimulator();
             Resource nmResource = nodeManagerResource;
-            String hostName = entry.getKey();
-            if (entry.getValue() != null) {
-              nmResource = entry.getValue();
+            String hostName = nodeDetails.getHostname();
+            if (nodeDetails.getNodeResource() != null) {
+              nmResource = nodeDetails.getNodeResource();
             }
+            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
             nm.init(hostName, nmResource, random.nextInt(heartbeatInterval),
-                heartbeatInterval, rm, resourceUtilizationRatio);
+                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
             nmMap.put(nm.getNode().getNodeID(), nm);
             runner.schedule(nm);
             rackSet.add(nm.getNode().getRackName());
@@ -452,6 +444,11 @@ private void createAMForJob(Map jsonJob) throws YarnException {
           jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
     }
 
+    String jobLabelExpr = null;
+    if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
+      jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
+    }
+
     String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
     if (user == null) {
       user = "default";
@@ -481,7 +478,8 @@ private void createAMForJob(Map jsonJob) throws YarnException {
     for (int i = 0; i < jobCount; i++) {
       runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
-          getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
+          getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
+          jobLabelExpr);
     }
   }
 
@@ -730,7 +728,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
       runNewAM(job.getType(), user, jobQueue, oldJobId, jobStartTimeMS,
           jobFinishTimeMS, containerList, reservationId,
-          job.getDeadline(), getAMContainerResource(null),
+          job.getDeadline(), getAMContainerResource(null), null,
           job.getParams());
     }
   }
@@ -775,15 +773,24 @@ private void runNewAM(String jobType, String user,
       Resource amContainerResource) {
     runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
         jobFinishTimeMS, containerList, null, -1,
-        amContainerResource, null);
+        amContainerResource, null, null);
   }
 
   private void runNewAM(String jobType, String user,
       String jobQueue, String oldJobId, long jobStartTimeMS,
       long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      ReservationId reservationId, long deadline, Resource amContainerResource,
-      Map<String, String> params) {
+      Resource amContainerResource, String labelExpr) {
+    runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
+        jobFinishTimeMS, containerList, null, -1,
+        amContainerResource, labelExpr, null);
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void runNewAM(String jobType, String user,
+      String jobQueue, String oldJobId, long jobStartTimeMS,
+      long jobFinishTimeMS, List<ContainerSimulator> containerList,
+      ReservationId reservationId, long deadline, Resource amContainerResource,
+      String labelExpr, Map<String, String> params) {
     AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
         amClassMap.get(jobType), new Configuration());
 
@@ -799,7 +806,7 @@ private void runNewAM(String jobType, String user,
     AM_ID++;
     amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
         jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-        runner.getStartTimeMS(), amContainerResource, params);
+        runner.getStartTimeMS(), amContainerResource, labelExpr, params);
     if(reservationId != null) {
       // if we have a ReservationId, delegate reservation creation to
       // AMSim (reservation shape is impl specific)
@@ -985,4 +992,42 @@ static void printUsage() {
     System.err.println();
   }
 
+  /**
+   * Class to encapsulate all details about the node.
+   */
+  @Private
+  @Unstable
+  public static class NodeDetails {
+    private String hostname;
+    private Resource nodeResource;
+    private Set<NodeLabel> labels;
+
+    public NodeDetails(String nodeHostname) {
+      this.hostname = nodeHostname;
+    }
+
+    public String getHostname() {
+      return hostname;
+    }
+
+    public void setHostname(String hostname) {
+      this.hostname = hostname;
+    }
+
+    public Resource getNodeResource() {
+      return nodeResource;
+    }
+
+    public void setNodeResource(Resource nodeResource) {
+      this.nodeResource = nodeResource;
+    }
+
+    public Set<NodeLabel> getLabels() {
+      return labels;
+    }
+
+    public void setLabels(Set<NodeLabel> labels) {
+      this.labels = labels;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 8e1c256c63..5f34cfccfb 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -88,6 +88,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
   private int responseId = 0;
   // user name
   private String user;
+  // node label expression
+  private String nodeLabelExpression;
   // queue name
   protected String queue;
   // am type
@@ -123,7 +125,8 @@ public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager resourceManager,
       SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
       String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, Map<String, String> params) {
+      Resource amResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;
@@ -136,6 +139,7 @@ public void init(int heartbeatInterval,
     this.traceStartTimeMS = startTime;
     this.traceFinishTimeMS = finishTime;
     this.amContainerResource = amResource;
+    this.nodeLabelExpression = nodeLabelExpr;
   }
 
   /**
@@ -327,6 +331,9 @@ private void submitApp(ReservationId reservationId)
     conLauContext.setServiceData(new HashMap<>());
     appSubContext.setAMContainerSpec(conLauContext);
     appSubContext.setResource(amContainerResource);
+    if (nodeLabelExpression != null) {
+      appSubContext.setNodeLabelExpression(nodeLabelExpression);
+    }
 
     if(reservationId != null) {
       appSubContext.setReservationID(reservationId);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 6f0f85ff90..71fc5b2772 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -126,10 +126,11 @@ public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource, params);
+        baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
     amtype = "mapreduce";
 
     // get map/reduce tasks
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index b41f5f2029..862e5ec0ac 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -96,10 +96,11 @@ public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, params);
+        amContainerResource, nodeLabelExpr, params);
     amtype = "stream";
     allStreams.addAll(containerList);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index ea73befc17..09f653f375 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -104,6 +104,7 @@ public static Resource getAMContainerResource(Configuration conf) {
   public static final String JOB_START_MS = JOB_PREFIX + "start.ms";
   public static final String JOB_END_MS = JOB_PREFIX + "end.ms";
   public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name";
+  public static final String JOB_LABEL_EXPR = JOB_PREFIX + "label.expression";
   public static final String JOB_USER = JOB_PREFIX + "user";
   public static final String JOB_COUNT = JOB_PREFIX + "count";
   public static final String JOB_TASKS = JOB_PREFIX + "tasks";
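
Note: with JOB_LABEL_EXPR in place, a job entry in an SLS trace can pin its AM
to a partition via the new key. A minimal sketch (values are illustrative; only
"job.label.expression" is new in this patch, the other fields are existing SLS
trace fields):

    {
      "job.start.ms": 0,
      "job.end.ms": 95375,
      "job.queue.name": "sls_queue_1",
      "job.user": "default",
      "job.label.expression": "label1",
      "job.tasks": []
    }

SLSRunner.createAMForJob() reads this key and threads it through runNewAM() to
AMSimulator, which sets it on the ApplicationSubmissionContext.
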
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 428a8397ed..6a8430ef41 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,7 +80,7 @@ public class NMSimulator extends TaskRunner.Task {
   public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
       int heartBeatInterval, ResourceManager pRm,
-      float pResourceUtilizationRatio)
+      float pResourceUtilizationRatio, Set<NodeLabel> labels)
       throws IOException, YarnException {
     super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
         heartBeatInterval);
@@ -102,6 +104,7 @@ public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
         Records.newRecord(RegisterNodeManagerRequest.class);
     req.setNodeId(node.getNodeID());
     req.setResource(node.getTotalCapability());
+    req.setNodeLabels(labels);
     req.setHttpPort(80);
     RegisterNodeManagerResponse response = this.rm.getResourceTrackerService()
         .registerNodeManager(req);
@@ -109,6 +112,14 @@ public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
     this.resourceUtilizationRatio = pResourceUtilizationRatio;
   }
 
+  public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
+      int heartBeatInterval, ResourceManager pRm,
+      float pResourceUtilizationRatio)
+      throws IOException, YarnException {
+    init(nodeIdStr, nodeResource, dispatchTime, heartBeatInterval, pRm,
+        pResourceUtilizationRatio, null);
+  }
+
   @Override
   public void firstStep() {
     // do nothing
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index f2129d0141..8bb4871e5b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -23,7 +23,6 @@
 import java.io.InputStreamReader;
 import java.io.Reader;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -41,8 +40,11 @@
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
+import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -52,6 +54,10 @@ public class SLSUtils {
 
   public final static String DEFAULT_JOB_TYPE = "mapreduce";
 
+  private static final String LABEL_FORMAT_ERR_MSG =
+      "Input format for adding node-labels is not correct, it should be "
+          + "labelName1[(exclusive=true/false)],labelName2[] ..";
+
   // hostname includes the network path and the host name. for example
   // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
 // the function returns two Strings, the first element is the network
@@ -66,9 +72,9 @@ public static String[] getRackHostName(String hostname) {
   /**
    * parse the rumen trace file, return each host name
    */
-  public static Set<String> parseNodesFromRumenTrace(String jobTrace)
-      throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+  public static Set<NodeDetails> parseNodesFromRumenTrace(
+      String jobTrace) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
     File fin = new File(jobTrace);
     Configuration conf = new Configuration();
@@ -85,7 +91,8 @@ public static Set<String> parseNodesFromRumenTrace(String jobTrace)
         }
         LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
             .get(mapTask.getAttempts().size() - 1);
-        nodeSet.add(taskAttempt.getHostName().getValue());
+        nodeSet.add(new NodeDetails(
+            taskAttempt.getHostName().getValue()));
       }
       for(LoggedTask reduceTask : job.getReduceTasks()) {
         if (reduceTask.getAttempts().size() == 0) {
@@ -93,7 +100,8 @@ public static Set<String> parseNodesFromRumenTrace(String jobTrace)
         }
         LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
             .get(reduceTask.getAttempts().size() - 1);
-        nodeSet.add(taskAttempt.getHostName().getValue());
+        nodeSet.add(new NodeDetails(
+            taskAttempt.getHostName().getValue()));
       }
      }
    } finally {
@@ -106,9 +114,9 @@ public static Set<String> parseNodesFromRumenTrace(String jobTrace)
   /**
    * parse the sls trace file, return each host name
    */
-  public static Set<String> parseNodesFromSLSTrace(String jobTrace)
-      throws IOException {
-    Set<String> nodeSet = new HashSet<>();
+  public static Set<NodeDetails> parseNodesFromSLSTrace(
+      String jobTrace) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -124,7 +132,8 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
     return nodeSet;
   }
 
-  private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+  private static void addNodes(Set<NodeDetails> nodeSet,
+      Map jsonEntry) {
     if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) {
       int numNodes = Integer.parseInt(
           jsonEntry.get(SLSConfiguration.NUM_NODES).toString());
@@ -142,7 +151,7 @@ private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
       Map jsonTask = (Map) o;
       String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
       if (hostname != null) {
-        nodeSet.add(hostname);
+        nodeSet.add(new NodeDetails(hostname));
       }
     }
   }
@@ -150,10 +159,11 @@ private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
   /**
    * parse the input node file, return each host name
+   * sample input: label1(exclusive=true),label2(exclusive=false),label3
    */
-  public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile,
-      Resource nmDefaultResource) throws IOException {
-    Map<String, Resource> nodeResourceMap = new HashMap<>();
+  public static Set<NodeDetails> parseNodesFromNodeFile(
+      String nodeFile, Resource nmDefaultResource) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -166,6 +176,8 @@ public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile,
       List tasks = (List) jsonE.get("nodes");
       for (Object o : tasks) {
         Map jsonNode = (Map) o;
+        NodeDetails nodeDetails = new NodeDetails(
+            rack + "/" + jsonNode.get("node"));
         Resource nodeResource = Resources.clone(nmDefaultResource);
         ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
         for (ResourceInformation info : infors) {
@@ -174,18 +186,25 @@ public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile,
                 Integer.parseInt(jsonNode.get(info.getName()).toString()));
           }
         }
-        nodeResourceMap.put(rack + "/" + jsonNode.get("node"), nodeResource);
jsonNode.get("node"), nodeResource); + nodeDetails.setNodeResource(nodeResource); + if (jsonNode.get("labels") != null) { + Set nodeLabels = new HashSet<>( + YarnClientUtils.buildNodeLabelsFromStr( + jsonNode.get("labels").toString())); + nodeDetails.setLabels(nodeLabels); + } + nodeSet.add(nodeDetails); } } } finally { input.close(); } - return nodeResourceMap; + return nodeSet; } - public static Set generateNodes(int numNodes, + public static Set generateNodes(int numNodes, int numRacks){ - Set nodeSet = new HashSet<>(); + Set nodeSet = new HashSet<>(); if (numRacks < 1) { numRacks = 1; } @@ -195,7 +214,8 @@ public static Set generateNodes(int numNodes, } for (int i = 0; i < numNodes; i++) { - nodeSet.add("/rack" + i % numRacks + "/node" + i); + nodeSet.add(new NodeDetails( + "/rack" + i % numRacks + "/node" + i)); } return nodeSet; } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index bc8ea70e46..2efa846441 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -19,10 +19,13 @@ import com.codahale.metrics.MetricRegistry; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; @@ -42,6 +45,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentMap; @RunWith(Parameterized.class) public class TestAMSimulator { @@ -73,6 +77,7 @@ public void setup() { conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString()); conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); + conf.set(YarnConfiguration.NODE_LABELS_ENABLED, "true"); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true); rm = new ResourceManager(); rm.init(conf); @@ -140,7 +145,7 @@ public void testAMSimulator() throws Exception { String queue = "default"; List containers = new ArrayList<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), null); + appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null); app.firstStep(); verifySchedulerMetrics(appId); @@ -152,6 +157,34 @@ public void testAMSimulator() throws Exception { app.lastStep(); } + @Test + public void testAMSimulatorWithNodeLabels() throws Exception { + if (scheduler.equals(CapacityScheduler.class)) { + // add label to the cluster + RMAdminCLI rmAdminCLI = new RMAdminCLI(conf); + String[] args = {"-addToClusterNodeLabels", "label1"}; + rmAdminCLI.run(args); + + MockAMSimulator app = new MockAMSimulator(); + String appId = "app1"; + String queue = "default"; + List containers = new ArrayList<>(); + 
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index bc8ea70e46..2efa846441 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -19,10 +19,13 @@
 import com.codahale.metrics.MetricRegistry;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@@ -42,6 +45,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 @RunWith(Parameterized.class)
 public class TestAMSimulator {
@@ -73,6 +77,7 @@ public void setup() {
     conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
     conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
     conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
+    conf.set(YarnConfiguration.NODE_LABELS_ENABLED, "true");
     conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
     rm = new ResourceManager();
     rm.init(conf);
@@ -140,7 +145,7 @@ public void testAMSimulator() throws Exception {
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
     app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf), null);
+        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null);
     app.firstStep();
 
     verifySchedulerMetrics(appId);
@@ -152,6 +157,34 @@
     app.lastStep();
   }
 
+  @Test
+  public void testAMSimulatorWithNodeLabels() throws Exception {
+    if (scheduler.equals(CapacityScheduler.class)) {
+      // add label to the cluster
+      RMAdminCLI rmAdminCLI = new RMAdminCLI(conf);
+      String[] args = {"-addToClusterNodeLabels", "label1"};
+      rmAdminCLI.run(args);
+
+      MockAMSimulator app = new MockAMSimulator();
+      String appId = "app1";
+      String queue = "default";
+      List<ContainerSimulator> containers = new ArrayList<>();
+      app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
+          appId, 0, SLSConfiguration.getAMContainerResource(conf),
+          "label1", null);
+      app.firstStep();
+
+      verifySchedulerMetrics(appId);
+
+      ConcurrentMap<ApplicationId, RMApp> rmApps =
+          rm.getRMContext().getRMApps();
+      Assert.assertEquals(1, rmApps.size());
+      RMApp rmApp = rmApps.get(app.appId);
+      Assert.assertNotNull(rmApp);
+      Assert.assertEquals("label1", rmApp.getAmNodeLabelExpression());
+    }
+  }
+
   @After
   public void tearDown() {
     if (rm != null) {
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
index 5e586b13be..c59c2af81b 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.yarn.sls.utils;
 
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 public class TestSLSUtils {
@@ -45,28 +45,54 @@ public void testGetRackHostname() {
   @Test
   public void testParseNodesFromNodeFile() throws Exception {
     String nodeFile = "src/test/resources/nodes.json";
-    Map<String, Resource> nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+    Set<NodeDetails> nodeDetails = SLSUtils.parseNodesFromNodeFile(
         nodeFile, Resources.createResource(1024, 2));
-    Assert.assertEquals(20, nodeResourceMap.size());
+    Assert.assertEquals(20, nodeDetails.size());
 
     nodeFile = "src/test/resources/nodes-with-resources.json";
-    nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+    nodeDetails = SLSUtils.parseNodesFromNodeFile(
         nodeFile, Resources.createResource(1024, 2));
-    Assert.assertEquals(4,
-        nodeResourceMap.size());
-    Assert.assertEquals(2048,
-        nodeResourceMap.get("/rack1/node1").getMemorySize());
-    Assert.assertEquals(6,
-        nodeResourceMap.get("/rack1/node1").getVirtualCores());
-    Assert.assertEquals(1024,
-        nodeResourceMap.get("/rack1/node2").getMemorySize());
-    Assert.assertEquals(2,
-        nodeResourceMap.get("/rack1/node2").getVirtualCores());
+    Assert.assertEquals(4, nodeDetails.size());
+    for (NodeDetails nodeDetail : nodeDetails) {
+      if (nodeDetail.getHostname().equals("/rack1/node1")) {
+        Assert.assertEquals(2048,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(6,
+            nodeDetail.getNodeResource().getVirtualCores());
+      } else if (nodeDetail.getHostname().equals("/rack1/node2")) {
+        Assert.assertEquals(1024,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(2,
+            nodeDetail.getNodeResource().getVirtualCores());
+        Assert.assertNull(nodeDetail.getLabels());
+      } else if (nodeDetail.getHostname().equals("/rack1/node3")) {
+        Assert.assertEquals(1024,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(2,
+            nodeDetail.getNodeResource().getVirtualCores());
+        Assert.assertEquals(2, nodeDetail.getLabels().size());
+        for (NodeLabel nodeLabel : nodeDetail.getLabels()) {
+          if (nodeLabel.getName().equals("label1")) {
+            Assert.assertTrue(nodeLabel.isExclusive());
+          } else if (nodeLabel.getName().equals("label2")) {
+            Assert.assertFalse(nodeLabel.isExclusive());
+          } else {
+            Assert.assertTrue("Unexpected label", false);
+          }
+        }
+      } else if (nodeDetail.getHostname().equals("/rack1/node4")) {
(nodeDetail.getHostname().equals("/rack1/node4")) { + Assert.assertEquals(6144, + nodeDetail.getNodeResource().getMemorySize()); + Assert.assertEquals(12, + nodeDetail.getNodeResource().getVirtualCores()); + Assert.assertEquals(2, nodeDetail.getLabels().size()); + } + } } @Test public void testGenerateNodes() { - Set nodes = SLSUtils.generateNodes(3, 3); + Set nodes = SLSUtils.generateNodes(3, 3); Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size()); Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes)); @@ -83,10 +109,10 @@ public void testGenerateNodes() { Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes)); } - private int getNumRack(Set nodes) { + private int getNumRack(Set nodes) { Set racks = new HashSet<>(); - for (String node : nodes) { - String[] rackHostname = SLSUtils.getRackHostName(node); + for (NodeDetails node : nodes) { + String[] rackHostname = SLSUtils.getRackHostName(node.getHostname()); racks.add(rackHostname[0]); } return racks.size(); diff --git a/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json index 003918114a..dc5f0203c5 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json +++ b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json @@ -10,10 +10,14 @@ "node": "node2" }, { - "node": "node3" + "node": "node3", + "labels": "label1, label2(exclusive=false)" }, { - "node": "node4" + "node": "node4", + "labels": "label1, label2(exclusive=false)", + "memory-mb" : 6144, + "vcores" : 12 } ] } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 8d1d56b3f9..a24c39864f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.PrintStream; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,7 +82,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; + +import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG; @Private @Unstable @@ -91,15 +92,10 @@ public class RMAdminCLI extends HAAdmin { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); static CommonNodeLabelsManager localNodeLabelsManager = null; - private static final String NO_LABEL_ERR_MSG = - "No cluster node-labels are specified"; private static final String NO_MAPPING_ERR_MSG = "No node-to-labels mappings are specified"; private static final String INVALID_TIMEOUT_ERR_MSG = "Invalid timeout specified : "; - private static final String ADD_LABEL_FORMAT_ERR_MSG = - "Input format for adding 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 8d1d56b3f9..a24c39864f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -54,6 +53,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.RMHAServiceTarget;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +82,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+
+import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG;
 
 @Private
 @Unstable
@@ -91,15 +92,10 @@ public class RMAdminCLI extends HAAdmin {
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   static CommonNodeLabelsManager localNodeLabelsManager = null;
-  private static final String NO_LABEL_ERR_MSG =
-      "No cluster node-labels are specified";
   private static final String NO_MAPPING_ERR_MSG =
       "No node-to-labels mappings are specified";
   private static final String INVALID_TIMEOUT_ERR_MSG =
       "Invalid timeout specified : ";
-  private static final String ADD_LABEL_FORMAT_ERR_MSG =
-      "Input format for adding node-labels is not correct, it should be "
-          + "labelName1[(exclusive=true/false)],LabelName2[] ..";
   private static final Pattern RESOURCE_TYPES_ARGS_PATTERN =
       Pattern.compile("^[0-9]*$");
@@ -533,65 +529,6 @@ private int getGroups(String[] usernames) throws IOException {
     }
     return localNodeLabelsManager;
   }
-
-  private List<NodeLabel> buildNodeLabelsFromStr(String args) {
-    List<NodeLabel> nodeLabels = new ArrayList<>();
-    for (String p : args.split(",")) {
-      if (!p.trim().isEmpty()) {
-        String labelName = p;
-
-        // Try to parse exclusive
-        boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;
-        int leftParenthesisIdx = p.indexOf("(");
-        int rightParenthesisIdx = p.indexOf(")");
-
-        if ((leftParenthesisIdx == -1 && rightParenthesisIdx != -1)
-            || (leftParenthesisIdx != -1 && rightParenthesisIdx == -1)) {
-          // Parenthese not match
-          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-        }
-
-        if (leftParenthesisIdx > 0 && rightParenthesisIdx > 0) {
-          if (leftParenthesisIdx > rightParenthesisIdx) {
-            // Parentese not match
-            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-          }
-
-          String property = p.substring(p.indexOf("(") + 1, p.indexOf(")"));
-          if (property.contains("=")) {
-            String key = property.substring(0, property.indexOf("=")).trim();
-            String value =
-                property
-                    .substring(property.indexOf("=") + 1, property.length())
-                    .trim();
-
-            // Now we only support one property, which is exclusive, so check if
-            // key = exclusive and value = {true/false}
-            if (key.equals("exclusive")
-                && ImmutableSet.of("true", "false").contains(value)) {
-              exclusive = Boolean.parseBoolean(value);
-            } else {
-              throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-            }
-          } else if (!property.trim().isEmpty()) {
-            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-          }
-        }
-
-        // Try to get labelName if there's "(..)"
-        if (labelName.contains("(")) {
-          labelName = labelName.substring(0, labelName.indexOf("(")).trim();
-        }
-
-        nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
-      }
-    }
-
-    if (nodeLabels.isEmpty()) {
-      throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
-    }
-    return nodeLabels;
-  }
 
   private Set<String> buildNodeLabelNamesFromStr(String args) {
     Set<String> labels = new HashSet<String>();
@@ -624,7 +561,7 @@ private int handleAddToClusterNodeLabels(String[] args, String cmd,
       return exitCode;
     }
 
-    List<NodeLabel> labels = buildNodeLabelsFromStr(
+    List<NodeLabel> labels = YarnClientUtils.buildNodeLabelsFromStr(
         cliParser.getOptionValue("addToClusterNodeLabels"));
     if (cliParser.hasOption("directlyAccessNodeLabelStore")) {
       getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
index 1e3112aff7..17176752f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
@@ -19,8 +19,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -29,6 +34,14 @@
  * YARN clients.
  */
 public abstract class YarnClientUtils {
+
+  private static final String ADD_LABEL_FORMAT_ERR_MSG =
+      "Input format for adding node-labels is not correct, it should be "
+          + "labelName1[(exclusive=true/false)],labelName2[] ..";
+
+  public static final String NO_LABEL_ERR_MSG =
+      "No cluster node-labels are specified";
+
   /**
    * Look up and return the resource manager's principal. This method
    * automatically does the _HOST replacement in the principal and
@@ -79,6 +92,70 @@ public static String getRmPrincipal(String rmPrincipal, Configuration conf)
     return SecurityUtil.getServerPrincipal(rmPrincipal, hostname);
   }
 
+  /**
+   * Creates node labels from the given string.
+   * @param args node labels string to be parsed
+   * @return list of node labels
+   */
+  public static List<NodeLabel> buildNodeLabelsFromStr(String args) {
+    List<NodeLabel> nodeLabels = new ArrayList<>();
+    for (String p : args.split(",")) {
+      if (!p.trim().isEmpty()) {
+        String labelName = p;
+
+        // Try to parse exclusive
+        boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;
+        int leftParenthesisIdx = p.indexOf("(");
+        int rightParenthesisIdx = p.indexOf(")");
+
+        if ((leftParenthesisIdx == -1 && rightParenthesisIdx != -1)
+            || (leftParenthesisIdx != -1 && rightParenthesisIdx == -1)) {
+          // Parentheses do not match
+          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+        }
+
+        if (leftParenthesisIdx > 0 && rightParenthesisIdx > 0) {
+          if (leftParenthesisIdx > rightParenthesisIdx) {
+            // Parentheses do not match
+            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+          }
+
+          String property = p.substring(p.indexOf("(") + 1, p.indexOf(")"));
+          if (property.contains("=")) {
+            String key = property.substring(0, property.indexOf("=")).trim();
+            String value =
+                property
+                    .substring(property.indexOf("=") + 1, property.length())
+                    .trim();
+
+            // Now we only support one property, which is exclusive, so check
+            // if key = exclusive and value = {true/false}
+            if (key.equals("exclusive")
+                && ImmutableSet.of("true", "false").contains(value)) {
+              exclusive = Boolean.parseBoolean(value);
+            } else {
+              throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+            }
+          } else if (!property.trim().isEmpty()) {
+            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+          }
+        }
+
+        // Try to get labelName if there's "(..)"
+        if (labelName.contains("(")) {
+          labelName = labelName.substring(0, labelName.indexOf("(")).trim();
+        }
+
+        nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
+      }
+    }
+
+    if (nodeLabels.isEmpty()) {
+      throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
+    }
+    return nodeLabels;
+  }
+
   /**
    * Returns a {@link YarnConfiguration} built from the {@code conf} parameter
    * that is guaranteed to have the {@link YarnConfiguration#RM_HA_ID}