YARN-4830. Add support for resource types in the nodemanager. Contributed by Varun Vasudev.
This commit is contained in:
parent
7ba698997b
commit
759114b006
@ -308,7 +308,8 @@ public String toString() {
|
||||
continue;
|
||||
}
|
||||
if (entry.getKey().equals(ResourceInformation.VCORES.getName())
|
||||
&& entry.getValue().getUnits().equals("")) {
|
||||
&& entry.getValue().getUnits()
|
||||
.equals(ResourceInformation.VCORES.getUnits())) {
|
||||
continue;
|
||||
}
|
||||
sb.append(", ").append(entry.getKey()).append(": ")
|
||||
|
@ -64,6 +64,10 @@ public class YarnConfiguration extends Configuration {
|
||||
public static final String RESOURCE_TYPES_CONFIGURATION_FILE =
|
||||
"resource-types.xml";
|
||||
|
||||
@Private
|
||||
public static final String NODE_RESOURCES_CONFIGURATION_FILE =
|
||||
"node-resources.xml";
|
||||
|
||||
@Private
|
||||
public static final List<String> RM_CONFIGURATION_FILES =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
@ -74,6 +78,16 @@ public class YarnConfiguration extends Configuration {
|
||||
YARN_SITE_CONFIGURATION_FILE,
|
||||
CORE_SITE_CONFIGURATION_FILE));
|
||||
|
||||
@Private
|
||||
public static final List<String> NM_CONFIGURATION_FILES =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
NODE_RESOURCES_CONFIGURATION_FILE,
|
||||
DR_CONFIGURATION_FILE,
|
||||
CS_CONFIGURATION_FILE,
|
||||
HADOOP_POLICY_CONFIGURATION_FILE,
|
||||
YARN_SITE_CONFIGURATION_FILE,
|
||||
CORE_SITE_CONFIGURATION_FILE));
|
||||
|
||||
@Evolving
|
||||
public static final int APPLICATION_MAX_TAGS = 10;
|
||||
|
||||
@ -112,12 +126,15 @@ private static void addDeprecatedKeys() {
|
||||
public static final String YARN_PREFIX = "yarn.";
|
||||
|
||||
/////////////////////////////
|
||||
// Scheduler resource types configs
|
||||
// Resource types configs
|
||||
////////////////////////////
|
||||
|
||||
public static final String RESOURCE_TYPES =
|
||||
YarnConfiguration.YARN_PREFIX + "resource-types";
|
||||
|
||||
public static final String NM_RESOURCES_PREFIX =
|
||||
YarnConfiguration.NM_PREFIX + "resource-type.";
|
||||
|
||||
/** Delay before deleting resource to ease debugging of NM issues */
|
||||
public static final String DEBUG_NM_DELETE_DELAY_SEC =
|
||||
YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec";
|
||||
|
@ -51,7 +51,8 @@ public synchronized InputStream getConfigurationInputStream(
|
||||
"Illegal argument! The parameter should not be null or empty");
|
||||
}
|
||||
Path filePath;
|
||||
if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
||||
if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name) ||
|
||||
YarnConfiguration.NM_CONFIGURATION_FILES.contains(name)) {
|
||||
filePath = new Path(this.configDir, name);
|
||||
if (!fs.exists(filePath)) {
|
||||
LOG.info(filePath + " not found");
|
||||
|
@ -39,7 +39,8 @@ public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
if (name == null || name.isEmpty()) {
|
||||
throw new YarnException(
|
||||
"Illegal argument! The parameter should not be null or empty");
|
||||
} else if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
||||
} else if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name) ||
|
||||
YarnConfiguration.NM_CONFIGURATION_FILES.contains(name)) {
|
||||
return bootstrapConf.getConfResourceAsInputStream(name);
|
||||
}
|
||||
return new FileInputStream(name);
|
||||
|
@ -18,7 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
@ -38,6 +39,8 @@
|
||||
@Unstable
|
||||
public class ResourcePBImpl extends Resource {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class);
|
||||
|
||||
ResourceProto proto = ResourceProto.getDefaultInstance();
|
||||
ResourceProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
@ -92,10 +95,12 @@ public int getMemory() {
|
||||
@Override
|
||||
public long getMemorySize() {
|
||||
// memory should always be present
|
||||
initResourcesMap();
|
||||
initResources();
|
||||
ResourceInformation ri =
|
||||
this.getResourceInformation(ResourceInformation.MEMORY_MB.getName());
|
||||
return UnitsConversionUtil.convert(ri.getUnits(), "Mi", ri.getValue());
|
||||
return UnitsConversionUtil
|
||||
.convert(ri.getUnits(), ResourceInformation.MEMORY_MB.getUnits(),
|
||||
ri.getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -115,21 +120,16 @@ public void setMemorySize(long memory) {
|
||||
@Override
|
||||
public int getVirtualCores() {
|
||||
// vcores should always be present
|
||||
initResourcesMap();
|
||||
initResources();
|
||||
return this.getResourceValue(ResourceInformation.VCORES.getName())
|
||||
.intValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVirtualCores(int vCores) {
|
||||
try {
|
||||
setResourceValue(ResourceInformation.VCORES.getName(),
|
||||
Long.valueOf(vCores));
|
||||
} catch (ResourceNotFoundException re) {
|
||||
this.setResourceInformation(ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
||||
(long) vCores));
|
||||
}
|
||||
setResourceInformation(ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(), (long) vCores));
|
||||
}
|
||||
|
||||
private void initResources() {
|
||||
@ -146,14 +146,16 @@ private void initResources() {
|
||||
Long value = entry.hasValue() ? entry.getValue() : 0L;
|
||||
ResourceInformation ri =
|
||||
ResourceInformation.newInstance(entry.getKey(), units, value, type);
|
||||
resources.put(ri.getName(), ri);
|
||||
}
|
||||
if(this.getMemory() != p.getMemory()) {
|
||||
setMemorySize(p.getMemory());
|
||||
}
|
||||
if(this.getVirtualCores() != p.getVirtualCores()) {
|
||||
setVirtualCores(p.getVirtualCores());
|
||||
if (resources.containsKey(ri.getName())) {
|
||||
resources.get(ri.getName()).setResourceType(ri.getResourceType());
|
||||
resources.get(ri.getName()).setUnits(ri.getUnits());
|
||||
resources.get(ri.getName()).setValue(value);
|
||||
} else {
|
||||
LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping");
|
||||
}
|
||||
}
|
||||
this.setMemorySize(p.getMemory());
|
||||
this.setVirtualCores(p.getVirtualCores());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -167,7 +169,7 @@ public void setResourceInformation(String resource,
|
||||
if (!resource.equals(resourceInformation.getName())) {
|
||||
resourceInformation.setName(resource);
|
||||
}
|
||||
initResourcesMap();
|
||||
initResources();
|
||||
resources.put(resource, resourceInformation);
|
||||
}
|
||||
|
||||
@ -175,6 +177,7 @@ public void setResourceInformation(String resource,
|
||||
public void setResourceValue(String resource, Long value)
|
||||
throws ResourceNotFoundException {
|
||||
maybeInitBuilder();
|
||||
initResources();
|
||||
if (resource == null) {
|
||||
throw new IllegalArgumentException("resource type object cannot be null");
|
||||
}
|
||||
@ -182,9 +185,7 @@ public void setResourceValue(String resource, Long value)
|
||||
throw new ResourceNotFoundException(
|
||||
"Resource " + resource + " not found");
|
||||
}
|
||||
ResourceInformation ri = resources.get(resource);
|
||||
ri.setValue(value);
|
||||
resources.put(resource, ri);
|
||||
resources.get(resource).setValue(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -229,8 +230,10 @@ private void initResourcesMap() {
|
||||
synchronized private void mergeLocalToBuilder() {
|
||||
builder.clearResourceValueMap();
|
||||
if (resources != null && !resources.isEmpty()) {
|
||||
for (Map.Entry<String, ResourceInformation> entry : resources.entrySet()) {
|
||||
ResourceInformationProto.Builder e = ResourceInformationProto.newBuilder();
|
||||
for (Map.Entry<String, ResourceInformation> entry :
|
||||
resources.entrySet()) {
|
||||
ResourceInformationProto.Builder e =
|
||||
ResourceInformationProto.newBuilder();
|
||||
e.setKey(entry.getKey());
|
||||
e.setUnits(entry.getValue().getUnits());
|
||||
e.setType(
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.util.resource;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -51,15 +52,21 @@ public class ResourceUtils {
|
||||
public static final String UNITS = ".units";
|
||||
public static final String TYPE = ".type";
|
||||
|
||||
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||
private static final String VCORES = ResourceInformation.VCORES.getName();
|
||||
|
||||
private static final Set<String> DISALLOWED_NAMES = new HashSet<>();
|
||||
static {
|
||||
DISALLOWED_NAMES.add("memory");
|
||||
DISALLOWED_NAMES.add(ResourceInformation.MEMORY_MB.getName());
|
||||
DISALLOWED_NAMES.add(ResourceInformation.VCORES.getName());
|
||||
DISALLOWED_NAMES.add(MEMORY);
|
||||
DISALLOWED_NAMES.add(VCORES);
|
||||
}
|
||||
|
||||
private static volatile Object lock;
|
||||
private static Map<String, ResourceInformation> readOnlyResources;
|
||||
private static volatile Object nodeLock;
|
||||
private static Map<String, ResourceInformation> readOnlyNodeResources;
|
||||
|
||||
|
||||
static final Log LOG = LogFactory.getLog(ResourceUtils.class);
|
||||
|
||||
@ -69,22 +76,20 @@ private ResourceUtils() {
|
||||
private static void checkMandatatoryResources(
|
||||
Map<String, ResourceInformation> resourceInformationMap)
|
||||
throws YarnRuntimeException {
|
||||
String memory = ResourceInformation.MEMORY_MB.getName();
|
||||
String vcores = ResourceInformation.VCORES.getName();
|
||||
if (resourceInformationMap.containsKey(memory)) {
|
||||
ResourceInformation memInfo = resourceInformationMap.get(memory);
|
||||
if (resourceInformationMap.containsKey(MEMORY)) {
|
||||
ResourceInformation memInfo = resourceInformationMap.get(MEMORY);
|
||||
String memUnits = ResourceInformation.MEMORY_MB.getUnits();
|
||||
ResourceTypes memType = ResourceInformation.MEMORY_MB.getResourceType();
|
||||
if (!memInfo.getUnits().equals(memUnits) || !memInfo.getResourceType()
|
||||
.equals(memType)) {
|
||||
throw new YarnRuntimeException(
|
||||
"Attempt to re-define mandatory resource 'memory-mb'. It can only"
|
||||
+ " be of type 'COUNTABLE' and have units 'M'.");
|
||||
+ " be of type 'COUNTABLE' and have units 'Mi'.");
|
||||
}
|
||||
}
|
||||
|
||||
if (resourceInformationMap.containsKey(vcores)) {
|
||||
ResourceInformation vcoreInfo = resourceInformationMap.get(vcores);
|
||||
if (resourceInformationMap.containsKey(VCORES)) {
|
||||
ResourceInformation vcoreInfo = resourceInformationMap.get(VCORES);
|
||||
String vcoreUnits = ResourceInformation.VCORES.getUnits();
|
||||
ResourceTypes vcoreType = ResourceInformation.VCORES.getResourceType();
|
||||
if (!vcoreInfo.getUnits().equals(vcoreUnits) || !vcoreInfo
|
||||
@ -99,21 +104,21 @@ private static void checkMandatatoryResources(
|
||||
private static void addManadtoryResources(
|
||||
Map<String, ResourceInformation> res) {
|
||||
ResourceInformation ri;
|
||||
if (!res.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
LOG.info("Adding resource type - name = " + ResourceInformation.MEMORY_MB
|
||||
.getName() + ", units = " + ResourceInformation.MEMORY_MB.getUnits()
|
||||
+ ", type = " + ResourceTypes.COUNTABLE);
|
||||
if (!res.containsKey(MEMORY)) {
|
||||
LOG.info("Adding resource type - name = " + MEMORY + ", units = "
|
||||
+ ResourceInformation.MEMORY_MB.getUnits() + ", type = "
|
||||
+ ResourceTypes.COUNTABLE);
|
||||
ri = ResourceInformation
|
||||
.newInstance(ResourceInformation.MEMORY_MB.getName(),
|
||||
.newInstance(MEMORY,
|
||||
ResourceInformation.MEMORY_MB.getUnits());
|
||||
res.put(ResourceInformation.MEMORY_MB.getName(), ri);
|
||||
res.put(MEMORY, ri);
|
||||
}
|
||||
if (!res.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
LOG.info("Adding resource type - name = " + ResourceInformation.VCORES
|
||||
.getName() + ", units = , type = " + ResourceTypes.COUNTABLE);
|
||||
if (!res.containsKey(VCORES)) {
|
||||
LOG.info("Adding resource type - name = " + VCORES + ", units = , type = "
|
||||
+ ResourceTypes.COUNTABLE);
|
||||
ri =
|
||||
ResourceInformation.newInstance(ResourceInformation.VCORES.getName());
|
||||
res.put(ResourceInformation.VCORES.getName(), ri);
|
||||
ResourceInformation.newInstance(VCORES);
|
||||
res.put(VCORES, ri);
|
||||
}
|
||||
}
|
||||
|
||||
@ -122,6 +127,7 @@ static void initializeResourcesMap(Configuration conf,
|
||||
Map<String, ResourceInformation> resourceInformationMap) {
|
||||
|
||||
String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES);
|
||||
|
||||
if (resourceNames != null && resourceNames.length != 0) {
|
||||
for (String resourceName : resourceNames) {
|
||||
String resourceUnits = conf.get(
|
||||
@ -178,25 +184,13 @@ private static Map<String, ResourceInformation> getResourceTypes(
|
||||
conf = new YarnConfiguration();
|
||||
}
|
||||
try {
|
||||
InputStream ris = getConfInputStream(resourceFile, conf);
|
||||
addResourcesFileToConf(resourceFile, conf);
|
||||
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||
conf.addResource(ris);
|
||||
initializeResourcesMap(conf, resources);
|
||||
return resources;
|
||||
} catch (FileNotFoundException fe) {
|
||||
LOG.info("Unable to find '" + resourceFile
|
||||
+ "'. Falling back to memory and vcores as resources", fe);
|
||||
initializeResourcesMap(conf, resources);
|
||||
} catch (IOException ie) {
|
||||
LOG.fatal(
|
||||
"Exception trying to read resource types configuration '"
|
||||
+ resourceFile + "'.", ie);
|
||||
throw new YarnRuntimeException(ie);
|
||||
} catch (YarnException ye) {
|
||||
LOG.fatal(
|
||||
"YARN Exception trying to read resource types configuration '"
|
||||
+ resourceFile + "'.", ye);
|
||||
throw new YarnRuntimeException(ye);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -205,8 +199,8 @@ private static Map<String, ResourceInformation> getResourceTypes(
|
||||
return readOnlyResources;
|
||||
}
|
||||
|
||||
static InputStream getConfInputStream(String resourceFile, Configuration conf)
|
||||
throws IOException, YarnException {
|
||||
private static InputStream getConfInputStream(String resourceFile,
|
||||
Configuration conf) throws IOException, YarnException {
|
||||
|
||||
ConfigurationProvider provider =
|
||||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||
@ -222,8 +216,112 @@ static InputStream getConfInputStream(String resourceFile, Configuration conf)
|
||||
return ris;
|
||||
}
|
||||
|
||||
private static void addResourcesFileToConf(String resourceFile,
|
||||
Configuration conf) throws FileNotFoundException {
|
||||
try {
|
||||
InputStream ris = getConfInputStream(resourceFile, conf);
|
||||
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||
conf.addResource(ris);
|
||||
} catch (FileNotFoundException fe) {
|
||||
throw fe;
|
||||
} catch (IOException ie) {
|
||||
LOG.fatal("Exception trying to read resource types configuration '"
|
||||
+ resourceFile + "'.", ie);
|
||||
throw new YarnRuntimeException(ie);
|
||||
} catch (YarnException ye) {
|
||||
LOG.fatal("YARN Exception trying to read resource types configuration '"
|
||||
+ resourceFile + "'.", ye);
|
||||
throw new YarnRuntimeException(ye);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void resetResourceTypes() {
|
||||
lock = null;
|
||||
}
|
||||
|
||||
private static String getUnits(String resourceValue) {
|
||||
String units;
|
||||
for (int i = 0; i < resourceValue.length(); i++) {
|
||||
if (Character.isAlphabetic(resourceValue.charAt(i))) {
|
||||
units = resourceValue.substring(i);
|
||||
if (StringUtils.isAlpha(units)) {
|
||||
return units;
|
||||
}
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the resources for a node. This function will look at the
|
||||
* file {@link YarnConfiguration#NODE_RESOURCES_CONFIGURATION_FILE} to
|
||||
* determine the node resources.
|
||||
*
|
||||
* @param conf configuration file
|
||||
* @return a map to resource name to the ResourceInformation object. The map
|
||||
* is guaranteed to have entries for memory and vcores
|
||||
*/
|
||||
public static Map<String, ResourceInformation> getNodeResourceInformation(
|
||||
Configuration conf) {
|
||||
if (nodeLock == null) {
|
||||
synchronized (ResourceUtils.class) {
|
||||
if (nodeLock == null) {
|
||||
synchronized (ResourceUtils.class) {
|
||||
nodeLock = new Object();
|
||||
Map<String, ResourceInformation> nodeResources =
|
||||
initializeNodeResourceInformation(conf);
|
||||
addManadtoryResources(nodeResources);
|
||||
checkMandatatoryResources(nodeResources);
|
||||
readOnlyNodeResources = Collections.unmodifiableMap(nodeResources);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return readOnlyNodeResources;
|
||||
}
|
||||
|
||||
private static Map<String, ResourceInformation>
|
||||
initializeNodeResourceInformation(Configuration conf) {
|
||||
Map<String, ResourceInformation> nodeResources = new HashMap<>();
|
||||
try {
|
||||
addResourcesFileToConf(
|
||||
YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE, conf);
|
||||
for (Map.Entry<String, String> entry : conf) {
|
||||
String key = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
|
||||
addResourceInformation(key, value, nodeResources);
|
||||
}
|
||||
}
|
||||
} catch (FileNotFoundException fe) {
|
||||
LOG.info("Couldn't find node resources file");
|
||||
}
|
||||
return nodeResources;
|
||||
}
|
||||
|
||||
private static void addResourceInformation(String prop, String value,
|
||||
Map<String, ResourceInformation> nodeResources) {
|
||||
String[] parts = prop.split("\\.");
|
||||
LOG.info("Found resource entry " + prop);
|
||||
if (parts.length == 4) {
|
||||
String resourceType = parts[3];
|
||||
if (!nodeResources.containsKey(resourceType)) {
|
||||
nodeResources
|
||||
.put(resourceType, ResourceInformation.newInstance(resourceType));
|
||||
}
|
||||
String units = getUnits(value);
|
||||
Long resourceValue =
|
||||
Long.valueOf(value.substring(0, value.length() - units.length()));
|
||||
nodeResources.get(resourceType).setValue(resourceValue);
|
||||
nodeResources.get(resourceType).setUnits(units);
|
||||
LOG.debug("Setting value for resource type " + resourceType + " to "
|
||||
+ resourceValue + " with units " + units);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized public static void resetNodeResources() {
|
||||
nodeLock = null;
|
||||
}
|
||||
}
|
||||
|
@ -21,9 +21,9 @@
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -245,4 +245,31 @@ public void testInitializeResourcesMapErrors() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetResourceInformation() throws Exception {
|
||||
|
||||
Configuration conf = new YarnConfiguration();
|
||||
Map<String, Resource> testRun = new HashMap<>();
|
||||
// testRun.put("node-resources-1.xml", Resource.newInstance(1024, 1));
|
||||
Resource test3Resources = Resource.newInstance(1024, 1);
|
||||
test3Resources.setResourceInformation("resource1",
|
||||
ResourceInformation.newInstance("resource1", "Gi", 5L));
|
||||
test3Resources.setResourceInformation("resource2",
|
||||
ResourceInformation.newInstance("resource2", "m", 2L));
|
||||
testRun.put("node-resources-2.xml", test3Resources);
|
||||
|
||||
for (Map.Entry<String, Resource> entry : testRun.entrySet()) {
|
||||
String resourceFile = entry.getKey();
|
||||
ResourceUtils.resetNodeResources();
|
||||
File dest;
|
||||
File source =
|
||||
new File(conf.getClassLoader().getResource(resourceFile).getFile());
|
||||
dest = new File(source.getParent(), "node-resources.xml");
|
||||
FileUtils.copyFile(source, dest);
|
||||
Map<String, ResourceInformation> actual =
|
||||
ResourceUtils.getNodeResourceInformation(conf);
|
||||
Assert.assertEquals(entry.getValue().getResources(), actual);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource.memory-mb</name>
|
||||
<value>1024</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource.vcores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
@ -0,0 +1,39 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource-type.memory-mb</name>
|
||||
<value>1024Mi</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource-type.vcores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource-type.resource1</name>
|
||||
<value>5Gi</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.resource-type.resource2</name>
|
||||
<value>2m</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
@ -174,18 +174,19 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
int memoryMb = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
|
||||
this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
|
||||
int memoryMb = totalResource.getMemory();
|
||||
float vMemToPMem =
|
||||
conf.getFloat(
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
|
||||
|
||||
int virtualCores = NodeManagerHardwareUtils.getVCores(conf);
|
||||
int virtualCores = totalResource.getVirtualCores();
|
||||
LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB.");
|
||||
LOG.info("Nodemanager resources: vcores set to " + virtualCores + ".");
|
||||
LOG.info("Nodemanager resources: " + totalResource);
|
||||
|
||||
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
|
||||
metrics.addResource(totalResource);
|
||||
|
||||
// Get actual node physical resources
|
||||
|
@ -21,10 +21,16 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Helper class to determine hardware related characteristics such as the
|
||||
@ -332,4 +338,50 @@ private static int getContainerMemoryMBInternal(ResourceCalculatorPlugin plugin,
|
||||
}
|
||||
return memoryMb;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the resources for the node.
|
||||
* @param configuration configuration file
|
||||
* @return the resources for the node
|
||||
*/
|
||||
public static Resource getNodeResources(Configuration configuration) {
|
||||
Configuration conf = new Configuration(configuration);
|
||||
String memory = ResourceInformation.MEMORY_MB.getName();
|
||||
String vcores = ResourceInformation.VCORES.getName();
|
||||
|
||||
Resource ret = Resource.newInstance(0, 0);
|
||||
Map<String, ResourceInformation> resourceInformation =
|
||||
ResourceUtils.getNodeResourceInformation(conf);
|
||||
for (Map.Entry<String, ResourceInformation> entry : resourceInformation
|
||||
.entrySet()) {
|
||||
ret.setResourceInformation(entry.getKey(), entry.getValue());
|
||||
LOG.debug("Setting key " + entry.getKey() + " to " + entry.getValue());
|
||||
}
|
||||
if (resourceInformation.containsKey(memory)) {
|
||||
Long value = resourceInformation.get(memory).getValue();
|
||||
if (value > Integer.MAX_VALUE) {
|
||||
throw new YarnRuntimeException("Value '" + value
|
||||
+ "' for resource memory is more than the maximum for an integer.");
|
||||
}
|
||||
ResourceInformation memResInfo = resourceInformation.get(memory);
|
||||
if(memResInfo.getValue() == 0) {
|
||||
ret.setMemory(getContainerMemoryMB(conf));
|
||||
LOG.debug("Set memory to " + ret.getMemory());
|
||||
}
|
||||
}
|
||||
if (resourceInformation.containsKey(vcores)) {
|
||||
Long value = resourceInformation.get(vcores).getValue();
|
||||
if (value > Integer.MAX_VALUE) {
|
||||
throw new YarnRuntimeException("Value '" + value
|
||||
+ "' for resource vcores is more than the maximum for an integer.");
|
||||
}
|
||||
ResourceInformation vcoresResInfo = resourceInformation.get(vcores);
|
||||
if(vcoresResInfo.getValue() == 0) {
|
||||
ret.setVirtualCores(getVCores(conf));
|
||||
LOG.debug("Set vcores to " + ret.getVirtualCores());
|
||||
}
|
||||
}
|
||||
LOG.debug("Node resource information map is " + ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -376,10 +376,11 @@ public RegisterNodeManagerResponse registerNodeManager(
|
||||
// Check if this node has minimum allocations
|
||||
if (capability.getMemorySize() < minAllocMb
|
||||
|| capability.getVirtualCores() < minAllocVcores) {
|
||||
String message =
|
||||
"NodeManager from " + host
|
||||
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
||||
+ " signal to the NodeManager.";
|
||||
String message = "NodeManager from " + host
|
||||
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
||||
+ " signal to the NodeManager. Node capabilities are " + capability
|
||||
+ "; minimums are " + minAllocMb + "mb and " + minAllocVcores
|
||||
+ " vcores";
|
||||
LOG.info(message);
|
||||
response.setDiagnosticsMessage(message);
|
||||
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||
|
Loading…
Reference in New Issue
Block a user