YARN-7541. Node updates don't update the maximum cluster capability for resources other than CPU and memory

This commit is contained in:
Daniel Templeton 2017-11-29 10:36:19 -08:00
parent 301641811d
commit 8498d287cd
5 changed files with 229 additions and 32 deletions

View File

@ -21,11 +21,9 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.curator.shaded.com.google.common.reflect.ClassPath;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -89,6 +87,26 @@ public static Resource newInstance(long memory, int vCores) {
return new LightWeightResource(memory, vCores); return new LightWeightResource(memory, vCores);
} }
/**
* Create a new {@link Resource} instance with the given CPU and memory
* values and additional resource values as set in the {@code others}
* parameter. Note that the CPU and memory settings in the {@code others}
* parameter will be ignored.
*
* @param memory the memory value
* @param vCores the CPU value
* @param others a map of other resource values indexed by resource name
* @return a {@link Resource} instance with the given resource values
*/
@Public
@Stable
public static Resource newInstance(long memory, int vCores,
Map<String, Long> others) {
ResourceInformation[] info = ResourceUtils.createResourceTypesArray(others);
return new LightWeightResource(memory, vCores, info);
}
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static Resource newInstance(Resource resource) { public static Resource newInstance(Resource resource) {

View File

@ -629,4 +629,33 @@ public static List<ResourceInformation> getRequestedResourcesFromConfig(
return result; return result;
} }
/**
* Create an array of {@link ResourceInformation} objects corresponding to
* the passed in map of names to values. The array will be ordered according
* to the order returned by {@link #getResourceTypesArray()}. The value of
* each resource type in the returned array will either be the value given for
* that resource in the {@code res} parameter or, if none is given, 0.
*
* @param res the map of resource type values
* @return an array of {@link ResourceInformation} instances
*/
public static ResourceInformation[] createResourceTypesArray(Map<String,
Long> res) {
ResourceInformation[] info = new ResourceInformation[resourceTypes.size()];
for (Entry<String, Integer> entry : RESOURCE_NAME_TO_INDEX.entrySet()) {
int index = entry.getValue();
Long value = res.get(entry.getKey());
if (value == null) {
value = 0L;
}
info[index] = new ResourceInformation();
ResourceInformation.copy(resourceTypesArray[index], info[index]);
info[index].setValue(value);
}
return info;
}
} }

View File

@ -24,11 +24,14 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -60,11 +63,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
Resources.clone(Resources.none()); Resources.clone(Resources.none());
// Max allocation // Max allocation
private long maxNodeMemory = -1; private final long[] maxAllocation;
private int maxNodeVCores = -1;
private Resource configuredMaxAllocation; private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true; private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime; private long configuredMaxAllocationWaitTime;
private boolean reportedMaxAllocation = false;
public ClusterNodeTracker() {
maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()];
Arrays.fill(maxAllocation, -1);
}
public void addNode(N node) { public void addNode(N node) {
writeLock.lock(); writeLock.lock();
@ -208,17 +216,18 @@ public Resource getMaxAllowedAllocation() {
forceConfiguredMaxAllocation = false; forceConfiguredMaxAllocation = false;
} }
if (forceConfiguredMaxAllocation if (forceConfiguredMaxAllocation || !reportedMaxAllocation) {
|| maxNodeMemory == -1 || maxNodeVCores == -1) {
return configuredMaxAllocation; return configuredMaxAllocation;
} }
Resource ret = Resources.clone(configuredMaxAllocation); Resource ret = Resources.clone(configuredMaxAllocation);
if (ret.getMemorySize() > maxNodeMemory) {
ret.setMemorySize(maxNodeMemory); for (int i = 0; i < maxAllocation.length; i++) {
ResourceInformation info = ret.getResourceInformation(i);
if (info.getValue() > maxAllocation[i]) {
info.setValue(maxAllocation[i]);
} }
if (ret.getVirtualCores() > maxNodeVCores) {
ret.setVirtualCores(maxNodeVCores);
} }
return ret; return ret;
@ -229,31 +238,51 @@ public Resource getMaxAllowedAllocation() {
private void updateMaxResources(SchedulerNode node, boolean add) { private void updateMaxResources(SchedulerNode node, boolean add) {
Resource totalResource = node.getTotalResource(); Resource totalResource = node.getTotalResource();
ResourceInformation[] totalResources;
if (totalResource != null) {
totalResources = totalResource.getResources();
} else {
LOG.warn(node.getNodeName() + " reported in with null resources, which "
+ "indicates a problem in the source code. Please file an issue at "
+ "https://issues.apache.org/jira/secure/CreateIssue!default.jspa");
return;
}
writeLock.lock(); writeLock.lock();
try { try {
if (add) { // added node if (add) { // added node
long nodeMemory = totalResource.getMemorySize(); // If we add a node, we must have a max allocation for all resource
if (nodeMemory > maxNodeMemory) { // types
maxNodeMemory = nodeMemory; reportedMaxAllocation = true;
for (int i = 0; i < maxAllocation.length; i++) {
long value = totalResources[i].getValue();
if (value > maxAllocation[i]) {
maxAllocation[i] = value;
} }
int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
} }
} else { // removed node } else { // removed node
if (maxNodeMemory == totalResource.getMemorySize()) { boolean recalculate = false;
maxNodeMemory = -1;
for (int i = 0; i < maxAllocation.length; i++) {
if (totalResources[i].getValue() == maxAllocation[i]) {
// No need to set reportedMaxAllocation to false here because we
// will recalculate before we release the lock.
maxAllocation[i] = -1;
recalculate = true;
} }
if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1;
} }
// We only have to iterate through the nodes if the current max memory // We only have to iterate through the nodes if the current max memory
// or vcores was equal to the removed node's // or vcores was equal to the removed node's
if (maxNodeMemory == -1 || maxNodeVCores == -1) { if (recalculate) {
// Treat it like an empty cluster and add nodes // Treat it like an empty cluster and add nodes
for (N n : nodes.values()) { reportedMaxAllocation = false;
updateMaxResources(n, true); nodes.values().forEach(n -> updateMaxResources(n, true));
}
} }
} }
} finally { } finally {

View File

@ -49,6 +49,10 @@ public class MockNodes {
private static int NODE_ID = 0; private static int NODE_ID = 0;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public static void resetHostIds() {
NODE_ID = 0;
}
public static List<RMNode> newNodes(int racks, int nodesPerRack, public static List<RMNode> newNodes(int racks, int nodesPerRack,
Resource perNode) { Resource perNode) {
List<RMNode> list = Lists.newArrayList(); List<RMNode> list = Lists.newArrayList();

View File

@ -17,16 +17,21 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
/** /**
@ -34,11 +39,15 @@
* loss of generality. * loss of generality.
*/ */
public class TestClusterNodeTracker { public class TestClusterNodeTracker {
private ClusterNodeTracker<FSSchedulerNode> nodeTracker = private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
new ClusterNodeTracker<>();
@Before @Before
public void setup() { public void setup() {
nodeTracker = new ClusterNodeTracker<>();
}
private void addEight4x4Nodes() {
MockNodes.resetHostIds();
List<RMNode> rmNodes = List<RMNode> rmNodes =
MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4)); MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) { for (RMNode rmNode : rmNodes) {
@ -48,6 +57,7 @@ public void setup() {
@Test @Test
public void testGetNodeCount() { public void testGetNodeCount() {
addEight4x4Nodes();
assertEquals("Incorrect number of nodes in the cluster", assertEquals("Incorrect number of nodes in the cluster",
8, nodeTracker.nodeCount()); 8, nodeTracker.nodeCount());
@ -57,6 +67,7 @@ public void testGetNodeCount() {
@Test @Test
public void testGetNodesForResourceName() throws Exception { public void testGetNodesForResourceName() throws Exception {
addEight4x4Nodes();
assertEquals("Incorrect number of nodes matching ANY", assertEquals("Incorrect number of nodes matching ANY",
8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size()); 8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
@ -66,4 +77,110 @@ public void testGetNodesForResourceName() throws Exception {
assertEquals("Incorrect number of nodes matching node", assertEquals("Incorrect number of nodes matching node",
1, nodeTracker.getNodesByResourceName("host0").size()); 1, nodeTracker.getNodesByResourceName("host0").size());
} }
@Test
public void testMaxAllowedAllocation() {
// Add a third resource
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES, "test1");
ResourceUtils.resetResourceTypes(conf);
setup();
Resource maximum = Resource.newInstance(10240, 10,
Collections.singletonMap("test1", 10L));
nodeTracker.setConfiguredMaxAllocation(maximum);
Resource result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With no nodes added, the ClusterNodeTracker did not return "
+ "the configured max allocation", maximum, result);
List<RMNode> smallNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(1024, 2,
Collections.singletonMap("test1", 4L)));
FSSchedulerNode smallNode = new FSSchedulerNode(smallNodes.get(0), false);
List<RMNode> mediumNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(4096, 2,
Collections.singletonMap("test1", 2L)));
FSSchedulerNode mediumNode = new FSSchedulerNode(mediumNodes.get(0), false);
List<RMNode> largeNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(16384, 4,
Collections.singletonMap("test1", 1L)));
FSSchedulerNode largeNode = new FSSchedulerNode(largeNodes.get(0), false);
nodeTracker.addNode(mediumNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With a single node added, the ClusterNodeTracker did not "
+ "return that node's resources as the maximum allocation",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.addNode(smallNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With two nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(4096, 2, Collections.singletonMap("test1", 4L)),
result);
nodeTracker.removeNode(smallNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing a node, the ClusterNodeTracker did not "
+ "recalculate the adjusted maximum allocation correctly",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.addNode(largeNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With two nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(10240, 4, Collections.singletonMap("test1", 2L)),
result);
nodeTracker.removeNode(largeNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing a node, the ClusterNodeTracker did not "
+ "recalculate the adjusted maximum allocation correctly",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.removeNode(mediumNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+ "return the configured maximum allocation", maximum, result);
nodeTracker.addNode(smallNode);
nodeTracker.addNode(mediumNode);
nodeTracker.addNode(largeNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With three nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(10240, 4, Collections.singletonMap("test1", 4L)),
result);
nodeTracker.removeNode(smallNode.getNodeID());
nodeTracker.removeNode(mediumNode.getNodeID());
nodeTracker.removeNode(largeNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+ "return the configured maximum allocation", maximum, result);
}
} }