YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from underlying OS when configured to do so. Contributed by Varun Vasudev.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-05-26 11:38:35 -07:00
parent 022f49d59e
commit 500a1d9c76
16 changed files with 696 additions and 107 deletions

View File

@ -88,6 +88,12 @@ public int getNumProcessors() {
return getConf().getInt(NUM_PROCESSORS, -1);
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation does not track physical cores separately; it
 * reports the same value as {@link #getNumProcessors()}.
 */
@Override
public int getNumCores() {
  final int processorCount = getNumProcessors();
  return processorCount;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {

View File

@ -120,6 +120,9 @@ Release 2.8.0 - UNRELEASED
YARN-3541. Add version info on timeline service / generic history web UI
and REST API. (Zhijie Shen via xgong)
YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from
underlying OS when configured to do so. (Varun Vasudev via vinodkv)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -804,10 +804,14 @@ private static void addDeprecatedKeys() {
public static final String YARN_TRACKING_URL_GENERATOR =
YARN_PREFIX + "tracking.url.generator";
/** Amount of memory in GB that can be allocated for containers.*/
/** Amount of memory in MB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
/** Amount of memory in MB that has been reserved for non-yarn use. */
public static final String NM_SYSTEM_RESERVED_PMEM_MB = NM_PREFIX
+ "resource.system-reserved-memory-mb";
/** Specifies whether physical memory check is enabled. */
public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX
+ "pmem-check-enabled";
@ -827,12 +831,29 @@ private static void addDeprecatedKeys() {
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8;
/** Count logical processors(like hyperthreads) as cores. */
public static final String NM_COUNT_LOGICAL_PROCESSORS_AS_CORES = NM_PREFIX
+ "resource.count-logical-processors-as-cores";
public static final boolean DEFAULT_NM_COUNT_LOGICAL_PROCESSORS_AS_CORES =
false;
/** Multiplier to convert physical cores to vcores. */
public static final String NM_PCORES_VCORES_MULTIPLIER = NM_PREFIX
+ "resource.pcores-vcores-multiplier";
public static final float DEFAULT_NM_PCORES_VCORES_MULTIPLIER = 1.0f;
/** Percentage of overall CPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
/** Enable or disable node hardware capability detection. */
public static final String NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
NM_PREFIX + "resource.detect-hardware-capabilities";
public static final boolean DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
false;
/**
* Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future.

View File

@ -25,9 +25,11 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -58,16 +60,20 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final String INACTIVE_STRING = "Inactive";
/**
* Patterns for parsing /proc/cpuinfo
* Patterns for parsing /proc/cpuinfo.
*/
private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
private static final Pattern PHYSICAL_ID_FORMAT =
Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)");
private static final Pattern CORE_ID_FORMAT =
Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)");
/**
* Pattern for parsing /proc/stat
* Pattern for parsing /proc/stat.
*/
private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT =
@ -78,21 +84,24 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
long jiffyLengthInMillis;
private long jiffyLengthInMillis;
private long ramSize = 0;
private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system
/* number of logical processors on the system. */
private int numProcessors = 0;
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
/**
* Get current time
* Get current time.
* @return Unix time stamp in millisecond
*/
long getCurrentTime() {
@ -106,7 +115,7 @@ public LinuxResourceCalculatorPlugin() {
/**
* Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests
* used only in unit tests.
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
@ -124,14 +133,14 @@ public LinuxResourceCalculatorPlugin(String procfsMemFile,
}
/**
* Read /proc/meminfo, parse and compute memory information only once
* Read /proc/meminfo, parse and compute memory information only once.
*/
private void readProcMemInfoFile() {
readProcMemInfoFile(false);
}
/**
* Read /proc/meminfo, parse and compute memory information
* Read /proc/meminfo, parse and compute memory information.
* @param readAgain if false, read only on the first time
*/
private void readProcMemInfoFile(boolean readAgain) {
@ -141,18 +150,20 @@ private void readProcMemInfoFile(boolean readAgain) {
}
// Read "/proc/memInfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsMemFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsMemFile
+ "; can't determine memory settings");
return;
}
Matcher mat = null;
Matcher mat;
try {
String str = in.readLine();
@ -193,27 +204,31 @@ private void readProcMemInfoFile(boolean readAgain) {
}
/**
* Read /proc/cpuinfo, parse and calculate CPU information
* Read /proc/cpuinfo, parse and calculate CPU information.
*/
private void readProcCpuInfoFile() {
// This directory needs to be read only once
if (readCpuInfoFile) {
return;
}
HashSet<String> coreIdSet = new HashSet<>();
// Read "/proc/cpuinfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsCpuFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info");
return;
}
Matcher mat = null;
Matcher mat;
try {
numProcessors = 0;
numCores = 1;
String currentPhysicalId = "";
String str = in.readLine();
while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str);
@ -224,6 +239,15 @@ private void readProcCpuInfoFile() {
if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
}
mat = PHYSICAL_ID_FORMAT.matcher(str);
if (mat.find()) {
currentPhysicalId = str;
}
mat = CORE_ID_FORMAT.matcher(str);
if (mat.find()) {
coreIdSet.add(currentPhysicalId + " " + str);
numCores = coreIdSet.size();
}
str = in.readLine();
}
} catch (IOException io) {
@ -245,12 +269,12 @@ private void readProcCpuInfoFile() {
}
/**
* Read /proc/stat file, parse and calculate cumulative CPU
* Read /proc/stat file, parse and calculate cumulative CPU.
*/
private void readProcStatFile() {
// Read "/proc/stat" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsStatFile), Charset.forName("UTF-8"));
@ -260,7 +284,7 @@ private void readProcStatFile() {
return;
}
Matcher mat = null;
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
@ -328,6 +352,13 @@ public int getNumProcessors() {
return numProcessors;
}
/**
 * {@inheritDoc}
 *
 * <p>Calls {@code readProcCpuInfoFile()} first so the count reflects the
 * cpuinfo file; that method returns immediately when the file has already
 * been read.
 */
@Override
public int getNumCores() {
  this.readProcCpuInfoFile();
  return this.numCores;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
@ -354,9 +385,9 @@ public float getCpuUsage() {
}
/**
* Test the {@link LinuxResourceCalculatorPlugin}
* Test the {@link LinuxResourceCalculatorPlugin}.
*
* @param args
* @param args - arguments to this calculator test
*/
public static void main(String[] args) {
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
@ -380,4 +411,13 @@ public static void main(String[] args) {
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
/**
 * Overrides the flag that records whether the cpuinfo file has already been
 * read. Setting it to {@code false} makes the next CPU query parse the file
 * again, which lets tests swap the fake file contents between assertions.
 *
 * @param readCpuInfoFileValue new value for the "already read" flag
 */
@VisibleForTesting
void setReadCpuInfoFile(boolean readCpuInfoFileValue) {
  readCpuInfoFile = readCpuInfoFileValue;
}
/**
 * Accessor for the private {@code jiffyLengthInMillis} field.
 *
 * @return the current value of {@code jiffyLengthInMillis}
 */
public long getJiffyLengthInMillis() {
  return jiffyLengthInMillis;
}
}

View File

@ -64,12 +64,19 @@ public abstract class ResourceCalculatorPlugin extends Configured {
public abstract long getAvailablePhysicalMemorySize();
/**
* Obtain the total number of processors present on the system.
* Obtain the total number of logical processors present on the system.
*
* @return number of processors
* @return number of logical processors
*/
public abstract int getNumProcessors();
/**
* Obtain total number of physical cores present on the system.
*
* @return number of physical cores
*/
public abstract int getNumCores();
/**
* Obtain the CPU frequency of the system.
*

View File

@ -147,6 +147,12 @@ public int getNumProcessors() {
return numProcessors;
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation does not track physical cores separately; it
 * reports the same value as {@link #getNumProcessors()}.
 */
@Override
public int getNumCores() {
  final int processorCount = getNumProcessors();
  return processorCount;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {

View File

@ -890,9 +890,25 @@
<property>
<description>Amount of physical memory, in MB, that can be allocated
for containers.</description>
for containers. If set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically calculated(in case of Windows and Linux).
In other cases, the default is 8192MB.
</description>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
<value>-1</value>
</property>
<property>
<description>Amount of physical memory, in MB, that is reserved
for non-YARN processes. This configuration is only used if
yarn.nodemanager.resource.detect-hardware-capabilities is set
to true and yarn.nodemanager.resource.memory-mb is -1. If set
to -1, this amount is calculated as
20% of (system memory - 2*HADOOP_HEAPSIZE)
</description>
<name>yarn.nodemanager.resource.system-reserved-memory-mb</name>
<value>-1</value>
</property>
<property>
@ -923,9 +939,34 @@
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
physical cores used by YARN containers.</description>
CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value>
<value>-1</value>
</property>
<property>
<description>Flag to determine if logical processors(such as
hyperthreads) should be counted as cores. Only applicable on Linux
when yarn.nodemanager.resource.cpu-vcores is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true.
</description>
<name>yarn.nodemanager.resource.count-logical-processors-as-cores</name>
<value>false</value>
</property>
<property>
<description>Multiplier to determine how to convert physical cores to
vcores. This value is used if yarn.nodemanager.resource.cpu-vcores
is set to -1(which implies auto-calculate vcores) and
yarn.nodemanager.resource.detect-hardware-capabilities is set to true. The
number of vcores will be calculated as
number of CPUs * multiplier.
</description>
<name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
<value>1.0</value>
</property>
<property>
@ -938,6 +979,14 @@
<value>100</value>
</property>
<property>
<description>Enable auto-detection of node capabilities such as
memory and CPU.
</description>
<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
<value>false</value>
</property>
<property>
<description>NM Webapp address.</description>
<name>yarn.nodemanager.webapp.address</name>

View File

@ -21,11 +21,13 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
@ -51,7 +53,7 @@ long getCurrentTime() {
return currentTime;
}
public void advanceTime(long adv) {
currentTime += adv * jiffyLengthInMillis;
currentTime += adv * this.getJiffyLengthInMillis();
}
}
private static final FakeLinuxResourceCalculatorPlugin plugin;
@ -109,9 +111,9 @@ public void advanceTime(long adv) {
"stepping : 2\n" +
"cpu MHz : %f\n" +
"cache size : 1024 KB\n" +
"physical id : 0\n" +
"physical id : %s\n" +
"siblings : 2\n" +
"core id : 0\n" +
"core id : %s\n" +
"cpu cores : 2\n" +
"fpu : yes\n" +
"fpu_exception : yes\n" +
@ -151,8 +153,9 @@ public void parsingProcStatAndCpuFile() throws IOException {
long cpuFrequencyKHz = 2392781;
String fileContent = "";
for (int i = 0; i < numProcessors; i++) {
fileContent += String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D) +
"\n";
fileContent +=
String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D, 0, 0)
+ "\n";
}
File tempFile = new File(FAKE_CPUFILE);
tempFile.deleteOnExit();
@ -232,4 +235,90 @@ public void parsingProcMemFile() throws IOException {
assertEquals(plugin.getPhysicalMemorySize(), 1024L * memTotal);
assertEquals(plugin.getVirtualMemorySize(), 1024L * (memTotal + swapTotal));
}
/**
 * Verifies logical-processor and physical-core counts for several CPU
 * topologies. Each scenario lists, per logical processor, the
 * "physical id" (socket) and "core id" written to the fake cpuinfo file.
 */
@Test
public void testCoreCounts() throws IOException {
  // single core, hyper threading
  assertCoreCounts(new int[] {0, 0}, new int[] {0, 0}, 1);

  // single socket quad core, no hyper threading
  assertCoreCounts(new int[] {0, 0, 0, 0}, new int[] {0, 1, 2, 3}, 4);

  // dual socket single core, hyper threading
  assertCoreCounts(new int[] {0, 0, 1, 1}, new int[] {0, 0, 0, 0}, 2);

  // dual socket, dual core, no hyper threading
  assertCoreCounts(new int[] {0, 0, 1, 1}, new int[] {0, 1, 0, 1}, 4);

  // dual socket, dual core, hyper threading
  assertCoreCounts(new int[] {0, 0, 0, 0, 1, 1, 1, 1},
      new int[] {0, 0, 1, 1, 0, 0, 1, 1}, 4);
}

/**
 * Writes a fake cpuinfo file with one entry per logical processor, forces
 * the plugin to re-read it, and checks the reported counts.
 *
 * @param physicalIds "physical id" value for each logical processor
 * @param coreIds "core id" value for each logical processor; must have the
 *          same length as physicalIds
 * @param expectedCores expected result of getNumCores()
 * @throws IOException if the fake cpuinfo file cannot be written
 */
private void assertCoreCounts(int[] physicalIds, int[] coreIds,
    int expectedCores) throws IOException {
  final long cpuFrequencyKHz = 2392781;
  final int numProcessors = physicalIds.length;
  StringBuilder cpuInfo = new StringBuilder();
  for (int i = 0; i < numProcessors; i++) {
    cpuInfo.append(String.format(CPUINFO_FORMAT, i,
        cpuFrequencyKHz / 1000D, physicalIds[i], coreIds[i]));
    cpuInfo.append("\n");
  }
  writeFakeCPUInfoFile(cpuInfo.toString());
  // reset the "already read" flag so the new file contents are parsed
  plugin.setReadCpuInfoFile(false);
  assertEquals(numProcessors, plugin.getNumProcessors());
  assertEquals(expectedCores, plugin.getNumCores());
}
/**
 * Overwrites the file at FAKE_CPUFILE with the given text and registers it
 * for deletion when the JVM exits.
 *
 * @param content text to store in the fake cpuinfo file
 * @throws IOException if the file cannot be opened or written
 */
private void writeFakeCPUInfoFile(String content) throws IOException {
  final File cpuInfoFile = new File(FAKE_CPUFILE);
  final FileWriter writer = new FileWriter(cpuInfoFile);
  cpuInfoFile.deleteOnExit();
  try {
    writer.write(content);
  } finally {
    // close failures are deliberately ignored
    IOUtils.closeQuietly(writer);
  }
}
}

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
@ -372,28 +373,16 @@ protected String[] getRunCommand(String command, String groupId,
YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
int containerVCores = resource.getVirtualCores();
int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
// cap overall usage to the number of cores allocated to YARN
int nodeCpuPercentage = Math
.min(
conf.getInt(
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
100);
nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
if (nodeCpuPercentage == 0) {
String message = "Illegal value for "
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
int nodeCpuPercentage =
NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
float containerCpuPercentage =
(float) (nodeCpuPercentage * containerVCores) / nodeVCores;
// CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
// should be set as 20 * 100. The following setting is equal to:
// 100 * (100 * (vcores / Total # of cores allocated to YARN))
cpuRate = Math.min(10000,
(int) ((containerVCores * 10000) / yarnVCores));
// should be set as 20 * 100.
cpuRate = Math.min(10000, (int) (containerCpuPercentage * 100));
}
}
return new String[] { Shell.WINUTILS, "task", "create", "-m",

View File

@ -76,6 +76,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@ -157,18 +159,16 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
@Override
protected void serviceInit(Configuration conf) throws Exception {
int memoryMb =
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
int memoryMb = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
float vMemToPMem =
conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
int virtualCores =
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
int virtualCores = NodeManagerHardwareUtils.getVCores(conf);
LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB.");
LOG.info("Nodemanager resources: vcores set to " + virtualCores + ".");
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);

View File

@ -117,14 +117,11 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
long configuredPMemForContainers = conf.getLong(
YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
long configuredVCoresForContainers = conf.getLong(
YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
long configuredPMemForContainers =
NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
long configuredVCoresForContainers =
NodeManagerHardwareUtils.getVCores(conf);
// Setting these irrespective of whether checks are enabled. Required in
// the UI.

View File

@ -22,7 +22,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@ -83,6 +82,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
Clock clock;
private float yarnProcessors;
int nodeVCores;
public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>();
@ -152,9 +152,11 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
initializeControllerPaths();
nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
// cap overall usage to the number of cores allocated to YARN
yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
@ -368,9 +370,6 @@ private void setupLimits(ContainerId containerId,
updateCgroup(CONTROLLER_CPU, containerName, "shares",
String.valueOf(cpuShares));
if (strictResourceUsageMode) {
int nodeVCores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;

View File

@ -18,35 +18,84 @@
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/**
* Helper class to determine hardware related characteristics such as the
* number of processors and the amount of memory on the node.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NodeManagerHardwareUtils {
private static final Log LOG = LogFactory
.getLog(NodeManagerHardwareUtils.class);
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* Returns the number of CPUs on the node. This value depends on the
* configuration setting which decides whether to count logical processors
* (such as hyperthreads) as cores or not.
*
* @param conf
* - Configuration object
* @return Number of CPUs
*/
public static int getNodeCPUs(Configuration conf) {
  // Look up the plugin from the configuration and delegate to the
  // two-argument overload.
  return NodeManagerHardwareUtils.getNodeCPUs(
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
}
/**
 * Returns the number of CPUs on the node as reported by the given plugin.
 * When YarnConfiguration.NM_COUNT_LOGICAL_PROCESSORS_AS_CORES is true the
 * logical processor count is returned; otherwise the physical core count is
 * returned.
 *
 * @param plugin
 *          - ResourceCalculatorPlugin object to determine hardware specs
 * @param conf
 *          - Configuration object
 * @return Number of CPUs on the node.
 */
public static int getNodeCPUs(ResourceCalculatorPlugin plugin,
    Configuration conf) {
  final int logicalProcessors = plugin.getNumProcessors();
  final boolean logicalCountAsCores =
      conf.getBoolean(YarnConfiguration.NM_COUNT_LOGICAL_PROCESSORS_AS_CORES,
          YarnConfiguration.DEFAULT_NM_COUNT_LOGICAL_PROCESSORS_AS_CORES);
  if (logicalCountAsCores) {
    return logicalProcessors;
  }
  return plugin.getNumCores();
}
/**
*
* Returns the fraction of CPUs that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
* @return Fraction of CPUs to be used for YARN containers
*/
public static float getContainersCores(Configuration conf) {
public static float getContainersCPUs(Configuration conf) {
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getContainersCores(plugin, conf);
return NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
}
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* Returns the fraction of CPUs that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
@ -54,11 +103,11 @@ public static float getContainersCores(Configuration conf) {
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
* @return Fraction of CPUs to be used for YARN containers
*/
public static float getContainersCores(ResourceCalculatorPlugin plugin,
public static float getContainersCPUs(ResourceCalculatorPlugin plugin,
Configuration conf) {
int numProcessors = plugin.getNumProcessors();
int numProcessors = getNodeCPUs(plugin, conf);
int nodeCpuPercentage = getNodeCpuPercentage(conf);
return (nodeCpuPercentage * numProcessors) / 100.0f;
@ -88,4 +137,177 @@ public static int getNodeCpuPercentage(Configuration conf) {
}
return nodeCpuPercentage;
}
/**
 * Returns the number of vcores on the node that can be used for YARN
 * containers. Obtains a ResourceCalculatorPlugin for the given configuration
 * and delegates to {@link #getVCores(ResourceCalculatorPlugin, Configuration)};
 * see that method for how the value is derived.
 *
 * @param conf
 *          - the configuration for the NodeManager
 * @return the number of vcores to be used for YARN containers
 */
public static int getVCores(Configuration conf) {
  // The plugin is passed through as-is; the overload handles a null plugin.
  return NodeManagerHardwareUtils.getVCores(
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
}
/**
 * Returns the number of vcores on the node that can be used for YARN
 * containers.
 * <ul>
 * <li>If YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION is false
 * or no plugin is supplied, the configured YarnConfiguration.NM_VCORES value
 * is used; an unset value or -1 yields YarnConfiguration.DEFAULT_NM_VCORES.
 * </li>
 * <li>Otherwise a configured value other than -1 is used as-is. If the value
 * is unset or -1, the result is the number of CPUs available to containers
 * (see getContainersCPUs) multiplied by
 * YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER and truncated to an integer;
 * a product strictly between 0 and 1 is raised to 1.</li>
 * </ul>
 *
 * @param plugin
 *          - ResourceCalculatorPlugin object to determine hardware specs;
 *          may be null
 * @param conf
 *          - the configuration for the NodeManager
 * @return the number of vcores to be used for YARN containers
 * @throws IllegalArgumentException
 *           if the multiplier is not greater than 0 or the resulting number
 *           of vcores is not greater than 0
 */
public static int getVCores(ResourceCalculatorPlugin plugin,
    Configuration conf) {
  final boolean detectHardware =
      conf.getBoolean(
          YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
          YarnConfiguration.DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION);

  int vcores;
  if (detectHardware && plugin != null) {
    vcores = conf.getInt(YarnConfiguration.NM_VCORES, -1);
    if (vcores == -1) {
      final float containerCpus =
          NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
      final float ratio =
          conf.getFloat(YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER,
              YarnConfiguration.DEFAULT_NM_PCORES_VCORES_MULTIPLIER);
      // written as !(ratio > 0) so that NaN is rejected as well
      if (!(ratio > 0)) {
        throw new IllegalArgumentException("Illegal value for "
            + YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER
            + ". Value must be greater than 0.");
      }
      final float scaled = containerCpus * ratio;
      // on a single core machine the product can be between 0 and 1
      vcores = (scaled > 0 && scaled < 1) ? 1 : (int) scaled;
    }
  } else {
    vcores =
        conf.getInt(YarnConfiguration.NM_VCORES,
            YarnConfiguration.DEFAULT_NM_VCORES);
    if (vcores == -1) {
      vcores = YarnConfiguration.DEFAULT_NM_VCORES;
    }
  }

  if (vcores <= 0) {
    throw new IllegalArgumentException("Illegal value for "
        + YarnConfiguration.NM_VCORES + ". Value must be greater than 0.");
  }
  return vcores;
}
/**
 * Returns how much memory, in MB, should be set aside for YARN containers.
 * Obtains a ResourceCalculatorPlugin for the given configuration and
 * delegates to
 * {@link #getContainerMemoryMB(ResourceCalculatorPlugin, Configuration)};
 * see that method for how the value is derived.
 *
 * @param conf
 *          - the configuration for the NodeManager
 * @return the amount of memory that will be used for YARN containers in MB.
 */
public static int getContainerMemoryMB(Configuration conf) {
  final ResourceCalculatorPlugin plugin =
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
  return NodeManagerHardwareUtils.getContainerMemoryMB(plugin, conf);
}
/**
 * Returns how much memory, in MB, should be set aside for YARN containers.
 * <ul>
 * <li>If YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION is false
 * or no plugin is supplied, the configured YarnConfiguration.NM_PMEM_MB
 * value is used; an unset value or -1 yields
 * YarnConfiguration.DEFAULT_NM_PMEM_MB.</li>
 * <li>Otherwise a configured value other than -1 is used as-is. If the value
 * is unset or -1 and YarnConfiguration.NM_SYSTEM_RESERVED_PMEM_MB is set to
 * something other than -1, the result is physical memory minus that reserved
 * amount. If the reserved amount is unset or -1, the result is
 * 0.8 * (physical memory - 2 * JVM max heap). An error is logged when the
 * computed amount is not positive.</li>
 * </ul>
 *
 * @param plugin
 *          - ResourceCalculatorPlugin object to determine hardware specs;
 *          may be null
 * @param conf
 *          - the configuration for the NodeManager
 * @return the amount of memory that will be used for YARN containers in MB.
 * @throws IllegalArgumentException
 *           if the resulting amount of memory is not greater than 0
 */
public static int getContainerMemoryMB(ResourceCalculatorPlugin plugin,
    Configuration conf) {
  final boolean detectHardware = conf.getBoolean(
      YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
      YarnConfiguration.DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION);

  int containerMemoryMb;
  if (detectHardware && plugin != null) {
    containerMemoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, -1);
    if (containerMemoryMb == -1) {
      final int nodeMemoryMb =
          (int) (plugin.getPhysicalMemorySize() / (1024 * 1024));
      final int jvmHeapMb =
          (int) (Runtime.getRuntime().maxMemory() / (1024 * 1024));
      final int reservedMb =
          conf.getInt(YarnConfiguration.NM_SYSTEM_RESERVED_PMEM_MB, -1);

      // An explicit reservation wins over the 80% heuristic.
      final int derivedMb = (reservedMb == -1)
          ? (int) (0.8f * (nodeMemoryMb - (2 * jvmHeapMb)))
          : nodeMemoryMb - reservedMb;
      if (derivedMb <= 0) {
        LOG.error("Calculated memory for YARN containers is too low."
            + " Node memory is " + nodeMemoryMb
            + " MB, system reserved memory is "
            + reservedMb + " MB.");
      }
      // clamp to 0 so the final validation below rejects the value
      containerMemoryMb = Math.max(derivedMb, 0);
    }
  } else {
    containerMemoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB,
        YarnConfiguration.DEFAULT_NM_PMEM_MB);
    if (containerMemoryMb == -1) {
      containerMemoryMb = YarnConfiguration.DEFAULT_NM_PMEM_MB;
    }
  }

  if (containerMemoryMb <= 0) {
    throw new IllegalArgumentException("Illegal value for "
        + YarnConfiguration.NM_PMEM_MB + ". Value must be greater than 0.");
  }
  return containerMemoryMb;
}
}

View File

@ -108,18 +108,56 @@ public void testRunCommandWithMemoryOnlyResources() {
public void testRunCommandWithCpuAndMemoryResources() {
// Checks the "-m" (memory) and "-c" (cpu rate) arguments of the command
// built by containerExecutor.getRunCommand under three configurations:
// defaults, hardware detection enabled, and an 80% CPU limit.
// NOTE(review): this span declares `command`, `cpuRate` and `expected` twice;
// the getContainersCores-based lines and the assertTrue(Arrays.equals(...))
// line look like the pre-change side of a diff hunk - confirm before
// compiling.
// Windows only test
assumeTrue(Shell.WINDOWS);
int containerCores = 1;
Configuration conf = new Configuration();
// Switch on both the CPU and the memory limit for Windows containers.
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true");
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
conf, Resource.newInstance(1024, 1));
float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
conf);
int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors));
String[] command =
containerExecutor.getRunCommand("echo", "group1", null, null, conf,
Resource.newInstance(1024, 1));
// Phase 1: nothing but the two limit flags is configured, so the node is
// expected to report the default vcore count.
int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_VCORES, nodeVCores);
// The container's share of the node's vcores, capped at 10000 (per the
// "100 * percentage" form used further down, 10000 corresponds to 100%).
int cpuRate = Math.min(10000, (containerCores * 10000) / nodeVCores);
// Assert the cpu and memory limits are set correctly in the command
String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
String[] expected =
{Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
String.valueOf(cpuRate), "group1", "cmd /c " + "echo" };
Assert.assertTrue(Arrays.equals(expected, command));
// Comparing the string forms makes a failure print both arrays.
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
// Phase 2: with hardware detection enabled the test expects the node CPU
// count, the CPUs usable by YARN and the vcore count to all agree.
conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
true);
int nodeCPUs = NodeManagerHardwareUtils.getNodeCPUs(conf);
float yarnCPUs = NodeManagerHardwareUtils.getContainersCPUs(conf);
nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
Assert.assertEquals(nodeCPUs, (int) yarnCPUs);
Assert.assertEquals(nodeCPUs, nodeVCores);
command =
containerExecutor.getRunCommand("echo", "group1", null, null, conf,
Resource.newInstance(1024, 1));
cpuRate = Math.min(10000, (containerCores * 10000) / nodeVCores);
// Index 6 is the cpu rate value that follows "-c" in the expected command.
expected[6] = String.valueOf(cpuRate);
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
// Phase 3: set the physical CPU limit percentage to 80.
int yarnCpuLimit = 80;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
yarnCpuLimit);
yarnCPUs = NodeManagerHardwareUtils.getContainersCPUs(conf);
nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
Assert.assertEquals(nodeCPUs * 0.8, yarnCPUs, 0.01);
// On a single-CPU node (int) (1 * 0.8) would be 0, so 1 vcore is expected
// there instead of the truncated product.
if (nodeCPUs == 1) {
Assert.assertEquals(1, nodeVCores);
} else {
Assert.assertEquals((int) (nodeCPUs * 0.8), nodeVCores);
}
command =
containerExecutor.getRunCommand("echo", "group1", null, null, conf,
Resource.newInstance(1024, 1));
// we should get 100 * (1/nodeVcores) of 80% of CPU
int containerPerc = (yarnCpuLimit * containerCores) / nodeVCores;
cpuRate = Math.min(10000, 100 * containerPerc);
expected[6] = String.valueOf(cpuRate);
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
}
}

View File

@ -160,6 +160,7 @@ public void testInit() throws IOException {
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
handler.setConf(conf);
handler.initConfig();
@ -256,6 +257,7 @@ public void testContainerLimits() throws IOException {
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
handler.setConf(conf);
handler.initConfig();

View File

@ -24,49 +24,170 @@
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test the various functions provided by the NodeManagerHardwareUtils class.
*/
public class TestNodeManagerHardwareUtils {
/**
 * Fixed-value ResourceCalculatorPlugin for the tests in this class: it reports
 * 8 processors, 4 cores and a physical memory size of twice the JVM's maximum
 * memory plus 4 GB; every other metric is 0.
 */
static class TestResourceCalculatorPlugin extends ResourceCalculatorPlugin {

  @Override
  public long getVirtualMemorySize() {
    return 0;
  }

  /** Twice the JVM's maximum memory plus 4 GB, in bytes. */
  @Override
  public long getPhysicalMemorySize() {
    final long fourGigabytes = 4L * 1024 * 1024 * 1024;
    final long twiceJvmMax = Runtime.getRuntime().maxMemory() * 2;
    return twiceJvmMax + fourGigabytes;
  }

  @Override
  public long getAvailableVirtualMemorySize() {
    return 0;
  }

  @Override
  public long getAvailablePhysicalMemorySize() {
    return 0;
  }

  /** Reports 8 processors, twice the value of {@link #getNumCores()}. */
  @Override
  public int getNumProcessors() {
    return 8;
  }

  @Override
  public long getCpuFrequency() {
    return 0;
  }

  @Override
  public long getCumulativeCpuTime() {
    return 0;
  }

  @Override
  public float getCpuUsage() {
    return 0;
  }

  /** Reports 4 cores. */
  @Override
  public int getNumCores() {
    return 4;
  }
}
// Exercises the CPU count available to containers for several values of
// YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, using a mocked
// plugin.
// NOTE(review): this span carries two method signatures, two `numProcessors`
// declarations and paired getContainersCores / getContainersCPUs calls; the
// getContainersCores / testGetContainerCores lines look like the pre-change
// side of a diff hunk - confirm before compiling.
@Test
public void testGetContainerCores() {
public void testGetContainerCPU() {
YarnConfiguration conf = new YarnConfiguration();
float ret;
final int numProcessors = 4;
final int numProcessors = 8;
final int numCores = 4;
// Mocked plugin: the processor and core counts are stubbed with the
// constants declared above.
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numCores).when(plugin).getNumCores();
// A limit of 0 must be rejected with an IllegalArgumentException.
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0);
boolean catchFlag = false;
try {
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.fail("getContainerCores should have thrown exception");
} catch (IllegalArgumentException ie) {
// expected
catchFlag = true;
}
Assert.assertTrue(catchFlag);
// Limit 100: the expected result is 4.
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(4, (int) ret);
// Limit 50: the expected result is 2.
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(2, (int) ret);
// Limit 75: the expected result is 3.
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(3, (int) ret);
// Limit 85: the result is fractional, so it is compared as a float
// (3.4 within a tolerance of 0.1) rather than truncated.
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(3.4, ret, 0.1);
// Limit 110: the expected result is 4, the same as for a limit of 100.
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
110);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(4, (int) ret);
}
/**
 * Checks NodeManagerHardwareUtils.getVCores against the fixed-value
 * TestResourceCalculatorPlugin (4 cores, 8 processors) with a
 * pcores-to-vcores multiplier of 1.25, and against explicitly configured
 * vcore counts with hardware detection both enabled and disabled.
 */
@Test
public void testGetVCores() {
  ResourceCalculatorPlugin plugin = new TestResourceCalculatorPlugin();
  YarnConfiguration conf = new YarnConfiguration();
  conf.setFloat(YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER, 1.25f);

  // Hardware detection has not been switched on in conf yet: the default
  // vcore count is expected.
  int ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
  Assert.assertEquals(YarnConfiguration.DEFAULT_NM_VCORES, ret);

  // Detection on: the expected 5 matches 4 cores * 1.25.
  conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
      true);
  ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
  Assert.assertEquals(5, ret);

  // Logical processors counted as cores: the expected 10 matches
  // 8 processors * 1.25.
  conf.setBoolean(YarnConfiguration.NM_COUNT_LOGICAL_PROCESSORS_AS_CORES,
      true);
  ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
  Assert.assertEquals(10, ret);

  // An explicitly configured vcore count is returned with detection on.
  conf.setInt(YarnConfiguration.NM_VCORES, 10);
  ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
  Assert.assertEquals(10, ret);

  // Detection explicitly off: the configured vcore count must still be
  // returned. This has to go through conf1; configuring and querying conf
  // here would only repeat the previous assertion and leave conf1 unused.
  YarnConfiguration conf1 = new YarnConfiguration();
  conf1.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
      false);
  conf1.setInt(YarnConfiguration.NM_VCORES, 10);
  ret = NodeManagerHardwareUtils.getVCores(plugin, conf1);
  Assert.assertEquals(10, ret);
}
/**
 * Checks NodeManagerHardwareUtils.getContainerMemoryMB with hardware
 * detection enabled (no plugin, fixed-value plugin, explicit setting) and
 * with hardware detection disabled (default, explicit setting).
 */
@Test
public void testGetContainerMemoryMB() throws Exception {
  final int bytesPerMB = 1024 * 1024;
  ResourceCalculatorPlugin fixedPlugin = new TestResourceCalculatorPlugin();
  long nodeMemMB = fixedPlugin.getPhysicalMemorySize() / bytesPerMB;

  YarnConfiguration detectionOn = new YarnConfiguration();
  detectionOn.setBoolean(
      YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION, true);

  // Without a plugin the default is expected even though detection is on.
  int containerMemMB =
      NodeManagerHardwareUtils.getContainerMemoryMB(null, detectionOn);
  Assert.assertEquals(YarnConfiguration.DEFAULT_NM_PMEM_MB, containerMemMB);

  // With the plugin, 80% of (node memory - 2 * JVM heap) is expected.
  containerMemMB =
      NodeManagerHardwareUtils.getContainerMemoryMB(fixedPlugin, detectionOn);
  int jvmHeapMB = (int) (Runtime.getRuntime().maxMemory() / bytesPerMB);
  int expectedMemMB = (int) (0.8 * (nodeMemMB - (2 * jvmHeapMB)));
  Assert.assertEquals(expectedMemMB, containerMemMB);

  // An explicit setting is returned as is.
  detectionOn.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
  containerMemMB = NodeManagerHardwareUtils.getContainerMemoryMB(detectionOn);
  Assert.assertEquals(1024, containerMemMB);

  // Detection off: first the default, then the explicit setting.
  YarnConfiguration detectionOff = new YarnConfiguration();
  detectionOff.setBoolean(
      YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION, false);
  containerMemMB = NodeManagerHardwareUtils.getContainerMemoryMB(detectionOff);
  Assert.assertEquals(YarnConfiguration.DEFAULT_NM_PMEM_MB, containerMemMB);

  detectionOff.setInt(YarnConfiguration.NM_PMEM_MB, 10 * 1024);
  containerMemMB = NodeManagerHardwareUtils.getContainerMemoryMB(detectionOff);
  Assert.assertEquals(10 * 1024, containerMemMB);
}
}