YARN-5588. [YARN-3926] Add support for resource profiles in distributed shell. Contributed by Varun Vasudev.

This commit is contained in:
Sunil G 2017-02-27 21:44:14 +05:30 committed by Wangda Tan
parent 6708ac3301
commit 7805deed48
9 changed files with 287 additions and 58 deletions

View File

@ -150,17 +150,21 @@ public static Resource toResource(ProfileCapability capability,
.checkArgument(capability != null, "Capability cannot be null");
Preconditions.checkArgument(resourceProfilesMap != null,
"Resource profiles map cannot be null");
Resource none = Resource.newInstance(0, 0);
Resource resource = Resource.newInstance(0, 0);
if (resourceProfilesMap.containsKey(capability.getProfileName())) {
resource = Resource
.newInstance(resourceProfilesMap.get(capability.getProfileName()));
String profileName = capability.getProfileName();
if (profileName.isEmpty()) {
profileName = DEFAULT_PROFILE;
}
if (resourceProfilesMap.containsKey(profileName)) {
resource = Resource.newInstance(resourceProfilesMap.get(profileName));
}
if(capability.getProfileCapabilityOverride()!= null) {
if (capability.getProfileCapabilityOverride() != null &&
!capability.getProfileCapabilityOverride().equals(none)) {
for (Map.Entry<String, ResourceInformation> entry : capability
.getProfileCapabilityOverride().getResources().entrySet()) {
if (entry.getValue() != null && entry.getValue().getValue() != 0) {
if (entry.getValue() != null && entry.getValue().getValue() >= 0) {
resource.setResourceInformation(entry.getKey(), entry.getValue());
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
/**
* This exception is thrown when the client requests information about
* ResourceProfiles in the
* {@link org.apache.hadoop.yarn.api.ApplicationClientProtocol} but resource
* profiles is not enabled on the RM.
*
*/
public class ResourceProfilesNotEnabledException extends YarnException {
private static final long serialVersionUID = 13498237L;
public ResourceProfilesNotEnabledException(Throwable cause) {
super(cause);
}
public ResourceProfilesNotEnabledException(String message) {
super(message);
}
public ResourceProfilesNotEnabledException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -87,6 +87,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
@ -103,6 +104,7 @@
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
@ -231,12 +233,18 @@ public enum DSEntity {
@VisibleForTesting
protected int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
private long containerMemory = 10;
private static final long DEFAULT_CONTAINER_MEMORY = 10;
private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// VirtualCores to request for the container on which the shell command will run
private int containerVirtualCores = 1;
private static final int DEFAULT_CONTAINER_VCORES = 1;
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
// Priority of the request
private int requestPriority;
// Resource profile for the container
private String containerResourceProfile = "";
Map<String, Resource> resourceProfiles;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
@ -407,6 +415,8 @@ public boolean init(String[] args) throws ParseException, IOException {
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
"Amount of virtual cores to be requested to run the shell command");
opts.addOption("container_resource_profile", true,
"Resource profile to be requested to run the shell command");
opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
@ -548,9 +558,11 @@ public boolean init(String[] args) throws ParseException, IOException {
}
containerMemory = Integer.parseInt(cliParser.getOptionValue(
"container_memory", "10"));
"container_memory", "-1"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
"container_vcores", "1"));
"container_vcores", "-1"));
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
if (numTotalContainers == 0) {
@ -669,6 +681,7 @@ public void run() throws YarnException, IOException, InterruptedException {
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
resourceProfiles = response.getResourceProfiles();
// Dump out information about cluster capability as seen by the
// resource manager
long maxMem = response.getMaximumResourceCapability().getMemorySize();
@ -1227,12 +1240,8 @@ private ContainerRequest setupContainerAskForRM() {
Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
// For now, memory and CPU are supported so we set memory and cpu requirements
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
ContainerRequest request =
new ContainerRequest(createProfileCapability(), null, null, pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}
@ -1496,4 +1505,36 @@ public TimelinePutResponse run() throws Exception {
}
}
private ProfileCapability createProfileCapability()
throws YarnRuntimeException {
if (containerMemory < -1 || containerMemory == 0) {
throw new YarnRuntimeException("Value of AM memory '" + containerMemory
+ "' has to be greater than 0");
}
if (containerVirtualCores < -1 || containerVirtualCores == 0) {
throw new YarnRuntimeException(
"Value of AM vcores '" + containerVirtualCores
+ "' has to be greater than 0");
}
Resource resourceCapability =
Resource.newInstance(containerMemory, containerVirtualCores);
if (resourceProfiles == null) {
containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY :
containerMemory;
containerVirtualCores =
containerVirtualCores == -1 ? DEFAULT_CONTAINER_VCORES :
containerVirtualCores;
resourceCapability.setMemorySize(containerMemory);
resourceCapability.setVirtualCores(containerVirtualCores);
}
String profileName = containerResourceProfile;
if ("".equals(containerResourceProfile) && resourceProfiles != null) {
profileName = "default";
}
ProfileCapability capability =
ProfileCapability.newInstance(profileName, resourceCapability);
return capability;
}
}

View File

@ -66,10 +66,12 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -79,8 +81,9 @@
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
@ -119,6 +122,11 @@
public class Client {
private static final Log LOG = LogFactory.getLog(Client.class);
private static final int DEFAULT_AM_MEMORY = 100;
private static final int DEFAULT_AM_VCORES = 1;
private static final int DEFAULT_CONTAINER_MEMORY = 10;
private static final int DEFAULT_CONTAINER_VCORES = 1;
// Configuration
private Configuration conf;
@ -130,9 +138,12 @@ public class Client {
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
private long amMemory = 100;
private long amMemory = DEFAULT_AM_MEMORY;
// Amt. of virtual core resource to request for to run the App Master
private int amVCores = 1;
private int amVCores = DEFAULT_AM_VCORES;
// AM resource profile
private String amResourceProfile = "";
// Application master jar file
private String appMasterJar = "";
@ -151,9 +162,11 @@ public class Client {
private int shellCmdPriority = 0;
// Amt of memory to request for container in which shell script will be executed
private int containerMemory = 10;
private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// Amt. of virtual cores to request for container in which shell script will be executed
private int containerVirtualCores = 1;
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
// container resource profile
private String containerResourceProfile = "";
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;
private String nodeLabelExpression = null;
@ -256,6 +269,7 @@ public Client(Configuration conf) throws Exception {
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
opts.addOption("jar", true, "Jar file containing the application master");
opts.addOption("master_resource_profile", true, "Resource profile for the application master");
opts.addOption("shell_command", true, "Shell command to be executed by " +
"the Application Master. Can only specify either --shell_command " +
"or --shell_script");
@ -269,6 +283,7 @@ public Client(Configuration conf) throws Exception {
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("keep_containers_across_application_attempts", false,
@ -372,17 +387,11 @@ public boolean init(String[] args) throws ParseException {
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100"));
amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
if (amMemory < 0) {
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+ " Specified memory=" + amMemory);
}
if (amVCores < 0) {
throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
+ " Specified virtual cores=" + amVCores);
}
amMemory =
Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
amVCores =
Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
@ -423,17 +432,18 @@ public boolean init(String[] args) throws ParseException {
}
shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
containerMemory =
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
containerVirtualCores =
Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
numContainers =
Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
+ " exiting."
+ " Specified containerMemory=" + containerMemory
+ ", containerVirtualCores=" + containerVirtualCores
+ ", numContainer=" + numContainers);
if (numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers specified,"
+ " exiting. Specified numContainer=" + numContainers);
}
nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
@ -540,6 +550,32 @@ public boolean run() throws IOException, YarnException {
prepareTimelineDomain();
}
Map<String, Resource> profiles;
try {
profiles = yarnClient.getResourceProfiles();
} catch (ResourceProfilesNotEnabledException re) {
profiles = null;
}
List<String> appProfiles = new ArrayList<>(2);
appProfiles.add(amResourceProfile);
appProfiles.add(containerResourceProfile);
for (String appProfile : appProfiles) {
if (appProfile != null && !appProfile.isEmpty()) {
if (profiles == null) {
String message = "Resource profiles is not enabled";
LOG.error(message);
throw new IOException(message);
}
if (!profiles.containsKey(appProfile)) {
String message = "Unknown resource profile '" + appProfile
+ "'. Valid resource profiles are " + profiles.keySet();
LOG.error(message);
throw new IOException(message);
}
}
}
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
@ -573,6 +609,13 @@ public boolean run() throws IOException, YarnException {
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile,
amPriority, profiles);
setContainerResources(containerMemory, containerVirtualCores, profiles);
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
@ -696,8 +739,16 @@ public boolean run() throws IOException, YarnException {
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
if (containerMemory > 0) {
vargs.add("--container_memory " + String.valueOf(containerMemory));
}
if (containerVirtualCores > 0) {
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
}
if (containerResourceProfile != null && !containerResourceProfile
.isEmpty()) {
vargs.add("--container_resource_profile " + containerResourceProfile);
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
@ -730,12 +781,6 @@ public boolean run() throws IOException, YarnException {
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
@ -933,4 +978,63 @@ private void prepareTimelineDomain() {
timelineClient.stop();
}
}
private void setAMResourceCapability(ApplicationSubmissionContext appContext,
long memory, int vcores, String profile, int priority,
Map<String, Resource> profiles) throws IllegalArgumentException {
if (memory < -1 || memory == 0) {
throw new IllegalArgumentException("Invalid memory specified for"
+ " application master, exiting. Specified memory=" + memory);
}
if (vcores < -1 || vcores == 0) {
throw new IllegalArgumentException("Invalid virtual cores specified for"
+ " application master, exiting. Specified virtual cores=" + vcores);
}
String tmp = profile;
if (profile.isEmpty()) {
tmp = "default";
}
if (appContext.getAMContainerResourceRequest() == null) {
appContext.setAMContainerResourceRequest(ResourceRequest
.newInstance(Priority.newInstance(priority), "*",
Resources.clone(Resources.none()), 1));
}
if (appContext.getAMContainerResourceRequest().getProfileCapability()
== null) {
appContext.getAMContainerResourceRequest().setProfileCapability(
ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
}
Resource capability = Resource.newInstance(0, 0);
// set amMemory because it's used to set Xmx param
if (profiles == null) {
amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory;
amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores;
capability.setMemorySize(amMemory);
capability.setVirtualCores(amVCores);
} else {
amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
capability.setMemorySize(memory);
capability.setVirtualCores(vcores);
}
appContext.getAMContainerResourceRequest().getProfileCapability()
.setProfileCapabilityOverride(capability);
}
private void setContainerResources(long memory, int vcores,
Map<String, Resource> profiles) throws IllegalArgumentException {
if (memory < -1 || memory == 0) {
throw new IllegalArgumentException(
"Container memory '" + memory + "' has to be greated than 0");
}
if (vcores < -1 || vcores == 0) {
throw new IllegalArgumentException(
"Container vcores '" + vcores + "' has to be greated than 0");
}
if (profiles == null) {
containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory;
containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores;
}
}
}

View File

@ -1122,6 +1122,7 @@ public void testDSShellWithInvalidArgs() throws Exception {
"1"
};
client.init(args);
client.run();
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
@ -1349,4 +1350,32 @@ private int verifyContainerLog(int containerNum,
}
return numOfWords;
}
@Test
public void testDistributedShellResourceProfiles() throws Exception {
String[][] args = {
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
"maximum" },
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
"default" },
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
"default", "--container_resource_profile", "maximum" }
};
for (int i = 0; i < args.length; ++i) {
LOG.info("Initializing DS Client");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
Assert.assertTrue(client.init(args[i]));
LOG.info("Running DS Client");
try {
client.run();
Assert.fail("Client run should throw error");
} catch (Exception e) {
continue;
}
}
}
}

View File

@ -534,7 +534,7 @@ private void verifyMatches(
List<? extends Collection<ContainerRequest>> matches,
int matchSize) {
assertEquals(1, matches.size());
assertEquals(matches.get(0).size(), matchSize);
assertEquals(matchSize, matches.get(0).size());
}
@Test (timeout=60000)

View File

@ -141,6 +141,7 @@
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -1803,7 +1804,8 @@ private Map<String, Resource> getResourceProfiles() throws YarnException {
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
if (!resourceProfilesEnabled) {
throw new YarnException("Resource profiles are not enabled");
throw new ResourceProfilesNotEnabledException(
"Resource profiles are not enabled");
}
return resourceProfilesManager.getResourceProfiles();
}

View File

@ -49,9 +49,9 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
private static final String DEFAULT_PROFILE = "default";
private static final String MINIMUM_PROFILE = "minimum";
private static final String MAXIMUM_PROFILE = "maximum";
public static final String DEFAULT_PROFILE = "default";
public static final String MINIMUM_PROFILE = "minimum";
public static final String MAXIMUM_PROFILE = "maximum";
private static final String[] MANDATORY_PROFILES =
{ DEFAULT_PROFILE, MINIMUM_PROFILE, MAXIMUM_PROFILE };

View File

@ -219,9 +219,15 @@ public Resource getMaxAllowedAllocation() {
return configuredMaxAllocation;
}
return Resources.createResource(
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores));
Resource ret = Resources.clone(configuredMaxAllocation);
if (ret.getMemorySize() > maxNodeMemory) {
ret.setMemorySize(maxNodeMemory);
}
if (ret.getVirtualCores() > maxNodeVCores) {
ret.setVirtualCores(maxNodeVCores);
}
return ret;
} finally {
readLock.unlock();
}