diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 270ef1ba15..9ba2138d3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -87,8 +88,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProfileCapability; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -99,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -274,6 +279,10 @@ public enum DSEntity { @VisibleForTesting protected AtomicInteger numRequestedContainers = new AtomicInteger(); + protected AtomicInteger numIgnore = new AtomicInteger(); + + protected AtomicInteger totalRetries = new AtomicInteger(10); + // Shell command to be executed private String shellCommand = ""; // Args to be passed to the shell command @@ -289,6 +298,9 @@ public enum DSEntity { // File length needed for local resource private long shellScriptPathLen = 0; + // Placement Specifications + private Map placementSpecs = null; + // Container retry options private ContainerRetryPolicy containerRetryPolicy = ContainerRetryPolicy.NEVER_RETRY; @@ -334,6 +346,7 @@ public enum DSEntity { private final String windows_command = "cmd /c"; private int yarnShellIdCounter = 1; + private final AtomicLong allocIdCounter = new AtomicLong(1); @VisibleForTesting protected final Set launchedContainers = @@ -457,6 +470,7 @@ public boolean init(String[] args) throws ParseException, IOException { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("placement_spec", true, "Placement specification"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -487,6 +501,17 @@ public boolean init(String[] args) throws ParseException, IOException { dumpOutDebugInfo(); } + if (cliParser.hasOption("placement_spec")) { + String placementSpec = cliParser.getOptionValue("placement_spec"); + LOG.info("Placement Spec received [{}]", placementSpec); + parsePlacementSpecs(placementSpec); + LOG.info("Total num containers requested [{}]", numTotalContainers); + if 
(numTotalContainers == 0) { + throw new IllegalArgumentException( + "Cannot run distributed shell with no containers"); + } + } + Map envs = System.getenv(); if (!envs.containsKey(Environment.CONTAINER_ID.name())) { @@ -609,8 +634,11 @@ public boolean init(String[] args) throws ParseException, IOException { } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); - numTotalContainers = Integer.parseInt(cliParser.getOptionValue( - "num_containers", "1")); + + if (this.placementSpecs == null) { + numTotalContainers = Integer.parseInt(cliParser.getOptionValue( + "num_containers", "1")); + } if (numTotalContainers == 0) { throw new IllegalArgumentException( "Cannot run distributed shell with no containers"); @@ -642,6 +670,17 @@ public boolean init(String[] args) throws ParseException, IOException { return true; } + private void parsePlacementSpecs(String placementSpecifications) { + Map pSpecs = + PlacementSpec.parse(placementSpecifications); + this.placementSpecs = new HashMap<>(); + this.numTotalContainers = 0; + for (PlacementSpec pSpec : pSpecs.values()) { + this.numTotalContainers += pSpec.numContainers; + this.placementSpecs.put(pSpec.sourceTag, pSpec); + } + } + /** * Helper function to print usage * @@ -719,9 +758,19 @@ public void run() throws YarnException, IOException, InterruptedException { // Register self with ResourceManager // This will start heartbeating to the RM appMasterHostname = NetUtils.getHostname(); + Map, PlacementConstraint> placementConstraintMap = null; + if (this.placementSpecs != null) { + placementConstraintMap = new HashMap<>(); + for (PlacementSpec spec : this.placementSpecs.values()) { + if (spec.constraint != null) { + placementConstraintMap.put( + Collections.singleton(spec.sourceTag), spec.constraint); + } + } + } RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, - appMasterTrackingUrl); + appMasterTrackingUrl, 
placementConstraintMap); resourceProfiles = response.getResourceProfiles(); ResourceUtils.reinitializeResources(response.getResourceTypes()); // Dump out information about cluster capability as seen by the @@ -765,9 +814,20 @@ public void run() throws YarnException, IOException, InterruptedException { // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). - for (int i = 0; i < numTotalContainersToRequest; ++i) { - ContainerRequest containerAsk = setupContainerAskForRM(); - amRMClient.addContainerRequest(containerAsk); + if (this.placementSpecs == null) { + for (int i = 0; i < numTotalContainersToRequest; ++i) { + ContainerRequest containerAsk = setupContainerAskForRM(); + amRMClient.addContainerRequest(containerAsk); + } + } else { + List schedReqs = new ArrayList<>(); + for (PlacementSpec pSpec : this.placementSpecs.values()) { + for (int i = 0; i < pSpec.numContainers; i++) { + SchedulingRequest sr = setupSchedulingRequest(pSpec); + schedReqs.add(sr); + } + } + amRMClient.addSchedulingRequests(schedReqs); } numRequestedContainers.set(numTotalContainers); } @@ -933,6 +993,12 @@ public void onContainersCompleted(List completedContainers) { numRequestedContainers.decrementAndGet(); // we do not need to release the container as it would be done // by the RM + + // Ignore these containers if placementspec is enabled + // for the time being. 
+ if (placementSpecs != null) { + numIgnore.incrementAndGet(); + } } } else { // nothing to do @@ -962,14 +1028,18 @@ public void onContainersCompleted(List completedContainers) { int askCount = numTotalContainers - numRequestedContainers.get(); numRequestedContainers.addAndGet(askCount); - if (askCount > 0) { - for (int i = 0; i < askCount; ++i) { - ContainerRequest containerAsk = setupContainerAskForRM(); - amRMClient.addContainerRequest(containerAsk); + // Dont bother re-asking if we are using placementSpecs + if (placementSpecs == null) { + if (askCount > 0) { + for (int i = 0; i < askCount; ++i) { + ContainerRequest containerAsk = setupContainerAskForRM(); + amRMClient.addContainerRequest(containerAsk); + } } } - - if (numCompletedContainers.get() == numTotalContainers) { + + if (numCompletedContainers.get() + numIgnore.get() >= + numTotalContainers) { done = true; } } @@ -1028,6 +1098,23 @@ public void onContainersUpdated( } } + @Override + public void onRequestsRejected(List rejReqs) { + List reqsToRetry = new ArrayList<>(); + for (RejectedSchedulingRequest rejReq : rejReqs) { + LOG.info("Scheduling Request {} has been rejected. 
Reason {}", + rejReq.getRequest(), rejReq.getReason()); + reqsToRetry.add(rejReq.getRequest()); + } + totalRetries.addAndGet(-1 * reqsToRetry.size()); + if (totalRetries.get() <= 0) { + LOG.info("Exiting, since retries are exhausted !!"); + done = true; + } else { + amRMClient.addSchedulingRequests(reqsToRetry); + } + } + @Override public void onShutdownRequest() { done = true; @@ -1335,6 +1422,19 @@ private ContainerRequest setupContainerAskForRM() { return request; } + private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) { + long allocId = allocIdCounter.incrementAndGet(); + SchedulingRequest sReq = SchedulingRequest.newInstance( + allocId, Priority.newInstance(requestPriority), + ExecutionTypeRequest.newInstance(), + Collections.singleton(spec.sourceTag), + ResourceSizing.newInstance( + createProfileCapability().getProfileCapabilityOverride()), null); + sReq.setPlacementConstraint(spec.constraint); + LOG.info("Scheduling Request made: " + sReq.toString()); + return sReq; + } + private boolean fileExist(String filePath) { return new File(filePath).exists(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index ef635d33b9..2aafa942a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -188,6 +188,8 @@ public class Client { // Whether to auto promote opportunistic containers private boolean 
autoPromoteContainers = false; + // Placement specification + private String placementSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -366,6 +368,10 @@ public Client(Configuration conf) throws Exception { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("placement_spec", true, + "Placement specification. Please note, if this option is specified," + + " The \"num_containers\" option will be ignored. All requested" + + " containers will be of type GUARANTEED" ); } /** @@ -419,6 +425,11 @@ public boolean init(String[] args) throws ParseException { keepContainers = true; } + if (cliParser.hasOption("placement_spec")) { + placementSpec = cliParser.getOptionValue("placement_spec"); + // Check if it is parsable + PlacementSpec.parse(this.placementSpec); + } appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); @@ -834,6 +845,9 @@ public boolean run() throws IOException, YarnException { vargs.add("--container_resource_profile " + containerResourceProfile); } vargs.add("--num_containers " + String.valueOf(numContainers)); + if (placementSpec != null && placementSpec.length() > 0) { + vargs.add("--placement_spec " + placementSpec); + } if (null != nodeLabelExpression) { appContext.setNodeLabelExpression(nodeLabelExpression); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java new file mode 
100644
index 0000000000..ed13ee0aa9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Class encapsulating a source tag, the number of containers requested for
+ * that tag and an optional placement constraint.
+ */
+public class PlacementSpec {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementSpec.class);
+  // Separates the individual SourceTag=Constraint entries of a spec.
+  private static final String SPEC_DELIM = ":";
+  // Separates a source tag from its constraint definition.
+  private static final String KV_SPLIT_DELIM = "=";
+  // Separates the fields of a constraint definition.
+  private static final String SPEC_VAL_DELIM = ",";
+  private static final String IN = "in";
+  private static final String NOT_IN = "notin";
+  private static final String CARDINALITY = "cardinality";
+
+  public final String sourceTag;
+  public final int numContainers;
+  // Null when the spec carries only a container count.
+  public final PlacementConstraint constraint;
+
+  /**
+   * Creates a placement spec.
+   *
+   * @param sourceTag tag identifying the containers of this spec.
+   * @param numContainers number of containers requested for the tag.
+   * @param constraint placement constraint, or null if there is none.
+   */
+  public PlacementSpec(String sourceTag, int numContainers,
+      PlacementConstraint constraint) {
+    this.sourceTag = sourceTag;
+    this.numContainers = numContainers;
+    this.constraint = constraint;
+  }
+
+  // Placement specification should be of the form:
+  // PlacementSpec => ""|KeyVal|KeyVal:PlacementSpec
+  // KeyVal => SourceTag=Constraint
+  // SourceTag => String
+  // Constraint => NumContainers|
+  //               NumContainers,"in",Scope,TargetTag|
+  //               NumContainers,"notin",Scope,TargetTag|
+  //               NumContainers,"cardinality",Scope,TargetTag,MinCard,MaxCard
+  // NumContainers => int (number of containers)
+  // Scope => "NODE"|"RACK" (case-insensitive, anything else means NODE)
+  // TargetTag => String (Target Tag)
+  // MinCard => int (min cardinality - needed if ConstraintType == cardinality)
+  // MaxCard => int (max cardinality - needed if ConstraintType == cardinality)
+
+  /**
+   * Parser to convert a string representation of a placement spec to a
+   * mapping from source tag to placement spec. If a source tag occurs more
+   * than once, the last entry replaces the earlier ones.
+   *
+   * @param specs Placement spec.
+   * @return Mapping from source tag to placement spec.
+   * @throws IllegalArgumentException if an entry has no constraint part
+   *         after the source tag or names an unknown constraint type.
+   * @throws java.util.NoSuchElementException if an entry lacks a required
+   *         field or a numeric field is not an int.
+   */
+  public static Map<String, PlacementSpec> parse(String specs) {
+    LOG.info("Parsing Placement Specs: [{}]", specs);
+    Map<String, PlacementSpec> pSpecs = new HashMap<>();
+    try (Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM)) {
+      while (s.hasNext()) {
+        PlacementSpec pSpec = parseSpec(s.next());
+        pSpecs.put(pSpec.sourceTag, pSpec);
+      }
+    }
+    return pSpecs;
+  }
+
+  /**
+   * Parses a single SourceTag=Constraint entry of a placement spec.
+   *
+   * @param sp the entry to parse.
+   * @return the placement spec described by the entry.
+   */
+  private static PlacementSpec parseSpec(String sp) {
+    LOG.info("Parsing Spec: [{}]", sp);
+    String[] specSplit = sp.split(KV_SPLIT_DELIM);
+    if (specSplit.length < 2) {
+      // Fail with a descriptive message instead of an
+      // ArrayIndexOutOfBoundsException when the constraint part is missing.
+      throw new IllegalArgumentException(
+          "Could not parse spec [" + sp + "], expected SourceTag"
+              + KV_SPLIT_DELIM + "Constraint");
+    }
+    String sourceTag = specSplit[0];
+    try (Scanner ps =
+        new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM)) {
+      int numContainers = ps.nextInt();
+      if (!ps.hasNext()) {
+        LOG.info("Creating Spec without constraint {}: num[{}]",
+            sourceTag, numContainers);
+        return new PlacementSpec(sourceTag, numContainers, null);
+      }
+      // Locale.ROOT keeps keyword matching independent of the default
+      // locale (e.g. "IN" lower-cases to a dotless i in Turkish).
+      String cType = ps.next().toLowerCase(Locale.ROOT);
+      String scope = ps.next().toLowerCase(Locale.ROOT);
+
+      String targetTag = ps.next();
+      scope = scope.equals("rack") ? PlacementConstraints.RACK :
+          PlacementConstraints.NODE;
+
+      PlacementConstraint pc;
+      if (cType.equals(IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating IN Constraint for source tag [{}], num[{}]: "
+                + "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(NOT_IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: "
+                + "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(CARDINALITY)) {
+        int minCard = ps.nextInt();
+        int maxCard = ps.nextInt();
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetCardinality(scope, minCard, maxCard,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: "
+                + "scope[{}], min[{}], max[{}], target[{}]",
+            sourceTag, numContainers, scope, minCard, maxCard, targetTag);
+      } else {
+        throw new IllegalArgumentException(
+            "Could not parse constraintType [" + cType + "]"
+                + " in [" + specSplit[1] + "]");
+      }
+      return new PlacementSpec(sourceTag, numContainers, pc);
+    }
+  }
+}