diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 667747836b..2afff43598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3585,6 +3585,22 @@ public static boolean areNodeLabelsEnabled(
DEFAULT_TIMELINE_SERVICE_COLLECTOR_WEBAPP_HTTPS_ADDRESS =
DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
+ /**
+ * Settings for NUMA awareness.
+ */
+ public static final String NM_NUMA_AWARENESS_ENABLED = NM_PREFIX
+ + "numa-awareness.enabled";
+ public static final boolean DEFAULT_NM_NUMA_AWARENESS_ENABLED = false;
+ public static final String NM_NUMA_AWARENESS_READ_TOPOLOGY = NM_PREFIX
+ + "numa-awareness.read-topology";
+ public static final boolean DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY = false;
+ public static final String NM_NUMA_AWARENESS_NODE_IDS = NM_PREFIX
+ + "numa-awareness.node-ids";
+ public static final String NM_NUMA_AWARENESS_NUMACTL_CMD = NM_PREFIX
+ + "numa-awareness.numactl.cmd";
+ public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
+ "/usr/bin/numactl";
+
public YarnConfiguration() {
super();
}
@@ -3791,6 +3807,17 @@ public static boolean systemMetricsPublisherEnabled(Configuration conf) {
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
}
+ /**
+ * Returns whether the NUMA awareness is enabled.
+ *
+ * @param conf the configuration
+ * @return whether the NUMA awareness is enabled.
+ */
+ public static boolean numaAwarenessEnabled(Configuration conf) {
+ return conf.getBoolean(NM_NUMA_AWARENESS_ENABLED,
+ DEFAULT_NM_NUMA_AWARENESS_ENABLED);
+ }
+
/* For debugging. mp configurations to system output as XML format. */
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index adf8d8acc5..e192a0d9a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3711,4 +3711,55 @@
+
+
+ Whether to enable the NUMA awareness for containers in Node Manager.
+
+ yarn.nodemanager.numa-awareness.enabled
+ false
+
+
+
+
+ Whether to read the NUMA topology from the system or from the
+ configurations. If the value is true then NM reads the NUMA topology from
+ system using the command 'numactl --hardware'. If the value is false then NM
+ reads the topology from the configurations
+ 'yarn.nodemanager.numa-awareness.node-ids'(for node id's),
+ 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory'(for each node memory),
+ 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus'(for each node cpus).
+
+ yarn.nodemanager.numa-awareness.read-topology
+ false
+
+
+
+
+ NUMA node id's in the form of comma separated list. Memory and No of CPUs
+ will be read using the properties
+ 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory' and
+ 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus' for each id specified
+ in this value. This property value will be read only when
+ 'yarn.nodemanager.numa-awareness.read-topology=false'.
+
+ For example, if yarn.nodemanager.numa-awareness.node-ids=0,1
+ then need to specify memory and cpus for node id's '0' and '1' like below,
+ yarn.nodemanager.numa-awareness.0.memory=73717
+ yarn.nodemanager.numa-awareness.0.cpus=4
+ yarn.nodemanager.numa-awareness.1.memory=73727
+ yarn.nodemanager.numa-awareness.1.cpus=4
+
+ yarn.nodemanager.numa-awareness.node-ids
+
+
+
+
+
+ The numactl command path which controls NUMA policy for processes or
+ shared memory.
+
+ yarn.nodemanager.numa-awareness.numactl.cmd
+ /usr/bin/numactl
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index 44edc21a38..e92c4e26ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -485,6 +485,7 @@ public int launchContainer(ContainerStartContext ctx)
container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
String tcCommandFile = null;
+ List numaArgs = null;
try {
if (resourceHandlerChain != null) {
@@ -506,6 +507,9 @@ public int launchContainer(ContainerStartContext ctx)
case TC_MODIFY_STATE:
tcCommandFile = op.getArguments().get(0);
break;
+ case ADD_NUMA_PARAMS:
+ numaArgs = op.getArguments();
+ break;
default:
LOG.warn("PrivilegedOperation type unsupported in launch: "
+ op.getOperationType());
@@ -536,7 +540,7 @@ public int launchContainer(ContainerStartContext ctx)
if (pidFilePath != null) {
ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
- ctx, pidFilePath, resourcesOptions, tcCommandFile);
+ ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
linuxContainerRuntime.launchContainer(runtimeContext);
} else {
@@ -610,11 +614,12 @@ public int launchContainer(ContainerStartContext ctx)
}
private ContainerRuntimeContext buildContainerRuntimeContext(
- ContainerStartContext ctx, Path pidFilePath,
- String resourcesOptions, String tcCommandFile) {
+ ContainerStartContext ctx, Path pidFilePath, String resourcesOptions,
+ String tcCommandFile, List numaArgs) {
List prefixCommands = new ArrayList<>();
addSchedPriorityCommand(prefixCommands);
+ addNumaArgsToCommand(prefixCommands, numaArgs);
Container container = ctx.getContainer();
@@ -662,6 +667,13 @@ public String[] getIpAndHost(Container container)
return linuxContainerRuntime.getIpAndHost(container);
}
+ private void addNumaArgsToCommand(List prefixCommands,
+ List numaArgs) {
+ if (numaArgs != null) {
+ prefixCommands.addAll(numaArgs);
+ }
+ }
+
@Override
public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index ad8c22fd79..189c0d09b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -53,7 +53,8 @@ public enum OperationType {
RUN_DOCKER_CMD("--run-docker"),
GPU("--module-gpu"),
FPGA("--module-fpga"),
- LIST_AS_USER(""); //no CLI switch supported yet.
+ LIST_AS_USER(""), // no CLI switch supported yet.
+ ADD_NUMA_PARAMS(""); // no CLI switch supported yet.
private final String option;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
index a02204d614..fc55696064 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceHandlerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
@@ -253,6 +254,14 @@ public static MemoryResourceHandler initMemoryResourceHandler(
return cGroupsMemoryResourceHandler;
}
+ private static ResourceHandler getNumaResourceHandler(Configuration conf,
+ Context nmContext) {
+ if (YarnConfiguration.numaAwarenessEnabled(conf)) {
+ return new NumaResourceHandlerImpl(conf, nmContext);
+ }
+ return null;
+ }
+
private static void addHandlerIfNotNull(List handlerList,
ResourceHandler handler) {
if (handler != null) {
@@ -273,6 +282,7 @@ private static void initializeConfiguredResourceHandlerChain(
initMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initCGroupsCpuResourceHandler(conf));
+ addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
new file mode 100644
index 0000000000..f434412ac2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * NumaNodeResource class holds the NUMA node topology with the total and used
+ * resources.
+ */
+public class NumaNodeResource {
+ private String nodeId;
+ private long totalMemory;
+ private int totalCpus;
+ private long usedMemory;
+ private int usedCpus;
+
+ private static final Log LOG = LogFactory.getLog(NumaNodeResource.class);
+
+ private Map containerVsMemUsage =
+ new ConcurrentHashMap<>();
+ private Map containerVsCpusUsage =
+ new ConcurrentHashMap<>();
+
+ public NumaNodeResource(String nodeId, long totalMemory, int totalCpus) {
+ this.nodeId = nodeId;
+ this.totalMemory = totalMemory;
+ this.totalCpus = totalCpus;
+ }
+
+ /**
+ * Checks whether the specified resources available or not.
+ *
+ * @param resource resource
+ * @return whether the specified resources available or not
+ */
+ public boolean isResourcesAvailable(Resource resource) {
+ LOG.debug(
+ "Memory available:" + (totalMemory - usedMemory) + ", CPUs available:"
+ + (totalCpus - usedCpus) + ", requested:" + resource);
+ if ((totalMemory - usedMemory) >= resource.getMemorySize()
+ && (totalCpus - usedCpus) >= resource.getVirtualCores()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Assigns available memory and returns the remaining needed memory.
+ *
+ * @param memreq required memory
+ * @param containerId which container memory to assign
+ * @return remaining needed memory
+ */
+ public long assignAvailableMemory(long memreq, ContainerId containerId) {
+ long memAvailable = totalMemory - usedMemory;
+ if (memAvailable >= memreq) {
+ containerVsMemUsage.put(containerId, memreq);
+ usedMemory += memreq;
+ return 0;
+ } else {
+ usedMemory += memAvailable;
+ containerVsMemUsage.put(containerId, memAvailable);
+ return memreq - memAvailable;
+ }
+ }
+
+ /**
+ * Assigns available cpu's and returns the remaining needed cpu's.
+ *
+ * @param cpusreq required cpu's
+ * @param containerId which container cpu's to assign
+ * @return remaining needed cpu's
+ */
+ public int assignAvailableCpus(int cpusreq, ContainerId containerId) {
+ int cpusAvailable = totalCpus - usedCpus;
+ if (cpusAvailable >= cpusreq) {
+ containerVsCpusUsage.put(containerId, cpusreq);
+ usedCpus += cpusreq;
+ return 0;
+ } else {
+ usedCpus += cpusAvailable;
+ containerVsCpusUsage.put(containerId, cpusAvailable);
+ return cpusreq - cpusAvailable;
+ }
+ }
+
+ /**
+ * Assigns the requested resources for Container.
+ *
+ * @param resource resource to assign
+ * @param containerId to which container the resources to assign
+ */
+ public void assignResources(Resource resource, ContainerId containerId) {
+ containerVsMemUsage.put(containerId, resource.getMemorySize());
+ containerVsCpusUsage.put(containerId, resource.getVirtualCores());
+ usedMemory += resource.getMemorySize();
+ usedCpus += resource.getVirtualCores();
+ }
+
+ /**
+ * Releases the assigned resources for Container.
+ *
+ * @param containerId to which container the assigned resources to release
+ */
+ public void releaseResources(ContainerId containerId) {
+ if (containerVsMemUsage.containsKey(containerId)) {
+ usedMemory -= containerVsMemUsage.get(containerId);
+ containerVsMemUsage.remove(containerId);
+ }
+ if (containerVsCpusUsage.containsKey(containerId)) {
+ usedCpus -= containerVsCpusUsage.get(containerId);
+ containerVsCpusUsage.remove(containerId);
+ }
+ }
+
+ /**
+ * Recovers the memory resources for Container.
+ *
+ * @param containerId recover the memory resources for the Container
+ * @param memory memory to recover
+ */
+ public void recoverMemory(ContainerId containerId, long memory) {
+ containerVsMemUsage.put(containerId, memory);
+ usedMemory += memory;
+ }
+
+ /**
+ * Recovers the cpu's resources for Container.
+ *
+ * @param containerId recover the cpu's resources for the Container
+ * @param cpus cpu's to recover
+ */
+ public void recoverCpus(ContainerId containerId, int cpus) {
+ containerVsCpusUsage.put(containerId, cpus);
+ usedCpus += cpus;
+ }
+
+ @Override
+ public String toString() {
+ return "Node Id:" + nodeId + "\tMemory:" + totalMemory + "\tCPus:"
+ + totalCpus;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+ result = prime * result + (int) (totalMemory ^ (totalMemory >>> 32));
+ result = prime * result + totalCpus;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ NumaNodeResource other = (NumaNodeResource) obj;
+ if (nodeId == null) {
+ if (other.nodeId != null) {
+ return false;
+ }
+ } else if (!nodeId.equals(other.nodeId)) {
+ return false;
+ }
+ if (totalMemory != other.totalMemory) {
+ return false;
+ }
+ if (totalCpus != other.totalCpus) {
+ return false;
+ }
+ return true;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
new file mode 100644
index 0000000000..f8d47396e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NumaResourceAllocation contains Memory nodes and CPU nodes assigned to a
+ * container.
+ */
+public class NumaResourceAllocation implements Serializable {
+ private static final long serialVersionUID = 6339719798446595123L;
+ private Map nodeVsMemory;
+ private Map nodeVsCpus;
+
+ public NumaResourceAllocation() {
+ nodeVsMemory = new HashMap<>();
+ nodeVsCpus = new HashMap<>();
+ }
+
+ public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
+ int cpus) {
+ this();
+ nodeVsMemory.put(memNodeId, memory);
+ nodeVsCpus.put(cpuNodeId, cpus);
+ }
+
+ public void addMemoryNode(String memNodeId, long memory) {
+ nodeVsMemory.put(memNodeId, memory);
+ }
+
+ public void addCpuNode(String cpuNodeId, int cpus) {
+ nodeVsCpus.put(cpuNodeId, cpus);
+ }
+
+ public Set getMemNodes() {
+ return nodeVsMemory.keySet();
+ }
+
+ public Set getCpuNodes() {
+ return nodeVsCpus.keySet();
+ }
+
+ public Map getNodeVsMemory() {
+ return nodeVsMemory;
+ }
+
+ public Map getNodeVsCpus() {
+ return nodeVsCpus;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
new file mode 100644
index 0000000000..e152bdab87
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * NUMA Resources Allocator reads the NUMA topology and assigns NUMA nodes to
+ * the containers.
+ */
+public class NumaResourceAllocator {
+
+ private static final Log LOG = LogFactory.getLog(NumaResourceAllocator.class);
+
+ // Regex to find node ids, Ex: 'available: 2 nodes (0-1)'
+ private static final String NUMA_NODEIDS_REGEX =
+ "available:\\s*[0-9]+\\s*nodes\\s*\\(([0-9\\-,]*)\\)";
+
+ // Regex to find node memory, Ex: 'node 0 size: 73717 MB'
+ private static final String NUMA_NODE_MEMORY_REGEX =
+ "node\\s*\\s*size:\\s*([0-9]+)\\s*([KMG]B)";
+
+ // Regex to find node cpus, Ex: 'node 0 cpus: 0 2 4 6'
+ private static final String NUMA_NODE_CPUS_REGEX =
+ "node\\s*\\s*cpus:\\s*([0-9\\s]+)";
+
+ private static final String GB = "GB";
+ private static final String KB = "KB";
+ private static final String NUMA_NODE = "";
+ private static final String SPACE = "\\s";
+ private static final long DEFAULT_NUMA_NODE_MEMORY = 1024;
+ private static final int DEFAULT_NUMA_NODE_CPUS = 1;
+ private static final String NUMA_RESOURCE_TYPE = "numa";
+
+ private List numaNodesList = new ArrayList<>();
+ private Map numaNodeIdVsResource = new HashMap<>();
+ private int currentAssignNode;
+
+ private Context context;
+
+ public NumaResourceAllocator(Context context) {
+ this.context = context;
+ }
+
+ public void init(Configuration conf) throws YarnException {
+ if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY,
+ YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) {
+ LOG.info("Reading NUMA topology using 'numactl --hardware' command.");
+ String cmdOutput = executeNGetCmdOutput(conf);
+ String[] outputLines = cmdOutput.split("\\n");
+ Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX);
+ String nodeIdsStr = null;
+ for (String line : outputLines) {
+ Matcher matcher = pattern.matcher(line);
+ if (matcher.find()) {
+ nodeIdsStr = matcher.group(1);
+ break;
+ }
+ }
+ if (nodeIdsStr == null) {
+ throw new YarnException("Failed to get numa nodes from"
+ + " 'numactl --hardware' output and output is:\n" + cmdOutput);
+ }
+ String[] nodeIdCommaSplits = nodeIdsStr.split("[,\\s]");
+ for (String nodeIdOrRange : nodeIdCommaSplits) {
+ if (nodeIdOrRange.contains("-")) {
+ String[] beginNEnd = nodeIdOrRange.split("-");
+ int endNode = Integer.parseInt(beginNEnd[1]);
+ for (int nodeId = Integer
+ .parseInt(beginNEnd[0]); nodeId <= endNode; nodeId++) {
+ long memory = parseMemory(outputLines, String.valueOf(nodeId));
+ int cpus = parseCpus(outputLines, String.valueOf(nodeId));
+ addToCollection(String.valueOf(nodeId), memory, cpus);
+ }
+ } else {
+ long memory = parseMemory(outputLines, nodeIdOrRange);
+ int cpus = parseCpus(outputLines, nodeIdOrRange);
+ addToCollection(nodeIdOrRange, memory, cpus);
+ }
+ }
+ } else {
+ LOG.info("Reading NUMA topology using configurations.");
+ Collection nodeIds = conf
+ .getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS);
+ for (String nodeId : nodeIds) {
+ long mem = conf.getLong(
+ "yarn.nodemanager.numa-awareness." + nodeId + ".memory",
+ DEFAULT_NUMA_NODE_MEMORY);
+ int cpus = conf.getInt(
+ "yarn.nodemanager.numa-awareness." + nodeId + ".cpus",
+ DEFAULT_NUMA_NODE_CPUS);
+ addToCollection(nodeId, mem, cpus);
+ }
+ }
+ if (numaNodesList.isEmpty()) {
+ throw new YarnException("There are no available NUMA nodes"
+ + " for making containers NUMA aware.");
+ }
+ LOG.info("Available numa nodes with capacities : " + numaNodesList.size());
+ }
+
+ @VisibleForTesting
+ String executeNGetCmdOutput(Configuration conf) throws YarnException {
+ String numaCtlCmd = conf.get(
+ YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
+ YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
+ String[] args = new String[] {numaCtlCmd, "--hardware"};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+ try {
+ shExec.execute();
+ } catch (IOException e) {
+ throw new YarnException("Failed to read the numa configurations.", e);
+ }
+ return shExec.getOutput();
+ }
+
+ private int parseCpus(String[] outputLines, String nodeId) {
+ int cpus = 0;
+ Pattern patternNodeCPUs = Pattern
+ .compile(NUMA_NODE_CPUS_REGEX.replace(NUMA_NODE, nodeId));
+ for (String line : outputLines) {
+ Matcher matcherNodeCPUs = patternNodeCPUs.matcher(line);
+ if (matcherNodeCPUs.find()) {
+ String cpusStr = matcherNodeCPUs.group(1);
+ cpus = cpusStr.split(SPACE).length;
+ break;
+ }
+ }
+ return cpus;
+ }
+
+ private long parseMemory(String[] outputLines, String nodeId)
+ throws YarnException {
+ long memory = 0;
+ String units;
+ Pattern patternNodeMem = Pattern
+ .compile(NUMA_NODE_MEMORY_REGEX.replace(NUMA_NODE, nodeId));
+ for (String line : outputLines) {
+ Matcher matcherNodeMem = patternNodeMem.matcher(line);
+ if (matcherNodeMem.find()) {
+ try {
+ memory = Long.parseLong(matcherNodeMem.group(1));
+ units = matcherNodeMem.group(2);
+ if (GB.equals(units)) {
+ memory = memory * 1024;
+ } else if (KB.equals(units)) {
+ memory = memory / 1024;
+ }
+ } catch (Exception ex) {
+ throw new YarnException("Failed to get memory for node:" + nodeId,
+ ex);
+ }
+ break;
+ }
+ }
+ return memory;
+ }
+
+ private void addToCollection(String nodeId, long memory, int cpus) {
+ NumaNodeResource numaNode = new NumaNodeResource(nodeId, memory, cpus);
+ numaNodesList.add(numaNode);
+ numaNodeIdVsResource.put(nodeId, numaNode);
+ }
+
+ /**
+ * Allocates the available NUMA nodes for the requested containerId with
+ * resource in a round robin fashion.
+ *
+ * @param container the container to allocate NUMA resources
+ * @return the assigned NUMA Node info or null if resources not available.
+ * @throws ResourceHandlerException when failed to store NUMA resources
+ */
+ public synchronized NumaResourceAllocation allocateNumaNodes(
+ Container container) throws ResourceHandlerException {
+ NumaResourceAllocation allocation = allocate(container.getContainerId(),
+ container.getResource());
+ if (allocation != null) {
+ try {
+ // Update state store.
+ context.getNMStateStore().storeAssignedResources(container,
+ NUMA_RESOURCE_TYPE, Arrays.asList(allocation));
+ } catch (IOException e) {
+ releaseNumaResource(container.getContainerId());
+ throw new ResourceHandlerException(e);
+ }
+ }
+ return allocation;
+ }
+
+ private NumaResourceAllocation allocate(ContainerId containerId,
+ Resource resource) {
+ for (int index = 0; index < numaNodesList.size(); index++) {
+ NumaNodeResource numaNode = numaNodesList
+ .get((currentAssignNode + index) % numaNodesList.size());
+ if (numaNode.isResourcesAvailable(resource)) {
+ numaNode.assignResources(resource, containerId);
+ LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, "
+ + numaNode.getNodeId() + " for cpus for the " + containerId);
+ currentAssignNode = (currentAssignNode + index + 1)
+ % numaNodesList.size();
+ return new NumaResourceAllocation(numaNode.getNodeId(),
+ resource.getMemorySize(), numaNode.getNodeId(),
+ resource.getVirtualCores());
+ }
+ }
+
+ // If there is no single node matched for the container resource
+ // Check the NUMA nodes for Memory resources
+ NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
+ long memreq = resource.getMemorySize();
+ for (NumaNodeResource numaNode : numaNodesList) {
+ long memrem = numaNode.assignAvailableMemory(memreq, containerId);
+ assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem);
+ memreq = memrem;
+ if (memreq == 0) {
+ break;
+ }
+ }
+ if (memreq != 0) {
+ LOG.info("There is no available memory:" + resource.getMemorySize()
+ + " in numa nodes for " + containerId);
+ releaseNumaResource(containerId);
+ return null;
+ }
+
+ // Check the NUMA nodes for CPU resources
+ int cpusreq = resource.getVirtualCores();
+ for (int index = 0; index < numaNodesList.size(); index++) {
+ NumaNodeResource numaNode = numaNodesList
+ .get((currentAssignNode + index) % numaNodesList.size());
+ int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
+ assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
+ cpusreq = cpusrem;
+ if (cpusreq == 0) {
+ currentAssignNode = (currentAssignNode + index + 1)
+ % numaNodesList.size();
+ break;
+ }
+ }
+
+ if (cpusreq != 0) {
+ LOG.info("There are no available cpus:" + resource.getVirtualCores()
+ + " in numa nodes for " + containerId);
+ releaseNumaResource(containerId);
+ return null;
+ }
+ LOG.info("Assigning multiple NUMA nodes ("
+ + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+ + ") for memory, ("
+ + StringUtils.join(",", assignedNumaNodeInfo.getCpuNodes())
+ + ") for cpus for " + containerId);
+ return assignedNumaNodeInfo;
+ }
+
+ /**
+ * Release assigned NUMA resources for the container.
+ *
+ * @param containerId the container ID
+ */
+ public synchronized void releaseNumaResource(ContainerId containerId) {
+ LOG.info("Releasing the assigned NUMA resources for " + containerId);
+ for (NumaNodeResource numaNode : numaNodesList) {
+ numaNode.releaseResources(containerId);
+ }
+ }
+
+ /**
+ * Recovers assigned numa resources.
+ *
+ * @param containerId the container ID to recover resources
+ */
+ public synchronized void recoverNumaResource(ContainerId containerId) {
+ Container container = context.getContainers().get(containerId);
+ ResourceMappings resourceMappings = container.getResourceMappings();
+ List assignedResources = resourceMappings
+ .getAssignedResources(NUMA_RESOURCE_TYPE);
+ if (assignedResources.size() == 1) {
+ NumaResourceAllocation numaResourceAllocation =
+ (NumaResourceAllocation) assignedResources.get(0);
+ for (Entry nodeAndMemory : numaResourceAllocation
+ .getNodeVsMemory().entrySet()) {
+ numaNodeIdVsResource.get(nodeAndMemory.getKey())
+ .recoverMemory(containerId, nodeAndMemory.getValue());
+ }
+ for (Entry nodeAndCpus : numaResourceAllocation
+ .getNodeVsCpus().entrySet()) {
+ numaNodeIdVsResource.get(nodeAndCpus.getKey()).recoverCpus(containerId,
+ nodeAndCpus.getValue());
+ }
+ } else {
+ LOG.error("Unexpected number:" + assignedResources.size()
+ + " of assigned numa resources for " + containerId
+ + " while recovering.");
+ }
+ }
+
+ @VisibleForTesting
+ Collection getNumaNodesList() {
+ return numaNodesList;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
new file mode 100644
index 0000000000..128dacaea4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation.OperationType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+/**
+ * ResourceHandler implementation for allocating NUMA Resources to each
+ * container.
+ */
+public class NumaResourceHandlerImpl implements ResourceHandler {
+
+ private static final Log LOG = LogFactory
+ .getLog(NumaResourceHandlerImpl.class);
+ private NumaResourceAllocator numaResourceAllocator;
+ private String numaCtlCmd;
+
+ public NumaResourceHandlerImpl(Configuration conf, Context nmContext) {
+ LOG.info("NUMA resources allocation is enabled, initializing NUMA resources"
+ + " allocator.");
+ numaResourceAllocator = new NumaResourceAllocator(nmContext);
+ numaCtlCmd = conf.get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
+ YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
+ }
+
+ @Override
+ public List bootstrap(Configuration configuration)
+ throws ResourceHandlerException {
+ try {
+ numaResourceAllocator.init(configuration);
+ } catch (YarnException e) {
+ throw new ResourceHandlerException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public List preStart(Container container)
+ throws ResourceHandlerException {
+ List ret = null;
+ NumaResourceAllocation numaAllocation = numaResourceAllocator
+ .allocateNumaNodes(container);
+ if (numaAllocation != null) {
+ ret = new ArrayList<>();
+ ArrayList args = new ArrayList<>();
+ args.add(numaCtlCmd);
+ args.add(
+ "--interleave=" + String.join(",", numaAllocation.getMemNodes()));
+ args.add(
+ "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()));
+ ret.add(new PrivilegedOperation(OperationType.ADD_NUMA_PARAMS, args));
+ }
+ return ret;
+ }
+
+ @Override
+ public List reacquireContainer(ContainerId containerId)
+ throws ResourceHandlerException {
+ try {
+ numaResourceAllocator.recoverNumaResource(containerId);
+ } catch (Throwable e) {
+ throw new ResourceHandlerException(
+ "Failed to recover numa resource for " + containerId, e);
+ }
+ return null;
+ }
+
+ @Override
+ public List postComplete(ContainerId containerId)
+ throws ResourceHandlerException {
+ numaResourceAllocator.releaseNumaResource(containerId);
+ return null;
+ }
+
+ @Override
+ public List teardown() throws ResourceHandlerException {
+ return null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
new file mode 100644
index 0000000000..1540adf6a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.
+ * resources.numa contains classes related to NM local scheduler allocators.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
new file mode 100644
index 0000000000..e1ba19c5b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Test class for NumaResourceAllocator.
+ */
+public class TestNumaResourceAllocator {
+
+ private Configuration conf;
+ private NumaResourceAllocator numaResourceAllocator;
+
+ @Before
+ public void setUp() throws IOException, YarnException {
+ conf = new YarnConfiguration();
+ Context mockContext = mock(Context.class);
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap mockContainers = mock(
+ ConcurrentHashMap.class);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getResourceMappings())
+ .thenReturn(new ResourceMappings());
+ when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+ when(mockContext.getContainers()).thenReturn(mockContainers);
+ NMStateStoreService mock = mock(NMStateStoreService.class);
+ when(mockContext.getNMStateStore()).thenReturn(mock);
+ numaResourceAllocator = new NumaResourceAllocator(mockContext);
+ setNumaTopologyConfigs();
+ numaResourceAllocator.init(conf);
+ }
+
+ @Test
+ public void testReadNumaTopologyFromConfigurations() throws Exception {
+ Collection nodesList = numaResourceAllocator
+ .getNumaNodesList();
+ Collection expectedNodesList = getExpectedNumaNodesList();
+ Assert.assertEquals(expectedNodesList, nodesList);
+ }
+
+ @Test
+ public void testReadNumaTopologyFromCmdOutput() throws Exception {
+ conf.setBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, true);
+ String cmdOutput = "available: 2 nodes (0-1)\n\t"
+ + "node 0 cpus: 0 2 4 6\n\t"
+ + "node 0 size: 73717 MB\n\t"
+ + "node 0 free: 17272 MB\n\t"
+ + "node 1 cpus: 1 3 5 7\n\t"
+ + "node 1 size: 73727 MB\n\t"
+ + "node 1 free: 10699 MB\n\t"
+ + "node distances:\n\t"
+ + "node 0 1\n\t"
+ + "0: 10 20\n\t"
+ + "1: 20 10";
+ numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) {
+ @Override
+ String executeNGetCmdOutput(Configuration config)
+ throws YarnRuntimeException {
+ return cmdOutput;
+ }
+ };
+ numaResourceAllocator.init(conf);
+ Collection nodesList = numaResourceAllocator
+ .getNumaNodesList();
+ Collection expectedNodesList = getExpectedNumaNodesList();
+ Assert.assertEquals(expectedNodesList, nodesList);
+ }
+
+ @Test
+ public void testAllocateNumaNode() throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048, 2)));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+ }
+
+ @Test
+ public void testAllocateNumaNodeWithRoundRobinFashionAssignment()
+ throws Exception {
+ NumaResourceAllocation nodeInfo1 = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048, 2)));
+ Assert.assertEquals("0", String.join(",", nodeInfo1.getMemNodes()));
+ Assert.assertEquals("0", String.join(",", nodeInfo1.getCpuNodes()));
+
+ NumaResourceAllocation nodeInfo2 = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000002"),
+ Resource.newInstance(2048, 2)));
+ Assert.assertEquals("1", String.join(",", nodeInfo2.getMemNodes()));
+ Assert.assertEquals("1", String.join(",", nodeInfo2.getCpuNodes()));
+
+ NumaResourceAllocation nodeInfo3 = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000003"),
+ Resource.newInstance(2048, 2)));
+ Assert.assertEquals("0", String.join(",", nodeInfo3.getMemNodes()));
+ Assert.assertEquals("0", String.join(",", nodeInfo3.getCpuNodes()));
+
+ NumaResourceAllocation nodeInfo4 = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000003"),
+ Resource.newInstance(2048, 2)));
+ Assert.assertEquals("1", String.join(",", nodeInfo4.getMemNodes()));
+ Assert.assertEquals("1", String.join(",", nodeInfo4.getCpuNodes()));
+ }
+
+ @Test
+ public void testAllocateNumaNodeWithMultipleNodesForMemory()
+ throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(102400, 2)));
+ Assert.assertEquals("0,1", String.join(",", nodeInfo.getMemNodes()));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+ }
+
+ @Test
+ public void testAllocateNumaNodeWithMultipleNodesForCpus() throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048, 6)));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+ Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
+ }
+
+ @Test
+ public void testAllocateNumaNodeWhenNoNumaMemResourcesAvailable()
+ throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048000, 6)));
+ Assert.assertNull("Should not assign numa nodes when there"
+ + " are no sufficient memory resources available.", nodeInfo);
+ }
+
+ @Test
+ public void testAllocateNumaNodeWhenNoNumaCpuResourcesAvailable()
+ throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048, 600)));
+ Assert.assertNull("Should not assign numa nodes when there"
+ + " are no sufficient cpu resources available.", nodeInfo);
+ }
+
+ @Test
+ public void testReleaseNumaResourcess() throws Exception {
+ NumaResourceAllocation nodeInfo = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"),
+ Resource.newInstance(2048, 8)));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+ Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
+
+ // Request the resource when all cpu nodes occupied
+ nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000002"),
+ Resource.newInstance(2048, 4)));
+ Assert.assertNull("Should not assign numa nodes when there"
+ + " are no sufficient cpu resources available.", nodeInfo);
+
+ // Release the resources
+ numaResourceAllocator.releaseNumaResource(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"));
+ // Request the resources
+ nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000003"),
+ Resource.newInstance(1024, 2)));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+ Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+ }
+
+ @Test
+ public void testRecoverNumaResource() throws Exception {
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap mockContainers = mock(
+ ConcurrentHashMap.class);
+ Context mockContext = mock(Context.class);
+ Container mockContainer = mock(Container.class);
+ ResourceMappings value = new ResourceMappings();
+ AssignedResources assignedResources = new AssignedResources();
+ assignedResources.updateAssignedResources(
+ Arrays.asList(new NumaResourceAllocation("0", 70000, "0", 4)));
+ value.addAssignedResources("numa", assignedResources);
+ when(mockContainer.getResourceMappings()).thenReturn(value);
+ when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+ when(mockContext.getContainers()).thenReturn(mockContainers);
+ NMStateStoreService mock = mock(NMStateStoreService.class);
+ when(mockContext.getNMStateStore()).thenReturn(mock);
+ numaResourceAllocator = new NumaResourceAllocator(mockContext);
+ numaResourceAllocator.init(conf);
+ // Recover the resources
+ numaResourceAllocator.recoverNumaResource(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"));
+
+ // Request resources based on the availability
+ NumaResourceAllocation numaNode = numaResourceAllocator
+ .allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000005"),
+ Resource.newInstance(2048, 1)));
+ assertEquals("1", String.join(",", numaNode.getMemNodes()));
+ assertEquals("1", String.join(",", numaNode.getCpuNodes()));
+
+ // Request resources more than the available
+ numaNode = numaResourceAllocator.allocateNumaNodes(getContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000006"),
+ Resource.newInstance(2048, 4)));
+ assertNull(numaNode);
+ }
+
+ private void setNumaTopologyConfigs() {
+ conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
+ conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
+ conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
+ conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
+ conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
+ }
+
+ private Collection getExpectedNumaNodesList() {
+ Collection expectedNodesList = new ArrayList<>(2);
+ expectedNodesList.add(new NumaNodeResource("0", 73717, 4));
+ expectedNodesList.add(new NumaNodeResource("1", 73727, 4));
+ return expectedNodesList;
+ }
+
+ private Container getContainer(ContainerId containerId, Resource resource) {
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getContainerId()).thenReturn(containerId);
+ when(mockContainer.getResource()).thenReturn(resource);
+ return mockContainer;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
new file mode 100644
index 0000000000..341cbf0a81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Test class for NumaResourceHandlerImpl.
+ *
+ */
+public class TestNumaResourceHandlerImpl {
+
+ private YarnConfiguration conf;
+ private NumaResourceHandlerImpl numaResourceHandler;
+ private Container mockContainer;
+
+ @Before
+ public void setUp() throws IOException, ResourceHandlerException {
+ conf = new YarnConfiguration();
+ setNumaTopologyConfigs();
+ Context mockContext = createAndGetMockContext();
+ NMStateStoreService mock = mock(NMStateStoreService.class);
+ when(mockContext.getNMStateStore()).thenReturn(mock);
+ numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
+ numaResourceHandler.bootstrap(conf);
+ mockContainer = mock(Container.class);
+ }
+
+ @Test
+ public void testAllocateNumaMemoryResource() throws ResourceHandlerException {
+ // allocates node 0 for memory and cpu
+ testAllocateNumaResource("container_1481156246874_0001_01_000001",
+ Resource.newInstance(2048, 2), "0", "0");
+ // allocates node 1 for memory and cpu since allocator uses round
+ // robin assignment
+ testAllocateNumaResource("container_1481156246874_0001_01_000002",
+ Resource.newInstance(60000, 2), "1", "1");
+ // allocates node 0,1 for memory since there is no sufficient memory in any
+ // one node
+ testAllocateNumaResource("container_1481156246874_0001_01_000003",
+ Resource.newInstance(80000, 2), "0,1", "0");
+ // returns null since there are no sufficient resources available for the
+ // request
+ when(mockContainer.getContainerId()).thenReturn(
+ ContainerId.fromString("container_1481156246874_0001_01_000004"));
+ when(mockContainer.getResource())
+ .thenReturn(Resource.newInstance(80000, 2));
+ assertNull(numaResourceHandler.preStart(mockContainer));
+ // allocates node 1 for memory and cpu
+ testAllocateNumaResource("container_1481156246874_0001_01_000005",
+ Resource.newInstance(1024, 2), "1", "1");
+ }
+
+ @Test
+ public void testAllocateNumaCpusResource() throws ResourceHandlerException {
+ // allocates node 0 for memory and cpu
+ testAllocateNumaResource("container_1481156246874_0001_01_000001",
+ Resource.newInstance(2048, 2), "0", "0");
+ // allocates node 1 for memory and cpu since allocator uses round
+ // robin assignment
+ testAllocateNumaResource("container_1481156246874_0001_01_000002",
+ Resource.newInstance(2048, 2), "1", "1");
+ // allocates node 0,1 for cpus since there is are no sufficient cpus
+ // available in any one node
+ testAllocateNumaResource("container_1481156246874_0001_01_000003",
+ Resource.newInstance(2048, 3), "0", "0,1");
+ // returns null since there are no sufficient resources available for the
+ // request
+ when(mockContainer.getContainerId()).thenReturn(
+ ContainerId.fromString("container_1481156246874_0001_01_000004"));
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2));
+ assertNull(numaResourceHandler.preStart(mockContainer));
+ // allocates node 1 for memory and cpu
+ testAllocateNumaResource("container_1481156246874_0001_01_000005",
+ Resource.newInstance(2048, 1), "1", "1");
+ }
+
+ @Test
+ public void testReacquireContainer() throws Exception {
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap mockContainers = mock(
+ ConcurrentHashMap.class);
+ Context mockContext = mock(Context.class);
+ NMStateStoreService mock = mock(NMStateStoreService.class);
+ when(mockContext.getNMStateStore()).thenReturn(mock);
+ ResourceMappings resourceMappings = new ResourceMappings();
+ AssignedResources assignedRscs = new AssignedResources();
+ NumaResourceAllocation numaResourceAllocation = new NumaResourceAllocation(
+ "0", 70000, "0", 4);
+ assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation));
+ resourceMappings.addAssignedResources("numa", assignedRscs);
+ when(mockContainer.getResourceMappings()).thenReturn(resourceMappings);
+ when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+ when(mockContext.getContainers()).thenReturn(mockContainers);
+ numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
+ numaResourceHandler.bootstrap(conf);
+ // recovered numa resources should be added to the used resources and
+ // remaining will be available for further allocation.
+ numaResourceHandler.reacquireContainer(
+ ContainerId.fromString("container_1481156246874_0001_01_000001"));
+
+ testAllocateNumaResource("container_1481156246874_0001_01_000005",
+ Resource.newInstance(2048, 1), "1", "1");
+ when(mockContainer.getContainerId()).thenReturn(
+ ContainerId.fromString("container_1481156246874_0001_01_000005"));
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 4));
+ List preStart = numaResourceHandler
+ .preStart(mockContainer);
+ assertNull(preStart);
+ }
+
+ private void setNumaTopologyConfigs() {
+ conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
+ conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
+ conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
+ conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
+ conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
+ }
+
+ private Context createAndGetMockContext() {
+ Context mockContext = mock(Context.class);
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap mockContainers = mock(
+ ConcurrentHashMap.class);
+ mockContainer = mock(Container.class);
+ when(mockContainer.getResourceMappings())
+ .thenReturn(new ResourceMappings());
+ when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+ when(mockContext.getContainers()).thenReturn(mockContainers);
+ return mockContext;
+ }
+
+ private void testAllocateNumaResource(String containerId, Resource resource,
+ String memNodes, String cpuNodes) throws ResourceHandlerException {
+ when(mockContainer.getContainerId())
+ .thenReturn(ContainerId.fromString(containerId));
+ when(mockContainer.getResource()).thenReturn(resource);
+ List preStart = numaResourceHandler
+ .preStart(mockContainer);
+ List arguments = preStart.get(0).getArguments();
+ assertEquals(arguments, Arrays.asList("/usr/bin/numactl",
+ "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes));
+ }
+}