YARN-10935. AM Total Queue Limit goes below per-user AM Limit if parent is full. Contributed by Eric Payne.

This commit is contained in:
Eric Badger 2021-09-15 20:03:45 +00:00
parent 3aa76f7e48
commit 43f0a34dd4
2 changed files with 81 additions and 2 deletions

View File

@ -800,8 +800,13 @@ public Resource calculateAndGetAMResourceLimitPerPartition(
// Current usable resource for this queue and partition is the max of // Current usable resource for this queue and partition is the max of
// queueCurrentLimit and queuePartitionResource. // queueCurrentLimit and queuePartitionResource.
Resource queuePartitionUsableResource = Resources.max(resourceCalculator, // If any of the resources available to this queue are less than queue's
lastClusterResource, queueCurrentLimit, queuePartitionResource); // guarantee, use the guarantee as the queuePartitionUsableResource
// because nothing less than the queue's guarantee should be used when
// calculating the AM limit.
Resource queuePartitionUsableResource = (Resources.fitsIn(
resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
queueCurrentLimit : queuePartitionResource;
Resource amResouceLimit = Resources.multiplyAndNormalizeUp( Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionUsableResource, amResourcePercent, resourceCalculator, queuePartitionUsableResource, amResourcePercent,

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
@ -942,4 +943,77 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm.killApp(app14.getApplicationId()); rm.killApp(app14.getApplicationId());
rm.stop(); rm.stop();
} }
// Test that max AM limit is correct in the case where one resource is
// depleted but the other is not. Use DominantResourceCalculator.
@Test
public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.3f);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getResourceCalculator()).
thenReturn(new DominantResourceCalculator());
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Total cluster resources.
Resource clusterResource = Resources.createResource(100 * GB, 1000);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// Set up queue hierarchy.
CSQueueStore queues = new CSQueueStore();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Queue "queueA" has a 30% capacity guarantee. The max pct of "queueA" that
// can be used for AMs is 30%. So, 30% of <memory: 100GB, vCores: 1000> is
// <memory: 30GB, vCores: 30>, which is the guaranteed capacity of "queueA".
// 30% of that (rounded to the nearest 1GB) is <memory: 9GB, vCores: 9>. The
// max AM queue limit should never be less than that for any resource.
LeafQueue queueA = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
queueA.setCapacity(30.0f);
queueA.setUserLimitFactor(10f);
queueA.setMaxAMResourcePerQueuePercent(0.3f);
// Make sure "queueA" knows the total cluster resource.
queueA.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
// Get "queueA"'s guaranteed capacity (<memory: 30GB, vCores: 300>).
Resource capacity =
Resources.multiply(clusterResource, (queueA.getCapacity()/100));
// Limit is the actual resources available to "queueA". The following
// simulates the case where a second queue ("queueB") has "borrowed" almost
// all of "queueA"'s resources because "queueB" has a max capacity of 100%
// and has gone well over its guaranteed capacity. In this case, "queueB"
// has used 99GB of memory and used 505 vCores. This is to make vCores
// dominant in the calculations for the available resources.
when(queueA.getEffectiveCapacity(any())).thenReturn(capacity);
Resource limit = Resource.newInstance(1024, 495);
ResourceLimits currentResourceLimits =
new ResourceLimits(limit, Resources.none());
queueA.updateClusterResource(clusterResource, currentResourceLimits);
Resource expectedAmLimit = Resources.multiply(capacity,
queueA.getMaxAMResourcePerQueuePercent());
Resource amLimit = queueA.calculateAndGetAMResourceLimit();
assertTrue("AM memory limit is less than expected: Expected: " +
expectedAmLimit.getMemorySize() + "; Computed: "
+ amLimit.getMemorySize(),
amLimit.getMemorySize() >= expectedAmLimit.getMemorySize());
assertTrue("AM vCore limit is less than expected: Expected: " +
expectedAmLimit.getVirtualCores() + "; Computed: "
+ amLimit.getVirtualCores(),
amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores());
}
} }