YARN-3388. Allocation in LeafQueue could get stuck because DRF calculator isn't well supported when computing user-limit. (Nathan Roberts via wangda)
This commit is contained in:
parent
3d937457ee
commit
444b2ea7af
@ -20,6 +20,9 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -70,9 +73,11 @@
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
@ -111,7 +116,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||
|
||||
// cache last cluster resource to compute actual capacity
|
||||
private Resource lastClusterResource = Resources.none();
|
||||
|
||||
|
||||
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
|
||||
new QueueResourceLimitsInfo();
|
||||
|
||||
@ -119,6 +124,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||
|
||||
private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
|
||||
|
||||
// Summation of consumed ratios for all users in queue
|
||||
private float totalUserConsumedRatio = 0;
|
||||
private UsageRatios qUsageRatios;
|
||||
|
||||
// record all ignore-partition-exclusivity RMContainers; this will be used to do
|
||||
// preemption, key is the partition of the RMContainer allocated on
|
||||
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
|
||||
@ -135,6 +144,8 @@ public LeafQueue(CapacitySchedulerContext cs,
|
||||
// One time initialization is enough since it is static ordering policy
|
||||
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
|
||||
|
||||
qUsageRatios = new UsageRatios();
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("LeafQueue:" + " name=" + queueName
|
||||
+ ", fullname=" + getQueuePath());
|
||||
@ -159,7 +170,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
|
||||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||
|
||||
|
||||
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
|
||||
|
||||
userLimit = conf.getUserLimit(getQueuePath());
|
||||
@ -1149,6 +1160,9 @@ public boolean getRackLocalityFullReset() {
|
||||
private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
Resource clusterResource, User user,
|
||||
String nodePartition, SchedulingMode schedulingMode) {
|
||||
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
|
||||
clusterResource);
|
||||
|
||||
// What is our current capacity?
|
||||
// * It is equal to the max(required, queue-capacity) if
|
||||
// we're running below capacity. The 'max' ensures that jobs in queues
|
||||
@ -1157,7 +1171,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
// (usedResources + required) (which extra resources we are allocating)
|
||||
Resource queueCapacity =
|
||||
Resources.multiplyAndNormalizeUp(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodePartition, clusterResource),
|
||||
partitionResource,
|
||||
queueCapacities.getAbsoluteCapacity(nodePartition),
|
||||
minimumAllocation);
|
||||
|
||||
@ -1169,15 +1183,30 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
// Allow progress for queues with minuscule capacity
|
||||
queueCapacity =
|
||||
Resources.max(
|
||||
resourceCalculator, clusterResource,
|
||||
resourceCalculator, partitionResource,
|
||||
queueCapacity,
|
||||
required);
|
||||
|
||||
|
||||
/* We want to base the userLimit calculation on
|
||||
* max(queueCapacity, usedResources+required). However, we want
|
||||
* usedResources to be based on the combined ratios of all the users in the
|
||||
* queue so we use consumedRatio to calculate such.
|
||||
* The calculation is dependent on how the resourceCalculator calculates the
|
||||
* ratio between two Resources. DRF Example: If usedResources is
|
||||
* greater than queueCapacity and users have the following [mem,cpu] usages:
|
||||
* User1: [10%,20%] - Dominant resource is 20%
|
||||
* User2: [30%,10%] - Dominant resource is 30%
|
||||
* The total consumedRatio is then 20+30=50%. Yes, this value can be
|
||||
* larger than 100% but for the purposes of making sure all users are
|
||||
* getting their fair share, it works.
|
||||
*/
|
||||
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
|
||||
partitionResource, qUsageRatios.getUsageRatio(nodePartition),
|
||||
minimumAllocation);
|
||||
Resource currentCapacity =
|
||||
Resources.lessThan(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
|
||||
: Resources.add(queueUsage.getUsed(nodePartition), required);
|
||||
|
||||
Resources.lessThan(resourceCalculator, partitionResource, consumed,
|
||||
queueCapacity) ? queueCapacity : Resources.add(consumed, required);
|
||||
// Never allow a single user to take more than the
|
||||
// queue's configured capacity * user-limit-factor.
|
||||
// Also, the queue's configured capacity should be higher than
|
||||
@ -1186,9 +1215,10 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
final int activeUsers = activeUsersManager.getNumActiveUsers();
|
||||
|
||||
// User limit resource is determined by:
|
||||
// max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
|
||||
// max{currentCapacity / #activeUsers, currentCapacity *
|
||||
// user-limit-percentage%)
|
||||
Resource userLimitResource = Resources.max(
|
||||
resourceCalculator, clusterResource,
|
||||
resourceCalculator, partitionResource,
|
||||
Resources.divideAndCeil(
|
||||
resourceCalculator, currentCapacity, activeUsers),
|
||||
Resources.divideAndCeil(
|
||||
@ -1212,8 +1242,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
maxUserLimit =
|
||||
Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
|
||||
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
||||
maxUserLimit =
|
||||
labelManager.getResourceByLabel(nodePartition, clusterResource);
|
||||
maxUserLimit = partitionResource;
|
||||
}
|
||||
|
||||
// Cap final user limit with maxUserLimit
|
||||
@ -1221,7 +1250,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
Resources.roundUp(
|
||||
resourceCalculator,
|
||||
Resources.min(
|
||||
resourceCalculator, clusterResource,
|
||||
resourceCalculator, partitionResource,
|
||||
userLimitResource,
|
||||
maxUserLimit
|
||||
),
|
||||
@ -1229,18 +1258,22 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String userName = application.getUser();
|
||||
LOG.debug("User limit computation for " + userName +
|
||||
LOG.debug("User limit computation for " + userName +
|
||||
" in queue " + getQueueName() +
|
||||
" userLimitPercent=" + userLimit +
|
||||
" userLimitFactor=" + userLimitFactor +
|
||||
" required: " + required +
|
||||
" consumed: " + user.getUsed() +
|
||||
" required: " + required +
|
||||
" consumed: " + consumed +
|
||||
" user-limit-resource: " + userLimitResource +
|
||||
" queueCapacity: " + queueCapacity +
|
||||
" queueCapacity: " + queueCapacity +
|
||||
" qconsumed: " + queueUsage.getUsed() +
|
||||
" consumedRatio: " + totalUserConsumedRatio +
|
||||
" currentCapacity: " + currentCapacity +
|
||||
" activeUsers: " + activeUsers +
|
||||
" clusterCapacity: " + clusterResource
|
||||
" clusterCapacity: " + clusterResource +
|
||||
" resourceByLabel: " + partitionResource +
|
||||
" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
|
||||
" Partition: " + nodePartition
|
||||
);
|
||||
}
|
||||
user.setUserResourceLimit(userLimitResource);
|
||||
@ -1347,6 +1380,42 @@ private void updateSchedulerHealthForCompletedContainer(
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized float calculateUserUsageRatio(Resource clusterResource,
|
||||
String nodePartition) {
|
||||
Resource resourceByLabel =
|
||||
labelManager.getResourceByLabel(nodePartition, clusterResource);
|
||||
float consumed = 0;
|
||||
User user;
|
||||
for (Map.Entry<String, User> entry : users.entrySet()) {
|
||||
user = entry.getValue();
|
||||
consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
|
||||
resourceByLabel, nodePartition);
|
||||
}
|
||||
return consumed;
|
||||
}
|
||||
|
||||
private synchronized void recalculateQueueUsageRatio(Resource clusterResource,
|
||||
String nodePartition) {
|
||||
ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
|
||||
|
||||
if (nodePartition == null) {
|
||||
for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
|
||||
queueResourceUsage.getNodePartitionsSet())) {
|
||||
qUsageRatios.setUsageRatio(partition,
|
||||
calculateUserUsageRatio(clusterResource, partition));
|
||||
}
|
||||
} else {
|
||||
qUsageRatios.setUsageRatio(nodePartition,
|
||||
calculateUserUsageRatio(clusterResource, nodePartition));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateQueueUsageRatio(String nodePartition,
|
||||
float delta) {
|
||||
qUsageRatios.incUsageRatio(nodePartition, delta);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void completedContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
|
||||
@ -1384,7 +1453,7 @@ public void completedContainer(Resource clusterResource,
|
||||
removed =
|
||||
application.containerCompleted(rmContainer, containerStatus,
|
||||
event, node.getPartition());
|
||||
|
||||
|
||||
node.releaseContainer(container);
|
||||
}
|
||||
|
||||
@ -1417,6 +1486,8 @@ synchronized void allocateResource(Resource clusterResource,
|
||||
boolean isIncreasedAllocation) {
|
||||
super.allocateResource(clusterResource, resource, nodePartition,
|
||||
isIncreasedAllocation);
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
|
||||
clusterResource);
|
||||
|
||||
// handle ignore exclusivity container
|
||||
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
||||
@ -1435,6 +1506,12 @@ synchronized void allocateResource(Resource clusterResource,
|
||||
String userName = application.getUser();
|
||||
User user = getUser(userName);
|
||||
user.assignContainer(resource, nodePartition);
|
||||
|
||||
// Update usage ratios
|
||||
updateQueueUsageRatio(nodePartition,
|
||||
user.updateUsageRatio(resourceCalculator, resourceByLabel,
|
||||
nodePartition));
|
||||
|
||||
// Note this is a bit unconventional since it gets the object and modifies
|
||||
// it here, rather than using set routine
|
||||
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
|
||||
@ -1455,6 +1532,8 @@ synchronized void releaseResource(Resource clusterResource,
|
||||
RMContainer rmContainer, boolean isChangeResource) {
|
||||
super.releaseResource(clusterResource, resource, nodePartition,
|
||||
isChangeResource);
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
|
||||
clusterResource);
|
||||
|
||||
// handle ignore exclusivity container
|
||||
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
||||
@ -1474,6 +1553,12 @@ synchronized void releaseResource(Resource clusterResource,
|
||||
String userName = application.getUser();
|
||||
User user = getUser(userName);
|
||||
user.releaseContainer(resource, nodePartition);
|
||||
|
||||
// Update usage ratios
|
||||
updateQueueUsageRatio(nodePartition,
|
||||
user.updateUsageRatio(resourceCalculator, resourceByLabel,
|
||||
nodePartition));
|
||||
|
||||
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -1513,7 +1598,10 @@ public synchronized void updateClusterResource(Resource clusterResource,
|
||||
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
|
||||
// during allocation
|
||||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
|
||||
// Update user consumedRatios
|
||||
recalculateQueueUsageRatio(clusterResource, null);
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
@ -1565,17 +1653,93 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
|
||||
queueUsage.decAMUsed(nodeLabel, resourceToDec);
|
||||
}
|
||||
|
||||
/*
|
||||
* Usage Ratio
|
||||
*/
|
||||
static private class UsageRatios {
|
||||
private Map<String, Float> usageRatios;
|
||||
private ReadLock readLock;
|
||||
private WriteLock writeLock;
|
||||
|
||||
public UsageRatios() {
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
usageRatios = new HashMap<String, Float>();
|
||||
}
|
||||
|
||||
private void incUsageRatio(String label, float delta) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
Float fl = usageRatios.get(label);
|
||||
if (null == fl) {
|
||||
fl = new Float(0.0);
|
||||
}
|
||||
fl += delta;
|
||||
usageRatios.put(label, new Float(fl));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
float getUsageRatio(String label) {
|
||||
try {
|
||||
readLock.lock();
|
||||
Float f = usageRatios.get(label);
|
||||
if (null == f) {
|
||||
return 0.0f;
|
||||
}
|
||||
return f;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void setUsageRatio(String label, float ratio) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
usageRatios.put(label, new Float(ratio));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public float getUsageRatio(String label) {
|
||||
return qUsageRatios.getUsageRatio(label);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static class User {
|
||||
ResourceUsage userResourceUsage = new ResourceUsage();
|
||||
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
|
||||
int pendingApplications = 0;
|
||||
int activeApplications = 0;
|
||||
private UsageRatios userUsageRatios = new UsageRatios();
|
||||
|
||||
public ResourceUsage getResourceUsage() {
|
||||
return userResourceUsage;
|
||||
}
|
||||
|
||||
public synchronized float resetAndUpdateUsageRatio(
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource resource, String nodePartition) {
|
||||
userUsageRatios.setUsageRatio(nodePartition, 0);
|
||||
return updateUsageRatio(resourceCalculator, resource, nodePartition);
|
||||
}
|
||||
|
||||
public synchronized float updateUsageRatio(
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource resource, String nodePartition) {
|
||||
float delta;
|
||||
float newRatio =
|
||||
Resources.ratio(resourceCalculator, getUsed(nodePartition), resource);
|
||||
delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
|
||||
userUsageRatios.setUsageRatio(nodePartition, newRatio);
|
||||
return delta;
|
||||
}
|
||||
|
||||
public Resource getUsed() {
|
||||
return userResourceUsage.getUsed();
|
||||
}
|
||||
@ -1713,7 +1877,7 @@ public synchronized void collectSchedulerApplications(
|
||||
.getSchedulableEntities()) {
|
||||
apps.add(pendingApp.getApplicationAttemptId());
|
||||
}
|
||||
for (FiCaSchedulerApp app :
|
||||
for (FiCaSchedulerApp app :
|
||||
orderingPolicy.getSchedulableEntities()) {
|
||||
apps.add(app.getApplicationAttemptId());
|
||||
}
|
||||
|
@ -41,6 +41,8 @@
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -72,6 +74,7 @@
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
@ -83,6 +86,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
@ -97,6 +101,7 @@
|
||||
public class TestLeafQueue {
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
|
||||
|
||||
RMContext rmContext;
|
||||
RMContext spyRMContext;
|
||||
@ -106,16 +111,29 @@ public class TestLeafQueue {
|
||||
CapacitySchedulerContext csContext;
|
||||
|
||||
CSQueue root;
|
||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||
Map<String, CSQueue> queues;
|
||||
|
||||
final static int GB = 1024;
|
||||
final static String DEFAULT_RACK = "/default";
|
||||
|
||||
private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
|
||||
private final ResourceCalculator resourceCalculator =
|
||||
new DefaultResourceCalculator();
|
||||
|
||||
private final ResourceCalculator dominantResourceCalculator =
|
||||
new DominantResourceCalculator();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
setUpInternal(resourceCalculator);
|
||||
}
|
||||
|
||||
private void setUpWithDominantResourceCalculator() throws Exception {
|
||||
setUpInternal(dominantResourceCalculator);
|
||||
}
|
||||
|
||||
private void setUpInternal(ResourceCalculator rC) throws Exception {
|
||||
CapacityScheduler spyCs = new CapacityScheduler();
|
||||
queues = new HashMap<String, CSQueue>();
|
||||
cs = spy(spyCs);
|
||||
rmContext = TestUtils.getMockRMContext();
|
||||
spyRMContext = spy(rmContext);
|
||||
@ -134,6 +152,8 @@ public void setUp() throws Exception {
|
||||
csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
||||
csConf.setBoolean(
|
||||
"yarn.scheduler.capacity.reservations-continue-look-all-nodes", false);
|
||||
final String newRoot = "root" + System.currentTimeMillis();
|
||||
setupQueueConfiguration(csConf, newRoot);
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
@ -153,6 +173,7 @@ public void setUp() throws Exception {
|
||||
when(csContext.getResourceCalculator()).
|
||||
thenReturn(resourceCalculator);
|
||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||
when(csContext.getResourceCalculator()).thenReturn(rC);
|
||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new RMContainerTokenSecretManager(conf);
|
||||
@ -179,7 +200,8 @@ public void setUp() throws Exception {
|
||||
.thenReturn(new YarnConfiguration());
|
||||
when(cs.getNumClusterNodes()).thenReturn(3);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static final String A = "a";
|
||||
private static final String B = "b";
|
||||
private static final String C = "c";
|
||||
@ -608,14 +630,180 @@ public void testSingleQueueWithOneUser() throws Exception {
|
||||
assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
|
||||
a.getMetrics().getAvailableMB());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDRFUsageRatioRounding() throws Exception {
|
||||
CSAssignment assign;
|
||||
setUpWithDominantResourceCalculator();
|
||||
// Mock the queue
|
||||
LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E));
|
||||
|
||||
// Users
|
||||
final String user0 = "user_0";
|
||||
|
||||
// Submit applications
|
||||
final ApplicationAttemptId appAttemptId0 =
|
||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app0 =
|
||||
new FiCaSchedulerApp(appAttemptId0, user0, b,
|
||||
b.getActiveUsersManager(), spyRMContext);
|
||||
b.submitApplicationAttempt(app0, user0);
|
||||
|
||||
// Setup some nodes
|
||||
String host0 = "127.0.0.1";
|
||||
FiCaSchedulerNode node0 =
|
||||
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100);
|
||||
|
||||
// Make cluster relatively large so usageRatios are small
|
||||
int numNodes = 1000;
|
||||
Resource clusterResource =
|
||||
Resources.createResource(numNodes * (80 * GB), numNodes * 100);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
// Set user-limit. Need a small queue within a large cluster.
|
||||
b.setUserLimit(50);
|
||||
b.setUserLimitFactor(1000000);
|
||||
b.setMaxCapacity(1.0f);
|
||||
b.setAbsoluteCapacity(0.00001f);
|
||||
|
||||
// First allocation is larger than second but is still vcore dominant
|
||||
// so usage ratio will be based on vcores. If consumedRatio doesn't round
|
||||
// in our favor then new limit calculation will actually be less than
|
||||
// what is currently consumed and we will fail to allocate
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true,
|
||||
priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
|
||||
assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
|
||||
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
app0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true,
|
||||
priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
|
||||
assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
|
||||
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
assertTrue("Still within limits, should assign",
|
||||
assign.getResource().getMemorySize() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDRFUserLimits() throws Exception {
|
||||
setUpWithDominantResourceCalculator();
|
||||
|
||||
// Mock the queue
|
||||
LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
|
||||
// unset maxCapacity
|
||||
b.setMaxCapacity(1.0f);
|
||||
|
||||
// Users
|
||||
final String user0 = "user_0";
|
||||
final String user1 = "user_1";
|
||||
|
||||
// Submit applications
|
||||
final ApplicationAttemptId appAttemptId0 =
|
||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app0 =
|
||||
new FiCaSchedulerApp(appAttemptId0, user0, b,
|
||||
b.getActiveUsersManager(), spyRMContext);
|
||||
b.submitApplicationAttempt(app0, user0);
|
||||
|
||||
final ApplicationAttemptId appAttemptId2 =
|
||||
TestUtils.getMockApplicationAttemptId(2, 0);
|
||||
FiCaSchedulerApp app2 =
|
||||
new FiCaSchedulerApp(appAttemptId2, user1, b,
|
||||
b.getActiveUsersManager(), spyRMContext);
|
||||
b.submitApplicationAttempt(app2, user1);
|
||||
|
||||
// Setup some nodes
|
||||
String host0 = "127.0.0.1";
|
||||
FiCaSchedulerNode node0 =
|
||||
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100);
|
||||
String host1 = "127.0.0.2";
|
||||
FiCaSchedulerNode node1 =
|
||||
TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
|
||||
|
||||
int numNodes = 2;
|
||||
Resource clusterResource =
|
||||
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
// Setup resource-requests so that one application is memory dominant
|
||||
// and other application is vcores dominant
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true,
|
||||
priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
|
||||
|
||||
app2.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true,
|
||||
priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
|
||||
|
||||
/**
|
||||
* Start testing...
|
||||
*/
|
||||
|
||||
// Set user-limit
|
||||
b.setUserLimit(50);
|
||||
b.setUserLimitFactor(2);
|
||||
User queueUser0 = b.getUser(user0);
|
||||
User queueUser1 = b.getUser(user1);
|
||||
|
||||
assertEquals("There should 2 active users!", 2, b
|
||||
.getActiveUsersManager().getNumActiveUsers());
|
||||
// Fill both Nodes as far as we can
|
||||
CSAssignment assign;
|
||||
do {
|
||||
assign =
|
||||
b.assignContainers(clusterResource, node0, new ResourceLimits(
|
||||
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
LOG.info(assign.toString());
|
||||
} while (assign.getResource().getMemorySize() > 0 &&
|
||||
assign.getAssignmentInformation().getNumReservations() == 0);
|
||||
do {
|
||||
assign =
|
||||
b.assignContainers(clusterResource, node1, new ResourceLimits(
|
||||
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
} while (assign.getResource().getMemorySize() > 0 &&
|
||||
assign.getAssignmentInformation().getNumReservations() == 0);
|
||||
//LOG.info("user_0: " + queueUser0.getUsed());
|
||||
//LOG.info("user_1: " + queueUser1.getUsed());
|
||||
|
||||
assertTrue("Verify user_0 got resources ", queueUser0.getUsed()
|
||||
.getMemorySize() > 0);
|
||||
assertTrue("Verify user_1 got resources ", queueUser1.getUsed()
|
||||
.getMemorySize() > 0);
|
||||
assertTrue(
|
||||
"Exepected AbsoluteUsedCapacity > 0.95, got: "
|
||||
+ b.getAbsoluteUsedCapacity(), b.getAbsoluteUsedCapacity() > 0.95);
|
||||
|
||||
// Verify consumedRatio is based on dominant resources
|
||||
float expectedRatio =
|
||||
queueUser0.getUsed().getVirtualCores()
|
||||
/ (numNodes * 100.0f)
|
||||
+ queueUser1.getUsed().getMemorySize()
|
||||
/ (numNodes * 8.0f * GB);
|
||||
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
|
||||
// Add another node and make sure consumedRatio is adjusted
|
||||
// accordingly.
|
||||
numNodes = 3;
|
||||
clusterResource =
|
||||
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
root.updateClusterResource(clusterResource, new ResourceLimits(
|
||||
clusterResource));
|
||||
expectedRatio =
|
||||
queueUser0.getUsed().getVirtualCores()
|
||||
/ (numNodes * 100.0f)
|
||||
+ queueUser1.getUsed().getMemorySize()
|
||||
/ (numNodes * 8.0f * GB);
|
||||
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserLimits() throws Exception {
|
||||
// Mock the queue
|
||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||
//unset maxCapacity
|
||||
a.setMaxCapacity(1.0f);
|
||||
|
||||
|
||||
// Users
|
||||
final String user_0 = "user_0";
|
||||
final String user_1 = "user_1";
|
||||
|
@ -161,10 +161,17 @@ public static Priority createMockPriority( int priority) {
|
||||
public static ResourceRequest createResourceRequest(
|
||||
String resourceName, int memory, int numContainers, boolean relaxLocality,
|
||||
Priority priority, RecordFactory recordFactory, String labelExpression) {
|
||||
ResourceRequest request =
|
||||
return createResourceRequest(resourceName, memory, 1, numContainers,
|
||||
relaxLocality, priority, recordFactory, labelExpression);
|
||||
}
|
||||
|
||||
public static ResourceRequest createResourceRequest(String resourceName,
|
||||
int memory, int vcores, int numContainers, boolean relaxLocality,
|
||||
Priority priority, RecordFactory recordFactory, String labelExpression) {
|
||||
ResourceRequest request =
|
||||
recordFactory.newRecordInstance(ResourceRequest.class);
|
||||
Resource capability = Resources.createResource(memory, 1);
|
||||
|
||||
Resource capability = Resources.createResource(memory, vcores);
|
||||
|
||||
request.setNumContainers(numContainers);
|
||||
request.setResourceName(resourceName);
|
||||
request.setCapability(capability);
|
||||
@ -192,13 +199,18 @@ public static ApplicationId getMockApplicationId(int appId) {
|
||||
return ApplicationAttemptId.newInstance(applicationId, attemptId);
|
||||
}
|
||||
|
||||
public static FiCaSchedulerNode getMockNode(
|
||||
String host, String rack, int port, int capability) {
|
||||
public static FiCaSchedulerNode getMockNode(String host, String rack,
|
||||
int port, int memory) {
|
||||
return getMockNode(host, rack, port, memory, 1);
|
||||
}
|
||||
|
||||
public static FiCaSchedulerNode getMockNode(String host, String rack,
|
||||
int port, int memory, int vcores) {
|
||||
NodeId nodeId = NodeId.newInstance(host, port);
|
||||
RMNode rmNode = mock(RMNode.class);
|
||||
when(rmNode.getNodeID()).thenReturn(nodeId);
|
||||
when(rmNode.getTotalCapability()).thenReturn(
|
||||
Resources.createResource(capability, 1));
|
||||
Resources.createResource(memory, vcores));
|
||||
when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
|
||||
when(rmNode.getHostName()).thenReturn(host);
|
||||
when(rmNode.getRackName()).thenReturn(rack);
|
||||
|
Loading…
Reference in New Issue
Block a user