YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Sunil G via wangda)

Change-Id: I9b19f69788068be05b3295247cdd7b972f8a573c
Wangda Tan 2017-05-22 14:26:13 -07:00
parent 9cab42cc79
commit c583ab02c7
20 changed files with 1686 additions and 156 deletions

DefaultResourceCalculator.java

@@ -121,4 +121,9 @@ public boolean fitsIn(Resource cluster,
       Resource smaller, Resource bigger) {
     return smaller.getMemorySize() <= bigger.getMemorySize();
   }
+
+  @Override
+  public boolean isAnyMajorResourceZero(Resource resource) {
+    return resource.getMemorySize() == 0f;
+  }
 }

DominantResourceCalculator.java

@@ -239,4 +239,9 @@ public boolean fitsIn(Resource cluster,
     return smaller.getMemorySize() <= bigger.getMemorySize()
         && smaller.getVirtualCores() <= bigger.getVirtualCores();
   }
+
+  @Override
+  public boolean isAnyMajorResourceZero(Resource resource) {
+    return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
+  }
 }

ResourceCalculator.java

@@ -204,4 +204,13 @@ public abstract float divide(
    */
   public abstract boolean fitsIn(Resource cluster,
       Resource smaller, Resource bigger);
+
+  /**
+   * Check whether any major resource type (a resource type tracked by all
+   * NodeManagers) of the given resource has a zero value.
+   *
+   * @param resource resource
+   * @return true if any major resource type is zero.
+   */
+  public abstract boolean isAnyMajorResourceZero(Resource resource);
 }

Resources.java

@@ -352,4 +352,9 @@ public static Resource componentwiseMax(Resource lhs, Resource rhs) {
     return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
         Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
+
+  public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
+      Resource resource) {
+    return rc.isAnyMajorResourceZero(resource);
+  }
 }
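The new Resources helper simply delegates to the active ResourceCalculator, so its answer depends on whether memory-only (DefaultResourceCalculator) or memory-plus-vcores (DominantResourceCalculator) accounting is in effect. A minimal sketch of the intended use; the calculator choice and resource values here are illustrative, not taken from the patch:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class MajorResourceZeroExample {
      public static void main(String[] args) {
        ResourceCalculator rc = new DominantResourceCalculator();
        // <8192 MB, 0 vcores>: memory demand remains, but vcores is already
        // exhausted, so selecting more containers cannot satisfy this request.
        Resource toObtain = Resource.newInstance(8192, 0);
        if (Resources.isAnyMajorResourceZero(rc, toObtain)) {
          System.out.println("stop selecting preemption candidates");
        }
      }
    }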

CapacitySchedulerPreemptionContext.java

@@ -18,9 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

@@ -63,4 +65,7 @@ TempQueuePerPartition getQueueByPartition(String queueName,
   float getMinimumThresholdForIntraQueuePreemption();
 
   float getMaxAllowableLimitForIntraQueuePreemption();
+
+  @Unstable
+  IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
 }

CapacitySchedulerPreemptionUtils.java

@@ -99,7 +99,7 @@ public static void deductPreemptableResourcesBasedSelectedCandidates(
         }
 
         deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
-            tas, res, partition);
+            tas, res);
       }
     }
   }

@@ -108,10 +108,10 @@ public static void deductPreemptableResourcesBasedSelectedCandidates(
   private static void deductPreemptableResourcePerApp(
       CapacitySchedulerPreemptionContext context,
       Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
-      Resource res, String partition) {
+      Resource res) {
     for (TempAppPerPartition ta : tas) {
       ta.deductActuallyToBePreempted(context.getResourceCalculator(),
-          totalPartitionResource, res, partition);
+          totalPartitionResource, res);
     }
   }

@@ -157,7 +157,8 @@ public static boolean tryPreemptContainerAndDeductResToObtain(
         && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
             Resources.none())
         && Resources.fitsIn(rc, clusterResource,
-            rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
+            rmContainer.getAllocatedResource(), totalPreemptionAllowed)
+        && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
       Resources.subtractFrom(toObtainByPartition,
           rmContainer.getAllocatedResource());
       Resources.subtractFrom(totalPreemptionAllowed,

FifoIntraQueuePreemptionPlugin.java

@@ -18,11 +18,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;

@@ -33,7 +35,9 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -60,6 +64,26 @@ public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
     this.rc = rc;
   }
 
+  @Override
+  public Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
+      String partition) {
+    TempQueuePerPartition tq = context.getQueueByPartition(queueName,
+        partition);
+
+    List<FiCaSchedulerApp> apps = new ArrayList<FiCaSchedulerApp>();
+    for (TempAppPerPartition tmpApp : tq.getApps()) {
+      // If a lower priority app was not selected to get preempted, leave such
+      // apps out of preemption candidate selection.
+      if (Resources.equals(tmpApp.getActuallyToBePreempted(),
+          Resources.none())) {
+        continue;
+      }
+
+      apps.add(tmpApp.app);
+    }
+    return apps;
+  }
+
   @Override
   public Map<String, Resource> getResourceDemandFromAppsPerQueue(
       String queueName, String partition) {
@@ -90,7 +114,7 @@ public Map<String, Resource> getResourceDemandFromAppsPerQueue(
 
   @Override
   public void computeAppsIdealAllocation(Resource clusterResource,
-      Resource partitionBasedResource, TempQueuePerPartition tq,
+      TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed,
       Resource queueReassignableResource, float maxAllowablePreemptLimit) {

@@ -113,17 +137,15 @@ public void computeAppsIdealAllocation(Resource clusterResource,
     // 3. Create all tempApps for internal calculation and return a list from
     // high priority to low priority order.
-    TAPriorityComparator taComparator = new TAPriorityComparator();
-    PriorityQueue<TempAppPerPartition> orderedByPriority =
-        createTempAppForResCalculation(tq.partition, apps, taComparator);
+    PriorityQueue<TempAppPerPartition> orderedByPriority = createTempAppForResCalculation(
+        tq, apps, clusterResource, perUserAMUsed);
 
     // 4. Calculate idealAssigned per app by checking based on queue's
     // unallocated resource. Also return apps arranged from lower priority to
     // higher priority.
-    TreeSet<TempAppPerPartition> orderedApps =
-        calculateIdealAssignedResourcePerApp(clusterResource,
-            partitionBasedResource, tq, selectedCandidates,
-            queueReassignableResource, orderedByPriority, perUserAMUsed);
+    TreeSet<TempAppPerPartition> orderedApps = calculateIdealAssignedResourcePerApp(
+        clusterResource, tq, selectedCandidates, queueReassignableResource,
+        orderedByPriority);
 
     // 5. A configurable limit that could define an ideal allowable preemption
     // limit. Based on current queue's capacity, defines how much % could become

@@ -146,7 +168,7 @@ public void computeAppsIdealAllocation(Resource clusterResource,
     // 7. From lowest priority app onwards, calculate toBePreempted resource
     // based on demand.
     calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
-        preemptionLimit);
+        Resources.clone(preemptionLimit));
 
     // Save all apps (low to high) to temp queue for further reference
     tq.addAllApps(orderedApps);

@@ -154,7 +176,8 @@ public void computeAppsIdealAllocation(Resource clusterResource,
     // 8. There are chances that we may preempt for the demand from the same
     // priority level; such cases are to be validated out.
     validateOutSameAppPriorityFromDemand(clusterResource,
-        (TreeSet<TempAppPerPartition>) tq.getApps());
+        (TreeSet<TempAppPerPartition>) orderedApps, tq.getUsersPerPartition(),
+        context.getIntraQueuePreemptionOrderPolicy());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -177,17 +200,17 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
       Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
           tmpApp.idealAssigned);
-      Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
-      Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
+      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected);
+      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed());
 
       // Calculate toBePreempted from apps as follows:
       // app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
       //     intra_q_preemptable)
       tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
          .max(rc, clusterResource, preemtableFromApp, Resources.none()),
-          preemptionLimit);
+          Resources.clone(preemptionLimit));
 
-      preemptionLimit = Resources.subtract(preemptionLimit,
+      preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit,
           tmpApp.toBePreempted);
     }
   }
@@ -222,31 +245,24 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
    *  }
    *
    * @param clusterResource Cluster Resource
-   * @param partitionBasedResource resource per partition
    * @param tq TempQueue
    * @param selectedCandidates Already Selected preemption candidates
    * @param queueReassignableResource Resource used in a queue
    * @param orderedByPriority List of running apps
-   * @param perUserAMUsed AM used resource
    * @return List of temp apps ordered from low to high priority
    */
   private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
-      Resource clusterResource, Resource partitionBasedResource,
-      TempQueuePerPartition tq,
+      Resource clusterResource, TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource queueReassignableResource,
-      PriorityQueue<TempAppPerPartition> orderedByPriority,
-      Map<String, Resource> perUserAMUsed) {
+      PriorityQueue<TempAppPerPartition> orderedByPriority) {
 
     Comparator<TempAppPerPartition> reverseComp = Collections
         .reverseOrder(new TAPriorityComparator());
     TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
-    Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
 
     String partition = tq.partition;
-    Map<String, Resource> preCalculatedUserLimit =
-        new HashMap<String, Resource>();
+    Map<String, TempUserPerPartition> usersPerPartition =
+        tq.getUsersPerPartition();
 
     while (!orderedByPriority.isEmpty()) {
       // Remove app from the next highest remaining priority and process it to
@@ -256,44 +272,19 @@ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
 
       // Once unallocated resource is 0, we can stop assigning ideal per app.
       if (Resources.lessThanOrEqual(rc, clusterResource,
-          queueReassignableResource, Resources.none())) {
+          queueReassignableResource, Resources.none())
+          || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
         continue;
       }
 
       String userName = tmpApp.app.getUser();
-      Resource userLimitResource = preCalculatedUserLimit.get(userName);
-
-      // Verify whether we already calculated headroom for this user.
-      if (userLimitResource == null) {
-        userLimitResource = Resources.clone(
-            tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
-                partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
-        Resource amUsed = perUserAMUsed.get(userName);
-        if (null == amUsed) {
-          amUsed = Resources.createResource(0, 0);
-        }
-
-        // Real AM used need not have to be considered for user-limit as well.
-        userLimitResource = Resources.subtract(userLimitResource, amUsed);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Userlimit for user '" + userName + "' is :"
-              + userLimitResource + ", and amUsed is:" + amUsed);
-        }
-
-        preCalculatedUserLimit.put(userName, userLimitResource);
-      }
-
-      Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
-
-      if (idealAssignedForUser == null) {
-        idealAssignedForUser = Resources.createResource(0, 0);
-        userIdealAssignedMapping.put(userName, idealAssignedForUser);
-      }
+      TempUserPerPartition tmpUser = usersPerPartition.get(userName);
+      Resource userLimitResource = tmpUser.getUserLimit();
+      Resource idealAssignedForUser = tmpUser.idealAssigned;
 
       // Calculate total selected container resources from current app.
-      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
-          tmpApp, partition);
+      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp,
+          tmpUser, partition);
 
       // For any app, used+pending will give its idealAssigned. However it will
       // be tightly linked to queue's unallocated quota. So lower priority apps
@@ -304,10 +295,11 @@ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
       if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
           userLimitResource)) {
-        appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
+        Resource idealAssigned = Resources.min(rc, clusterResource,
+            appIdealAssigned,
             Resources.subtract(userLimitResource, idealAssignedForUser));
         tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
-            clusterResource, queueReassignableResource, appIdealAssigned));
+            clusterResource, queueReassignableResource, idealAssigned));
         Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
       } else {
         continue;

@@ -322,7 +314,8 @@ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
             Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
       }
 
-      Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
+      Resources.subtractFromNonNegative(queueReassignableResource,
+          tmpApp.idealAssigned);
     }
 
     return orderedApps;
@@ -334,7 +327,8 @@ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
    */
   private void getAlreadySelectedPreemptionCandidatesResource(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
-      TempAppPerPartition tmpApp, String partition) {
+      TempAppPerPartition tmpApp, TempUserPerPartition tmpUser,
+      String partition) {
     tmpApp.selected = Resources.createResource(0, 0);
     Set<RMContainer> containers = selectedCandidates
         .get(tmpApp.app.getApplicationAttemptId());

@@ -346,16 +340,23 @@ private void getAlreadySelectedPreemptionCandidatesResource(
     for (RMContainer cont : containers) {
       if (partition.equals(cont.getNodeLabelExpression())) {
         Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
+        Resources.addTo(tmpUser.selected, cont.getAllocatedResource());
       }
     }
   }
 
   private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
-      String partition, Collection<FiCaSchedulerApp> apps,
-      TAPriorityComparator taComparator) {
+      TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
+      Resource clusterResource, Map<String, Resource> perUserAMUsed) {
+    TAPriorityComparator taComparator = new TAPriorityComparator();
     PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
         100, taComparator);
 
+    String partition = tq.partition;
+    Map<String, TempUserPerPartition> usersPerPartition = tq
+        .getUsersPerPartition();
+
     // have an internal temp app structure to store intermediate data(priority)
     for (FiCaSchedulerApp app : apps) {
@@ -387,56 +388,156 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
       tmpApp.idealAssigned = Resources.createResource(0, 0);
 
       orderedByPriority.add(tmpApp);
+
+      // Create a TempUserPerPartition structure to hold more information
+      // regarding each user's entities such as UserLimit etc. This could
+      // be kept in a user to TempUserPerPartition map for further reference.
+      String userName = app.getUser();
+      if (!usersPerPartition.containsKey(userName)) {
+        ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
+            .getResourceUsage();
+
+        TempUserPerPartition tmpUser = new TempUserPerPartition(
+            tq.leafQueue.getUser(userName), tq.queueName,
+            Resources.clone(userResourceUsage.getUsed(partition)),
+            Resources.clone(perUserAMUsed.get(userName)),
+            Resources.clone(userResourceUsage.getReserved(partition)),
+            Resources.none());
+
+        Resource userLimitResource = Resources.clone(
+            tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
+                partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+        // Real AM used need not be considered for user-limit as well.
+        userLimitResource = Resources.subtract(userLimitResource,
+            tmpUser.amUsed);
+        tmpUser.setUserLimit(userLimitResource);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TempUser:" + tmpUser);
+        }
+
+        tmpUser.idealAssigned = Resources.createResource(0, 0);
+        tq.addUserPerPartition(userName, tmpUser);
+      }
     }
 
     return orderedByPriority;
   }
   /*
    * Fifo+Priority based preemption policy need not preempt resources at the
-   * same priority level. Such cases will be validated out.
+   * same priority level. Such cases will be validated out. But if the demand
+   * is from an app of a different user, force preemption of resources even if
+   * the apps are at the same priority.
    */
   public void validateOutSameAppPriorityFromDemand(Resource cluster,
-      TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
+      TreeSet<TempAppPerPartition> orderedApps,
+      Map<String, TempUserPerPartition> usersPerPartition,
+      IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
 
-    TempAppPerPartition[] apps = appsOrderedfromLowerPriority
-        .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
+    TempAppPerPartition[] apps = orderedApps
+        .toArray(new TempAppPerPartition[orderedApps.size()]);
     if (apps.length <= 0) {
       return;
     }
 
-    int lPriority = 0;
-    int hPriority = apps.length - 1;
-
-    while (lPriority < hPriority
-        && !apps[lPriority].equals(apps[hPriority])
-        && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
-      Resource toPreemptFromOther = apps[hPriority]
-          .getToBePreemptFromOther();
-      Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
-      Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
-          actuallyToPreempt);
-
-      if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
-        Resource toPreempt = Resources.min(rc, cluster,
-            toPreemptFromOther, delta);
-
-        apps[hPriority].setToBePreemptFromOther(
-            Resources.subtract(toPreemptFromOther, toPreempt));
-        apps[lPriority].setActuallyToBePreempted(
-            Resources.add(actuallyToPreempt, toPreempt));
-      }
-
-      if (Resources.lessThanOrEqual(rc, cluster,
-          apps[lPriority].toBePreempted,
-          apps[lPriority].getActuallyToBePreempted())) {
-        lPriority++;
-        continue;
-      }
-
-      if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
-          Resources.none())) {
-        hPriority--;
-        continue;
-      }
-    }
-  }
+    for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) {
+
+      // Check whether the high priority app with demand needs resources from
+      // another user.
+      if (Resources.greaterThan(rc, cluster,
+          apps[hPriority].getToBePreemptFromOther(), Resources.none())) {
+
+        // Given we have a demand from a high priority app, we can do a reverse
+        // scan from lower priority apps to select resources.
+        // Since idealAssigned of each app has considered user-limit, this
+        // logic will provide eventual consistency w.r.t. user-limit as well.
+        for (int lPriority = 0; lPriority < apps.length; lPriority++) {
+
+          // Check whether this app with demand needs resources from another
+          // user.
+          if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
+              Resources.none())) {
+
+            // If apps are of the same user and the priority is the same, skip.
+            if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
+                && (apps[lPriority].getPriority() >= apps[hPriority]
+                    .getPriority())) {
+              continue;
+            }
+
+            if (Resources.lessThanOrEqual(rc, cluster,
+                apps[lPriority].toBePreempted,
+                apps[lPriority].getActuallyToBePreempted())
+                || Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+                    Resources.none())) {
+              continue;
+            }
+
+            // Ideally, if any application has a higher priority, it can force
+            // preemption of any lower priority app from any user. However, if
+            // the admin enforces user-limit over priority, the preemption
+            // module will not choose lower priority apps from users who have
+            // not yet met their user-limit.
+            TempUserPerPartition tmpUser = usersPerPartition
+                .get(apps[lPriority].getUser());
+            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
+                && (!tmpUser.isUserLimitReached(rc, cluster))
+                && (intraQueuePreemptionOrder
+                    .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) {
+              continue;
+            }
+
+            Resource toPreemptFromOther = apps[hPriority]
+                .getToBePreemptFromOther();
+            Resource actuallyToPreempt = apps[lPriority]
+                .getActuallyToBePreempted();
+
+            // A lower priority app could offer more resources to preempt, if
+            // multiple higher priority/under-served users need resources.
+            // After one iteration, we need to ensure that actuallyToPreempt is
+            // subtracted from the resource to preempt.
+            Resource preemptableFromLowerPriorityApp = Resources
+                .subtract(apps[lPriority].toBePreempted, actuallyToPreempt);
+
+            // In case of user-limit preemption, when apps are from different
+            // users and of the same priority, we will do user-limit preemption
+            // if there is demand from an app under its UL quota.
+            // However, this under-UL-quota app's demand may be larger. Still,
+            // we should ensure that we are not over-preempting: at most
+            // (user's used - UL quota) could be preempted.
+            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
+                && (apps[lPriority].getPriority() == apps[hPriority]
+                    .getPriority())
+                && tmpUser.isUserLimitReached(rc, cluster)) {
+
+              Resource deltaULQuota = Resources
+                  .subtract(tmpUser.getUsedDeductAM(), tmpUser.selected);
+              Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit());
+
+              if (tmpUser.isPreemptionQuotaForULDeltaDone()) {
+                deltaULQuota = Resources.createResource(0, 0);
+              }
+
+              if (Resources.lessThan(rc, cluster, deltaULQuota,
+                  preemptableFromLowerPriorityApp)) {
+                tmpUser.updatePreemptionQuotaForULDeltaAsDone(true);
+                preemptableFromLowerPriorityApp = deltaULQuota;
+              }
+            }
+
+            if (Resources.greaterThan(rc, cluster,
+                preemptableFromLowerPriorityApp, Resources.none())) {
+              Resource toPreempt = Resources.min(rc, cluster,
+                  toPreemptFromOther, preemptableFromLowerPriorityApp);
+
+              apps[hPriority].setToBePreemptFromOther(
+                  Resources.subtract(toPreemptFromOther, toPreempt));
+              apps[lPriority].setActuallyToBePreempted(
+                  Resources.add(actuallyToPreempt, toPreempt));
+            }
+          }
+        }
+      }
+    }
+  }
@@ -456,6 +557,40 @@ private Resource calculateUsedAMResourcesPerQueue(String partition,
       Resources.addTo(userAMResource, app.getAMResource(partition));
       Resources.addTo(amUsed, app.getAMResource(partition));
     }
 
     return amUsed;
   }
+
+  @Override
+  public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
+      Resource clusterResource, Resource usedResource, RMContainer c) {
+    // Ensure the checks below:
+    // 1. This check must be done only when the preemption order is
+    //    USERLIMIT_FIRST.
+    // 2. By selecting container "c", check whether this user's resource usage
+    //    would go below its user-limit.
+    // 3. The user's used resource must be greater than its user-limit for any
+    //    containers to be skipped by this check. If the used resource is under
+    //    the user-limit, these containers of this user have to be preempted,
+    //    as the demand might be from high priority apps running under the
+    //    same user.
+    String partition = context.getScheduler()
+        .getSchedulerNode(c.getAllocatedNode()).getPartition();
+    TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(),
+        partition);
+    TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
+
+    // Given user is not present, skip the check.
+    if (tmpUser == null) {
+      return false;
+    }
+
+    // For ideal resource computations, the user-limit was saved after
+    // subtracting the AM used resource in TempUser. Hence it has to be added
+    // back here for a complete check.
+    Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed);
+
+    return Resources.lessThanOrEqual(rc, clusterResource,
+        Resources.subtract(usedResource, c.getAllocatedResource()), userLimit)
+        && context.getIntraQueuePreemptionOrderPolicy()
+            .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST);
+  }
 }
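In USERLIMIT_FIRST mode this guard stops the selector from taking a container whose removal would push its owner's usage to or below the user-limit. A small self-contained sketch of that decision, using plain longs (GB) instead of YARN Resource objects, purely for illustration:

    public class UserLimitGuardExample {
      // True when preempting a container of size 'container' would drop the
      // user's usage to or below its user-limit, i.e. the container must be
      // skipped (mirrors skipContainerBasedOnIntraQueuePolicy, memory-only).
      static boolean skip(long userUsage, long userLimit, long container) {
        return userUsage - container <= userLimit;
      }

      public static void main(String[] args) {
        long userLimit = 30;  // user-limit computed for this user
        long userUsage = 38;  // rolling usage, updated as containers are taken
        // 4 GB container: 38 - 4 = 34 > 30, still above UL, safe to preempt.
        System.out.println(skip(userUsage, userLimit, 4));   // false
        // 10 GB container: 38 - 10 = 28 <= 30, would breach UL, skip it.
        System.out.println(skip(userUsage, userLimit, 10));  // true
      }
    }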

IntraQueueCandidatesSelector.java

@@ -23,6 +23,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;

@@ -31,8 +32,9 @@
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -51,14 +53,14 @@ static class TAPriorityComparator
       Comparator<TempAppPerPartition> {
 
     @Override
-    public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
-      Priority p1 = Priority.newInstance(tq1.getPriority());
-      Priority p2 = Priority.newInstance(tq2.getPriority());
+    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
+      Priority p1 = Priority.newInstance(ta1.getPriority());
+      Priority p2 = Priority.newInstance(ta2.getPriority());
 
       if (!p1.equals(p2)) {
         return p1.compareTo(p2);
       }
 
-      return tq1.getApplicationId().compareTo(tq2.getApplicationId());
+      return ta1.getApplicationId().compareTo(ta2.getApplicationId());
     }
   }
@@ -121,17 +123,27 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
         Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
             .getResourceDemandFromAppsPerQueue(queueName, partition);
 
-        // 6. Based on the selected resource demand per partition, select
-        // containers with known policy from inter-queue preemption.
+        // The default preemption iterator considers only FIFO+priority. For
+        // user-limit preemption, it is possible that some lower priority apps
+        // need resources from a high priority app of another user. Hence use
+        // apps ordered by user-limit starvation as well.
+        Collection<FiCaSchedulerApp> apps = fifoPreemptionComputePlugin
+            .getPreemptableApps(queueName, partition);
+
+        // 6. Get user-limit to ensure that we do not preempt resources which
+        // would force a user's resources to come under its UL.
+        Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
+        initializeUsageAndUserLimitForCompute(clusterResource, partition,
+            leafQueue, rollingResourceUsagePerUser);
+
+        // 7. Based on the selected resource demand per partition, select
+        // containers with known policy from inter-queue preemption.
         try {
           leafQueue.getReadLock().lock();
-          Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
-              .getPreemptionIterator();
-          while (desc.hasNext()) {
-            FiCaSchedulerApp app = desc.next();
-            preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
-                totalPreemptedResourceAllowed, resToObtainByPartition,
-                leafQueue, app);
+          for (FiCaSchedulerApp app : apps) {
+            preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
+                clusterResource, totalPreemptedResourceAllowed,
+                resToObtainByPartition, rollingResourceUsagePerUser);
           }
         } finally {
           leafQueue.getReadLock().unlock();
@@ -142,16 +154,30 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
     return selectedCandidates;
   }
 
-  private void preemptFromLeastStarvedApp(
+  private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
+      String partition, LeafQueue leafQueue,
+      Map<String, Resource> rollingResourceUsagePerUser) {
+    for (String user : leafQueue.getAllUsers()) {
+      // Initialize the used resource of a given user for rolling computation.
+      rollingResourceUsagePerUser.put(user, Resources.clone(
+          leafQueue.getUser(user).getResourceUsage().getUsed(partition)));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Rolling resource usage for user:" + user + " is : "
+            + rollingResourceUsagePerUser.get(user));
+      }
+    }
+  }
+
+  private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
+      FiCaSchedulerApp app,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
-      Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
-      FiCaSchedulerApp app) {
+      Map<String, Resource> resToObtainByPartition,
+      Map<String, Resource> rollingResourceUsagePerUser) {
 
     // ToDo: Reuse reservation selector here.
 
-    List<RMContainer> liveContainers = new ArrayList<>(
-        app.getLiveContainers());
+    List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
     sortContainers(liveContainers);
 
     if (LOG.isDebugEnabled()) {

@@ -160,6 +186,8 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
           + totalPreemptedResourceAllowed);
     }
 
+    Resource rollingUsedResourcePerUser = rollingResourceUsagePerUser
+        .get(app.getUser());
    for (RMContainer c : liveContainers) {
 
       // if there is no demand, return.
@@ -184,12 +212,34 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
         continue;
       }
 
-      // Try to preempt this container
-      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
-          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedCandidates, totalPreemptedResourceAllowed);
-    }
+      // If the selected container would bring resource usage under (or equal
+      // to) its user's UserLimit, we must skip such containers.
+      if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app,
+          clusterResource, rollingUsedResourcePerUser, c)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Skipping container: " + c.getContainerId() + " with resource:"
+                  + c.getAllocatedResource() + " as UserLimit for user:"
+                  + app.getUser() + " with resource usage: "
+                  + rollingUsedResourcePerUser + " is going under UL");
+        }
+        break;
+      }
+
+      // Try to preempt this container
+      boolean ret = CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, selectedCandidates,
+              totalPreemptedResourceAllowed);
+
+      // Subtract from the respective user's resource usage once a container
+      // is selected for preemption.
+      if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy()
+          .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
+        Resources.subtractFrom(rollingUsedResourcePerUser,
+            c.getAllocatedResource());
+      }
+    }
   }
 
   private void computeIntraQueuePreemptionDemand(Resource clusterResource,
@@ -205,12 +255,7 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource,
         continue;
       }
 
-      // 2. Its better to get partition based resource limit earlier before
-      // starting calculation
-      Resource partitionBasedResource =
-          context.getPartitionResource(partition);
-
-      // 3. loop through all queues corresponding to a partition.
+      // 2. loop through all queues corresponding to a partition.
       for (String queueName : queueNames) {
         TempQueuePerPartition tq = context.getQueueByPartition(queueName,
             partition);

@@ -221,23 +266,22 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource,
           continue;
         }
 
-        // 4. Consider reassignableResource as (used - actuallyToBePreempted).
+        // 3. Consider reassignableResource as (used - actuallyToBePreempted).
         // This provides an upper limit to split apps' quota in a queue.
         Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
             tq.getActuallyToBePreempted());
 
-        // 5. Check queue's used capacity. Make sure that the used capacity is
+        // 4. Check queue's used capacity. Make sure that the used capacity is
         // above a certain limit to consider for intra-queue preemption.
         if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
             .getMinimumThresholdForIntraQueuePreemption()) {
           continue;
         }
 
-        // 6. compute the allocation of all apps based on queue's unallocated
+        // 5. compute the allocation of all apps based on queue's unallocated
         // capacity
         fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
-            partitionBasedResource, tq, selectedCandidates,
-            totalPreemptedResourceAllowed,
+            tq, selectedCandidates, totalPreemptedResourceAllowed,
             queueReassignableResource,
             context.getMaxAllowableLimitForIntraQueuePreemption());
       }
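The rolling-usage map seeded in initializeUsageAndUserLimitForCompute is what keeps the skip check cheap: each user's current usage is cloned once, then decremented as containers are selected, so every later check sees post-preemption usage without re-querying the scheduler. A simplified, self-contained sketch of that loop, using plain longs (GB) in place of Resource objects:

    import java.util.HashMap;
    import java.util.Map;

    public class RollingUsageExample {
      public static void main(String[] args) {
        // Rolling per-user usage (GB), seeded from the queue's usage records.
        Map<String, Long> rollingUsage = new HashMap<>();
        rollingUsage.put("alice", 40L);

        long[] containers = {8L, 8L, 8L};  // alice's containers, least important first
        long userLimit = 20L;

        for (long c : containers) {
          // Stop once taking one more container would put alice at or below her UL.
          if (rollingUsage.get("alice") - c <= userLimit) {
            System.out.println("stop: would breach user-limit");
            break;
          }
          rollingUsage.merge("alice", -c, Long::sum);  // container selected
          System.out.println("preempt container, rolling usage now "
              + rollingUsage.get("alice"));
        }
      }
    }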

IntraQueuePreemptionComputePlugin.java

@@ -18,12 +18,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 interface IntraQueuePreemptionComputePlugin {

@@ -32,8 +34,14 @@ Map<String, Resource> getResourceDemandFromAppsPerQueue(String queueName,
       String partition);
 
   void computeAppsIdealAllocation(Resource clusterResource,
-      Resource partitionBasedResource, TempQueuePerPartition tq,
+      TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
       float maxAllowablePreemptLimit);
+
+  Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
+      String partition);
+
+  boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
+      Resource clusterResource, Resource usedResource, RMContainer c);
 }

ProportionalCapacityPreemptionPolicy.java

@@ -22,6 +22,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -79,6 +80,16 @@
  */
 public class ProportionalCapacityPreemptionPolicy
     implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
+
+  /**
+   * IntraQueuePreemptionOrderPolicy defines the order in which intra-queue
+   * preemption considerations are applied; it can be configured by the admin.
+   */
+  @Unstable
+  public enum IntraQueuePreemptionOrderPolicy {
+    PRIORITY_FIRST, USERLIMIT_FIRST;
+  }
+
 private static final Log LOG =
     LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
@@ -95,6 +106,7 @@ public class ProportionalCapacityPreemptionPolicy
   private float maxAllowableLimitForIntraQueuePreemption;
   private float minimumThresholdForIntraQueuePreemption;
+  private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
 
   // Pointer to other RM components
   private RMContext rmContext;

@@ -190,6 +202,13 @@ public void init(Configuration config, RMContext context,
         CapacitySchedulerConfiguration.
         DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
 
+    intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
+        .valueOf(csConfig
+            .get(
+                CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
+                CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
+            .toUpperCase());
+
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();

@@ -242,7 +261,6 @@ public synchronized void editSchedule() {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private void preemptOrkillSelectedContainerAfterWait(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       long currentTime) {

@@ -652,4 +670,9 @@ public void addPartitionToUnderServedQueues(String queueName,
     }
     underServedQueues.add(queueName);
   }
+
+  @Override
+  public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
+    return intraQueuePreemptionOrderPolicy;
+  }
 }
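The order policy is resolved once at init time: the configured string is upper-cased and mapped onto the enum, so "userlimit_first" and "USERLIMIT_FIRST" are both accepted. A minimal standalone illustration of the same parsing pattern (the class and variable names here are invented for the sketch):

    public class OrderPolicyParseExample {
      enum IntraQueuePreemptionOrderPolicy { PRIORITY_FIRST, USERLIMIT_FIRST }

      public static void main(String[] args) {
        // Value as an admin would write it in configuration; the shipped
        // default is "userlimit_first".
        String configured = "userlimit_first";
        IntraQueuePreemptionOrderPolicy policy =
            IntraQueuePreemptionOrderPolicy.valueOf(configured.toUpperCase());
        System.out.println(policy);  // USERLIMIT_FIRST
        // An unknown value such as "fifo_first" would throw
        // IllegalArgumentException here, surfacing the misconfiguration at
        // policy init time rather than during preemption.
      }
    }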

TempAppPerPartition.java

@@ -91,8 +91,12 @@ public ApplicationId getApplicationId() {
     return applicationId;
   }
 
+  public String getUser() {
+    return this.app.getUser();
+  }
+
   public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
-      Resource cluster, Resource toBeDeduct, String partition) {
+      Resource cluster, Resource toBeDeduct) {
     if (Resources.greaterThan(resourceCalculator, cluster,
         getActuallyToBePreempted(), toBeDeduct)) {
       Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);

TempQueuePerPartition.java

@@ -26,6 +26,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
  * Temporary data-structure tracking resource availability, pending resource

@@ -59,6 +61,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   int relativePriority = 0;
   TempQueuePerPartition parent = null;
 
+  // This holds a temp user data structure per user, tracking user-limit,
+  // idealAssigned, used etc.
+  Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
+
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,

@@ -289,4 +295,12 @@ public Collection<TempAppPerPartition> getApps() {
     return apps;
   }
+
+  public void addUserPerPartition(String userName,
+      TempUserPerPartition tmpUser) {
+    this.usersPerPartition.put(userName, tmpUser);
+  }
+
+  public Map<String, TempUserPerPartition> getUsersPerPartition() {
+    return usersPerPartition;
+  }
 }
} }

TempUserPerPartition.java (new file)

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Temporary data-structure tracking resource availability, pending resource
+ * need and current utilization for a user.
+ */
+public class TempUserPerPartition extends AbstractPreemptionEntity {
+
+  private final User user;
+  private Resource userLimit;
+  private boolean donePreemptionQuotaForULDelta = false;
+
+  TempUserPerPartition(User user, String queueName, Resource usedPerPartition,
+      Resource amUsedPerPartition, Resource reserved,
+      Resource pendingPerPartition) {
+    super(queueName, usedPerPartition, amUsedPerPartition, reserved,
+        pendingPerPartition);
+    this.user = user;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed())
+        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
+        .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ")
+        .append(getUserLimit()).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" USED_WO_AMUSED: ")
+        .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ")
+        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+        .append(getActuallyToBePreempted()).append("\n");
+    return sb.toString();
+  }
+
+  public String getUserName() {
+    return user.getUserName();
+  }
+
+  public Resource getUserLimit() {
+    return userLimit;
+  }
+
+  public void setUserLimit(Resource userLimitResource) {
+    this.userLimit = userLimitResource;
+  }
+
+  public boolean isUserLimitReached(ResourceCalculator rc,
+      Resource clusterResource) {
+    if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(),
+        userLimit)) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isPreemptionQuotaForULDeltaDone() {
+    return this.donePreemptionQuotaForULDelta;
+  }
+
+  public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
+    this.donePreemptionQuotaForULDelta = done;
+  }
+}
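Note that the user-limit check deliberately compares usage net of AM resource against the stored (AM-deducted) user-limit, so both sides of the comparison exclude AM usage. A toy illustration of that invariant, using plain longs instead of Resource objects and invented values:

    public class UserLimitReachedExample {
      public static void main(String[] args) {
        long used = 36;      // user's total usage, including AM containers
        long amUsed = 4;     // resource held by the user's AM containers
        long rawUserLimit = 30;

        // TempUserPerPartition stores the user-limit with AM usage deducted...
        long storedUserLimit = rawUserLimit - amUsed;          // 26
        // ...and isUserLimitReached compares usage-minus-AM against it.
        boolean reached = (used - amUsed) > storedUserLimit;   // 32 > 26
        System.out.println(reached);  // true: user is over its limit
      }
    }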

CapacitySchedulerConfiguration.java

@@ -1233,6 +1233,14 @@ public boolean getLazyPreemptionEnabled() {
   public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
       0.2f;
 
+  /**
+   * For intra-queue preemption, enforce a preemption order such as
+   * "userlimit_first" or "priority_first".
+   */
+  public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
+      + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
+
+  public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
+
 /**
  * Maximum applications for a queue, used when applications per queue is
  * not defined. To be consistent with the previous version, the default value is set
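For completeness, a sketch of how an admin-facing configuration would set the new knob programmatically. The full property name shown is an assumption, derived from PREEMPTION_CONFIG_PREFIX and INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX keeping their existing values:

    import org.apache.hadoop.conf.Configuration;

    public class OrderPolicyConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed full key after prefix concatenation.
        String key = "yarn.resourcemanager.monitor.capacity.preemption."
            + "intra-queue-preemption.preemption-order-policy";
        // Flip from the default "userlimit_first" to priority-driven ordering.
        conf.set(key, "priority_first");
        System.out.println(conf.get(key));
      }
    }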

LeafQueue.java

@@ -43,12 +43,10 @@
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;

@@ -56,7 +54,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;

@@ -2022,4 +2019,12 @@ public void stopQueue() {
       writeLock.unlock();
     }
   }
+
+  /**
+   * Get all valid users in this queue.
+   * @return user list
+   */
+  public Set<String> getAllUsers() {
+    return this.getUsersManager().getUsers().keySet();
+  }
 }
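getAllUsers is what feeds the rolling-usage initialization shown earlier in IntraQueueCandidatesSelector. A sketch of that seeding in isolation; the queue and usage objects are mocked as plain collections, purely illustrative:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SeedRollingUsageExample {
      public static void main(String[] args) {
        // Stand-ins for leafQueue.getAllUsers() and per-user used resource (GB).
        Set<String> allUsers = new HashSet<>(Arrays.asList("alice", "bob"));
        Map<String, Long> usedByUser = new HashMap<>();
        usedByUser.put("alice", 40L);
        usedByUser.put("bob", 12L);

        // Mirrors initializeUsageAndUserLimitForCompute: copy each user's
        // current usage so preemption decisions can decrement the copy safely.
        Map<String, Long> rollingResourceUsagePerUser = new HashMap<>();
        for (String user : allUsers) {
          rollingResourceUsagePerUser.put(user, usedByUser.get(user));
        }
        System.out.println(rollingResourceUsagePerUser);
      }
    }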

UsersManager.java

@@ -253,6 +253,15 @@ public Resource getUserResourceLimit() {
     public void setUserResourceLimit(Resource userResourceLimit) {
       this.userResourceLimit = userResourceLimit;
     }
+
+    public String getUserName() {
+      return this.userName;
+    }
+
+    @VisibleForTesting
+    public void setResourceUsage(ResourceUsage resourceUsage) {
+      this.userResourceUsage = resourceUsage;
+    }
   } /* End of User class */
 
   /**

@@ -344,7 +353,7 @@ public void userLimitNeedsRecompute() {
   /*
    * Get all users of queue.
    */
-  private Map<String, User> getUsers() {
+  public Map<String, User> getUsers() {
     return users;
   }

ProportionalCapacityPreemptionPolicyMockFramework.java

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -96,6 +97,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
Clock mClock = null; Clock mClock = null;
CapacitySchedulerConfiguration conf = null; CapacitySchedulerConfiguration conf = null;
CapacityScheduler cs = null; CapacityScheduler cs = null;
@SuppressWarnings("rawtypes")
EventHandler<Event> mDisp = null; EventHandler<Event> mDisp = null;
ProportionalCapacityPreemptionPolicy policy = null; ProportionalCapacityPreemptionPolicy policy = null;
Resource clusterResource = null; Resource clusterResource = null;
@ -247,6 +249,7 @@ public Integer answer(InvocationOnMock invocation) throws Throwable {
if (containerId == 1) {
when(rmc.isAMContainer()).thenReturn(true);
when(app.getAMResource(label)).thenReturn(res);
when(app.getAppAMNodePartitionName()).thenReturn(label);
}
if (reserved) {
@ -280,6 +283,12 @@ public Integer answer(InvocationOnMock invocation) throws Throwable {
containerId++;
}
// If the app has 0 containers and only pending resources, still make sure
// to update the label.
if (repeat == 0) {
when(app.getAppAMNodePartitionName()).thenReturn(label);
}
// Some more app specific aggregated data can be better filled here.
when(app.getPriority()).thenReturn(pri);
when(app.getUser()).thenReturn(userName);
@ -315,10 +324,15 @@ public Integer answer(InvocationOnMock invocation) throws Throwable {
private void mockApplications(String appsConfig) {
int id = 1;
HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
LeafQueue queue = null;
int mulp = -1;
for (String a : appsConfig.split(";")) {
String[] strs = a.split("\t");
String queueName = strs[0];
if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
mulp = 100 / (new Integer(strs[2]).intValue());
}
// get containers
List<RMContainer> liveContainers = new ArrayList<RMContainer>();
@ -338,6 +352,7 @@ private void mockApplications(String appsConfig) {
when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId);
when(app.getQueueName()).thenReturn(queueName);
// add to LeafQueue
queue = (LeafQueue) nameToCSQueues.get(queueName);
@ -349,23 +364,71 @@ private void mockApplications(String appsConfig) {
users = new HashSet<String>();
userMap.put(queueName, users);
}
users.add(app.getUser());
String label = app.getAppAMNodePartitionName();
// Get label to queue
HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
.get(label);
if (null == userResourceUsagePerQueue) {
userResourceUsagePerQueue = new HashMap<>();
userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
}
// Get queue to user based resource map
HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
.get(queueName);
if (null == userResourceUsage) {
userResourceUsage = new HashMap<>();
userResourceUsagePerQueue.put(queueName, userResourceUsage);
}
// Get user to its resource usage.
ResourceUsage usage = userResourceUsage.get(app.getUser());
if (null == usage) {
usage = new ResourceUsage();
userResourceUsage.put(app.getUser(), usage);
}
usage.incAMUsed(app.getAMResource(label));
usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
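// Shape of the bookkeeping above (illustrative): label -> queue -> user ->
// ResourceUsage, e.g. {"" -> {"a" -> {"user1" -> usage}}}. The per-user
// mocks built below read from this nested map.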
id++;
}
for (String label : userResourceUsagePerLabel.keySet()) {
for (String queueName : userMap.keySet()) {
queue = (LeafQueue) nameToCSQueues.get(queueName);
// Currently we have user-limit test support only for default label.
Resource totResoucePerPartition = partitionToResource.get("");
Resource capacity = Resources.multiply(totResoucePerPartition,
queue.getQueueCapacities().getAbsoluteCapacity());
HashSet<String> users = userMap.get(queue.getQueueName());
when(queue.getAllUsers()).thenReturn(users);
Resource userLimit;
if (mulp > 0) {
userLimit = Resources.divideAndCeil(rc, capacity, mulp);
} else {
userLimit = Resources.divideAndCeil(rc, capacity,
users.size());
}
LOG.debug("Updating user-limit from mock: totResoucePerPartition="
+ totResoucePerPartition + ", capacity=" + capacity
+ ", users.size()=" + users.size() + ", userlimit= " + userLimit
+ ",label= " + label + ",queueName= " + queueName);
HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
.get(label).get(queueName);
for (String userName : users) {
User user = new User(userName);
if (userResourceUsage != null) {
user.setResourceUsage(userResourceUsage.get(userName));
}
when(queue.getUser(eq(userName))).thenReturn(user);
when(queue.getResourceLimitForAllUsers(eq(userName),
any(Resource.class), anyString(), any(SchedulingMode.class)))
.thenReturn(userLimit);
}
}
}
}
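// Worked example of the mocked user-limit above: with a queue capacity of
// 100 and a MULP column of 50 in appsConfig, mulp = 100 / 50 = 2, so
// userLimit = divideAndCeil(capacity, mulp) = 50. Without a MULP column and
// two active users, userLimit = divideAndCeil(100, 2) = 50 as well.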

View File

@ -62,12 +62,16 @@ public void testSimpleIntraQueuePreemption() throws IOException {
* Apps which are running at low priority (4) will have a few of their
* resources preempted to meet the demand.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;"; String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label String nodesConfig = // n1 has no label
"n1= res=100"; "n1= res=100";
String queuesConfig = String queuesConfig =
// guaranteed,max,used,pending,reserved // guaranteed,max,used,pending,reserved
"root(=[100 100 80 120 0]);" + // root "root(=[100 100 79 120 0]);" + // root
"-a(=[11 100 11 50 0]);" + // a "-a(=[11 100 11 50 0]);" + // a
"-b(=[40 100 38 60 0]);" + // b "-b(=[40 100 38 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c "-c(=[20 100 10 10 0]);" + // c
@ -304,6 +308,8 @@ public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@ -357,6 +363,8 @@ public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
// report "ideal" preempt as 10%. Ensure preemption happens only for 10% // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.1); (float) 0.1);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;"; String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label String nodesConfig = // n1 has no label
@ -411,6 +419,8 @@ public void testAlreadySelectedContainerFromInterQueuePreemption()
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@ -418,7 +428,7 @@ public void testAlreadySelectedContainerFromInterQueuePreemption()
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 95 170 0]);" + // root
"-a(=[60 100 70 35 0]);" + // a
"-b(=[40 100 25 120 0])"; // b
String appsConfig =
@ -467,6 +477,8 @@ public void testSkipAMContainersInInterQueuePreemption() throws IOException {
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@ -516,6 +528,8 @@ public void testSkipAMContainersInInterQueuePreemptionSingleApp()
* cycle. Even though there is more demand and no other low priority
* apps are present, the AM container still needs to be spared.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@ -660,6 +674,8 @@ public void testNodePartitionIntraQueuePreemption() throws IOException {
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
@ -720,6 +736,8 @@ public void testComplexIntraQueuePreemption() throws IOException {
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@ -840,8 +858,10 @@ public void testIntraQueuePreemptionWithTwoUsers()
policy.editSchedule();
// Considering user-limit of 50% since only 2 users are there, only preempt
// 14 more (5 is already running) even though demand is for 30. Ideally we
// must preempt 15, but the 15th container would bring user1's usage to 20,
// which is the same as the user-limit. Hence skip the 15th container.
verify(mDisp, times(14)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@ -869,6 +889,8 @@ public void testComplexNodePartitionIntraQueuePreemption()
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x

View File

@ -0,0 +1,899 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test class for IntraQueuePreemption scenarios.
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
extends
ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@Test
public void testSimpleIntraQueuePreemptionWithTwoUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Preconditions:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 100  | 0       |
* | app2 | user2 | 1        | 0    | 30      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. app1 has a demand
* of 0 and is already using 100. app2 from user2 has a demand of 30, and
* its UL is 50. Hence 30 would be preempted from app1.
*/
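// Worked numbers for the scenario above (illustrative): UL = 100 * 50% = 50
// per user. user1 holds 100, i.e. 50 over its UL; user2 demands 30, well
// within its UL of 50. The policy is therefore expected to select
// min(overage, demand) = min(50, 30) = 30 containers from app1.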
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 30 0]);" + // root
"-a(=[100 100 100 30 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,100,false,0,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,0,false,30,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources and is well under its user-limit. Hence preempt
// resources from app1.
verify(mDisp, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testNoIntraQueuePreemptionWithSingleUser()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 100  | 0       |
* | app2 | user1 | 1        | 0    | 30      |
* +------+-------+----------+------+---------+
* Given a single user, lower priority/late submitted apps have to
* wait.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 30 0]);" + // root
"-a(=[100 100 100 30 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,100,false,0,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,0,false,30,user1)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources. Since app1 and app2 are from the same user,
// there won't be any preemption.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 50   | 0       |
* | app2 | user2 | 1        | 30   | 30      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. app1 has a demand
* of 0 and is already using 50. app2 from user2 has a demand of 30, and
* its UL is 50. Since app1 is within its UL, there should not be any
* preemption.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 30 0]);" + // root
"-a(=[100 100 80 30 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,50,false,0,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,30,false,30,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources. Since both users stay within their
// user-limit, there won't be any preemption.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 2        | 100  | 0       |
* | app2 | user2 | 1        | 0    | 30      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. app1 of higher
* priority has a demand of 0 and is already using 100. app2 from user2 has
* a demand of 30, and its UL is 50. Hence 30 would be preempted from app1.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 30 0]);" + // root
"-a(=[100 100 100 30 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(2,1,n1,,100,false,0,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,0,false,30,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources and is well under its user-limit. Hence preempt
// resources from app1 even though its priority is higher than app2's.
verify(mDisp, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testIntraQueuePreemptionOfUserLimitWithMultipleApps()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 30   | 30      |
* | app2 | user2 | 1        | 20   | 20      |
* | app3 | user1 | 1        | 30   | 30      |
* | app4 | user2 | 1        | 0    | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 90 0]);" + // root
"-a(=[100 100 80 90 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,30,false,30,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,20,false,20,user2);" +
"a\t" // app3 in a
+ "(1,1,n1,,30,false,30,user1);" +
"a\t" // app4 in a
+ "(1,1,n1,,0,false,10,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2/app4 need more resources and are well under their user-limit. Hence
// preempt resources from user1's apps; app3 is chosen over app1 as it was
// submitted later.
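// Numbers (illustrative): user1 uses 60 (app1 30 + app3 30), 10 above its
// UL of 50; one container is kept back as the user-limit deadzone, so 9
// containers are preempted from app3.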
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNoPreemptionOfUserLimitWithMultipleAppsAndSameUser()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 30   | 30      |
* | app2 | user1 | 1        | 20   | 20      |
* | app3 | user1 | 1        | 30   | 30      |
* | app4 | user1 | 1        | 0    | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 90 0]);" + // root
"-a(=[100 100 80 90 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,30,false,20,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,20,false,20,user1);" +
"a\t" // app3 in a
+ "(1,1,n1,,30,false,30,user1);" +
"a\t" // app4 in a
+ "(1,1,n1,,0,false,10,user1)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// All four apps belong to user1, so user-limit preemption has nothing to
// rebalance; no containers should be preempted from any app.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@Test
public void testIntraQueuePreemptionOfUserLimitWitAppsOfDifferentPriority()
throws IOException {
/**
* Queue structure is:
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 3        | 30   | 30      |
* | app2 | user2 | 1        | 20   | 20      |
* | app3 | user1 | 4        | 30   | 0       |
* | app4 | user2 | 1        | 0    | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps.
*/
// Set max preemption limit as 50%.
conf.setFloat(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 60 0]);" + // root
"-a(=[100 100 80 60 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(3,1,n1,,30,false,30,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,20,false,20,user2);" + "a\t" // app3 in a
+ "(4,1,n1,,30,false,0,user1);" + "a\t" // app4 in a
+ "(1,1,n1,,0,false,10,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2/app4 need more resources and are well under their user-limit. Hence
// preempt resources from app1 (compared to app3, app1 has lower priority).
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testIntraQueuePreemptionOfUserLimitInTwoQueues()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
* maxIntraQueuePreemptableLimit by default is 50%. This test verifies that
* intra-queue preemption can occur in two queues when a user-limit
* irregularity is present in a queue.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 90 80 0]);" + // root
"-a(=[60 100 55 60 0]);" + // a
"-b(=[40 100 35 20 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(3,1,n1,,20,false,30,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,20,false,20,user2);" +
"a\t" // app3 in a
+ "(4,1,n1,,15,false,0,user1);" +
"a\t" // app4 in a
+ "(1,1,n1,,0,false,10,user2);" +
"b\t" // app5 in b
+ "(3,1,n1,,25,false,10,user1);" +
"b\t" // app6 in b
+ "(1,1,n1,,10,false,10,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2/app4 need more resources and are well under their user-limit. Hence
// preempt resources from app1 in queue a (compared to app3, app1 has lower
// priority) and from app5 in queue b.
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
}
@Test
public void testIntraQueuePreemptionWithTwoRequestingUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 60   | 10      |
* | app2 | user2 | 1        | 40   | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 20 0]);" + // root
"-a(=[100 100 100 20 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,60,false,10,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,40,false,10,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources and is well under its user-limit. Hence preempt
// resources from app1.
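// Numbers (illustrative): user1 is 10 above its UL of 50; the last
// container is kept back as the user-limit deadzone, so 9 are preempted.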
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoIntraQueuePreemptionIfBelowUserLimitAndLowPriorityExtraUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Preconditions:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 50   | 0       |
* | app2 | user2 | 1        | 50   | 0       |
* | app3 | user3 | 0        | 0    | 10      |
* +------+-------+----------+------+---------+
* This scenario should never preempt from either user1 or user2
*/
// Set max allowable intra-queue preemption limit to 70% (this is different
// from minimum user limit percent).
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.7);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 10 0]);" + // root
"-a(=[100 100 100 10 0])"; // a
String appsConfig =
// queueName\t\
// (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
"a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
"a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
"a\t(0,1,n1,,0,false,10,user3)\t50"; // app3, user3
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// user1 and user2 are both exactly at their user-limit; preempting from
// them would drop them below it, so nothing should be preempted.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoIntraQueuePreemptionIfBelowUserLimitAndSamePriorityExtraUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Preconditions:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 50   | 0       |
* | app2 | user2 | 1        | 50   | 0       |
* | app3 | user3 | 1        | 0    | 10      |
* +------+-------+----------+------+---------+
* This scenario should never preempt from either user1 or user2
*/
// Set max allowable intra-queue preemption limit to 70% (this is different
// from minimum user limit percent).
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.7);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 10 0]);" + // root
"-a(=[100 100 100 10 0])"; // a
String appsConfig =
// queueName\t\
// (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
"a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
"a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
"a\t(1,1,n1,,0,false,10,user3)\t50"; // app3, user3
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// user1 and user2 are both exactly at their user-limit; preempting from
// them would drop them below it, so nothing should be preempted.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoIntraQueuePreemptionIfBelowUserLimitAndHighPriorityExtraUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Preconditions:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 50   | 0       |
* | app2 | user2 | 1        | 50   | 0       |
* | app3 | user3 | 5        | 0    | 10      |
* +------+-------+----------+------+---------+
* This scenario should never preempt from either user1 or user2
*/
// Set max allowable intra-queue preemption limit to 70% (this is different
// from minimum user limit percent).
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.7);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 10 0]);" + // root
"-a(=[100 100 100 10 0])"; // a
String appsConfig =
// queueName\t\
// (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
"a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
"a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
"a\t(5,1,n1,,0,false,10,user3)\t50"; // app3, user3
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// user1 and user2 are both exactly at their user-limit; preempting from
// them would drop them below it, so nothing should be preempted.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoIntraQueuePreemptionWithUserLimitDeadzone()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 60   | 10      |
* | app2 | user2 | 1        | 40   | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps, but also ensure that a user's
* usage does not drop below its user-limit.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 20 0]);" + // root
"-a(=[100 100 100 20 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,3,n1,,20,false,10,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,40,false,10,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 needs more resources and is well under its user-limit. Hence preempt
// 3 containers (9GB) from app1. We will not preempt the last container as
// that may pull user1's usage below its user-limit.
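// Deadzone arithmetic for the comment above: app1 runs 20 containers of
// size 3 = 60 used, UL = 50. Preempting 3 containers leaves 60 - 9 = 51,
// still above the UL; a 4th would leave 48 < 50, dipping below the
// user-limit, hence only 3 containers are selected.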
verify(mDisp, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testIntraQueuePreemptionWithUserLimitDeadzoneAndPriority()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* |
* a
* </pre>
*
* Scenario:
* Queue total resources: 100
* Minimum user limit percent: 50%
* +------+-------+----------+------+---------+
* | APP  | USER  | PRIORITY | USED | PENDING |
* +------+-------+----------+------+---------+
* | app1 | user1 | 1        | 60   | 10      |
* | app2 | user2 | 1        | 40   | 10      |
* +------+-------+----------+------+---------+
* Hence in queueA of 100, each user has a quota of 50. Now have multiple
* apps and check for preemption across apps, but also ensure that a user's
* usage does not drop below its user-limit.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 20 0]);" + // root
"-a(=[100 100 100 20 0])"; // a
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,3,n1,,20,false,10,user1);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,0,false,10,user1);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,40,false,20,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app3 (user2) needs more resources and is well under its user-limit. Hence
// preempt 3 containers (9GB) from app1. We will not preempt the last
// container as that may pull user1's usage below its user-limit.
verify(mDisp, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
// After first round, 3 containers were preempted from app1 and resource
// distribution will be like below.
appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,3,n1,,17,false,10,user1);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,0,false,10,user1);" + // app2 a
"a\t" // app2 in a
+ "(1,1,n1,,49,false,11,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// app2 has higher priority demand within the same user 'user1'. However
// user1's usage is already under its UL. Hence no further preemption. We
// will still see 3 containers while asserting, as they were already
// selected in the earlier round.
verify(mDisp, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
}

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test class for IntraQueuePreemption scenarios.
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
extends
ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
rc = new DominantResourceCalculator();
when(cs.getResourceCalculator()).thenReturn(rc);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
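// With DominantResourceCalculator, comparisons use the dominant share over
// <memory:vcores>. For example, in a 100:200 cluster a usage of 40:30 has
// dominant share max(40/100, 30/200) = 40%, i.e. memory-dominant.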
@Test
public void testSimpleIntraQueuePreemptionWithVCoreResource()
throws IOException {
/**
* The simplest test preemption, Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Guaranteed resources of a/b/c/d are 10:40:20:30. Total cluster resource =
* 100. Scenario: Queue B has a few running apps and two high priority apps
* have demand. Apps which are running at low priority (4) will have a few
* of their resources preempted to meet the demand.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100:200,true;";
String nodesConfig = // n1 has no label
"n1= res=100:200";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100:50 100:50 80:40 120:60 0]);" + // root
"-a(=[10:5 100:50 10:5 50:25 0]);" + // a
"-b(=[40:20 100:50 40:20 60:30 0]);" + // b
"-c(=[20:10 100:50 10:5 10:5 0]);" + // c
"-d(=[30:15 100:50 20:10 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1:1,n1,,5,false,25:25);" + // app1 a
"a\t" // app2 in a
+ "(1,1:1,n1,,5,false,25:25);" + // app2 a
"b\t" // app3 in b
+ "(4,1:1,n1,,36,false,20:20);" + // app3 b
"b\t" // app4 in b
+ "(4,1:1,n1,,2,false,10:10);" + // app4 b
"b\t" // app4 in b
+ "(5,1:1,n1,,1,false,10:10);" + // app5 b
"b\t" // app4 in b
+ "(6,1:1,n1,,1,false,10:10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1:1,n1,,10,false,10:10);" + "d\t" // app7 in c
+ "(1,1:1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testIntraQueuePreemptionWithDominantVCoreResource()
throws IOException {
/**
* The simplest test preemption, Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
* Scenario: Queue B has a few running apps and two high priority apps have
* demand. Apps which are running at low priority (4) will have a few of
* their resources preempted to meet the demand.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100:200,true;";
String nodesConfig = // n1 has no label
"n1= res=100:200";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100:50 100:50 50:40 110:60 0]);" + // root
"-a(=[40:20 100:50 9:9 50:30 0]);" + // a
"-b(=[60:30 100:50 40:30 60:30 0]);"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,2:1,n1,,4,false,25:25);" + // app1 a
"a\t" // app2 in a
+ "(1,1:3,n1,,2,false,25:25);" + // app2 a
"b\t" // app3 in b
+ "(4,2:1,n1,,10,false,20:20);" + // app3 b
"b\t" // app4 in b
+ "(4,1:2,n1,,5,false,10:10);" + // app4 b
"b\t" // app5 in b
+ "(5,1:1,n1,,5,false,30:20);" + // app5 b
"b\t" // app6 in b
+ "(6,2:1,n1,,5,false,30:20);";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, app3 and app4 were of lower priority, so containers are
// preempted from them first; app5 gives up a few as well.
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
}
}