YARN-3140. Improve locks in AbstractCSQueue/LeafQueue/ParentQueue. Contributed by Wangda Tan
This commit is contained in:
parent
e52d6e7a46
commit
2b66d9ec5b
@ -564,4 +564,14 @@
|
||||
</Or>
|
||||
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
|
||||
</Match>
|
||||
|
||||
<!-- Ignore VO_VOLATILE_INCREMENT, they will be protected by writeLock -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User" />
|
||||
<Or>
|
||||
<Field name="pendingApplications" />
|
||||
<Field name="activeApplications" />
|
||||
</Or>
|
||||
<Bug pattern="VO_VOLATILE_INCREMENT" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -60,25 +61,25 @@
|
||||
|
||||
public abstract class AbstractCSQueue implements CSQueue {
|
||||
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
|
||||
CSQueue parent;
|
||||
volatile CSQueue parent;
|
||||
final String queueName;
|
||||
volatile int numContainers;
|
||||
|
||||
final Resource minimumAllocation;
|
||||
volatile Resource maximumAllocation;
|
||||
QueueState state;
|
||||
volatile QueueState state;
|
||||
final CSQueueMetrics metrics;
|
||||
protected final PrivilegedEntity queueEntity;
|
||||
|
||||
final ResourceCalculator resourceCalculator;
|
||||
Set<String> accessibleLabels;
|
||||
RMNodeLabelsManager labelManager;
|
||||
final RMNodeLabelsManager labelManager;
|
||||
String defaultLabelExpression;
|
||||
|
||||
Map<AccessType, AccessControlList> acls =
|
||||
new HashMap<AccessType, AccessControlList>();
|
||||
volatile boolean reservationsContinueLooking;
|
||||
private boolean preemptionDisabled;
|
||||
private volatile boolean preemptionDisabled;
|
||||
|
||||
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
||||
volatile ResourceUsage queueUsage;
|
||||
@ -94,6 +95,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
|
||||
protected ActivitiesManager activitiesManager;
|
||||
|
||||
protected ReentrantReadWriteLock.ReadLock readLock;
|
||||
protected ReentrantReadWriteLock.WriteLock writeLock;
|
||||
|
||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
||||
@ -116,7 +120,11 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
|
||||
|
||||
// initialize QueueCapacities
|
||||
queueCapacities = new QueueCapacities(parent == null);
|
||||
queueCapacities = new QueueCapacities(parent == null);
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
}
|
||||
|
||||
protected void setupConfigurableCapacities() {
|
||||
@ -128,12 +136,12 @@ protected void setupConfigurableCapacities() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized float getCapacity() {
|
||||
public float getCapacity() {
|
||||
return queueCapacities.getCapacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized float getAbsoluteCapacity() {
|
||||
public float getAbsoluteCapacity() {
|
||||
return queueCapacities.getAbsoluteCapacity();
|
||||
}
|
||||
|
||||
@ -167,7 +175,7 @@ public int getNumContainers() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized QueueState getState() {
|
||||
public QueueState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@ -187,13 +195,13 @@ public PrivilegedEntity getPrivilegedEntity() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CSQueue getParent() {
|
||||
public CSQueue getParent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setParent(CSQueue newParentQueue) {
|
||||
this.parent = (ParentQueue)newParentQueue;
|
||||
public void setParent(CSQueue newParentQueue) {
|
||||
this.parent = newParentQueue;
|
||||
}
|
||||
|
||||
public Set<String> getAccessibleNodeLabels() {
|
||||
@ -221,18 +229,22 @@ public void setAbsoluteUsedCapacity(float absUsedCapacity) {
|
||||
* Set maximum capacity - used only for testing.
|
||||
* @param maximumCapacity new max capacity
|
||||
*/
|
||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(),
|
||||
queueCapacities.getCapacity(), maximumCapacity);
|
||||
float absMaxCapacity =
|
||||
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
|
||||
queueCapacities.getAbsoluteCapacity(),
|
||||
absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
|
||||
void setMaxCapacity(float maximumCapacity) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(),
|
||||
queueCapacities.getCapacity(), maximumCapacity);
|
||||
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
|
||||
maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
|
||||
queueCapacities.getAbsoluteCapacity(), absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -240,70 +252,82 @@ public String getDefaultNodeLabelExpression() {
|
||||
return defaultLabelExpression;
|
||||
}
|
||||
|
||||
synchronized void setupQueueConfigs(Resource clusterResource)
|
||||
void setupQueueConfigs(Resource clusterResource)
|
||||
throws IOException {
|
||||
// get labels
|
||||
this.accessibleLabels =
|
||||
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression = csContext.getConfiguration()
|
||||
.getDefaultNodeLabelExpression(getQueuePath());
|
||||
try {
|
||||
writeLock.lock();
|
||||
// get labels
|
||||
this.accessibleLabels =
|
||||
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression =
|
||||
csContext.getConfiguration().getDefaultNodeLabelExpression(
|
||||
getQueuePath());
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
// inherit from parent if labels not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
|
||||
// After we setup labels, we can setup capacities
|
||||
setupConfigurableCapacities();
|
||||
|
||||
this.maximumAllocation =
|
||||
csContext.getConfiguration().getMaximumAllocationPerQueue(
|
||||
getQueuePath());
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
|
||||
|
||||
this.state = csContext.getConfiguration().getState(getQueuePath());
|
||||
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
&& this.accessibleLabels.containsAll(
|
||||
parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null
|
||||
&& !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else {
|
||||
Set<String> diff =
|
||||
Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException("Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=["
|
||||
+ StringUtils.join(diff, ",") + "]");
|
||||
// After we setup labels, we can setup capacities
|
||||
setupConfigurableCapacities();
|
||||
|
||||
this.maximumAllocation =
|
||||
csContext.getConfiguration().getMaximumAllocationPerQueue(
|
||||
getQueuePath());
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
|
||||
|
||||
this.state = csContext.getConfiguration().getState(getQueuePath());
|
||||
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null && !parent
|
||||
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(
|
||||
RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else{
|
||||
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=[" + StringUtils
|
||||
.join(diff, ",") + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.reservationsContinueLooking =
|
||||
csContext.getConfiguration().getReservationContinueLook();
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
this.reservationsContinueLooking = csContext.getConfiguration()
|
||||
.getReservationContinueLook();
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
}
|
||||
|
||||
|
||||
protected QueueInfo getQueueInfo() {
|
||||
// Deliberately doesn't use lock here, because this method will be invoked
|
||||
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
|
||||
// consistency here.
|
||||
// TODO, improve this
|
||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queueInfo.setQueueName(queueName);
|
||||
queueInfo.setAccessibleNodeLabels(accessibleLabels);
|
||||
@ -318,8 +342,12 @@ protected QueueInfo getQueueInfo() {
|
||||
}
|
||||
|
||||
public QueueStatistics getQueueStatistics() {
|
||||
QueueStatistics stats =
|
||||
recordFactory.newRecordInstance(QueueStatistics.class);
|
||||
// Deliberately doesn't use lock here, because this method will be invoked
|
||||
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
|
||||
// consistency here.
|
||||
// TODO, improve this
|
||||
QueueStatistics stats = recordFactory.newRecordInstance(
|
||||
QueueStatistics.class);
|
||||
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
|
||||
stats.setNumAppsRunning(getMetrics().getAppsRunning());
|
||||
stats.setNumAppsPending(getMetrics().getAppsPending());
|
||||
@ -351,26 +379,36 @@ public Resource getMinimumAllocation() {
|
||||
return minimumAllocation;
|
||||
}
|
||||
|
||||
synchronized void allocateResource(Resource clusterResource,
|
||||
void allocateResource(Resource clusterResource,
|
||||
Resource resource, String nodePartition, boolean changeContainerResource) {
|
||||
queueUsage.incUsed(nodePartition, resource);
|
||||
try {
|
||||
writeLock.lock();
|
||||
queueUsage.incUsed(nodePartition, resource);
|
||||
|
||||
if (!changeContainerResource) {
|
||||
++numContainers;
|
||||
if (!changeContainerResource) {
|
||||
++numContainers;
|
||||
}
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
}
|
||||
|
||||
protected synchronized void releaseResource(Resource clusterResource,
|
||||
protected void releaseResource(Resource clusterResource,
|
||||
Resource resource, String nodePartition, boolean changeContainerResource) {
|
||||
queueUsage.decUsed(nodePartition, resource);
|
||||
try {
|
||||
writeLock.lock();
|
||||
queueUsage.decUsed(nodePartition, resource);
|
||||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
|
||||
if (!changeContainerResource) {
|
||||
--numContainers;
|
||||
if (!changeContainerResource) {
|
||||
--numContainers;
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -381,7 +419,13 @@ public boolean getReservationContinueLooking() {
|
||||
|
||||
@Private
|
||||
public Map<AccessType, AccessControlList> getACLs() {
|
||||
return acls;
|
||||
try {
|
||||
readLock.lock();
|
||||
return acls;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Private
|
||||
@ -464,86 +508,88 @@ Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
|
||||
minimumAllocation);
|
||||
}
|
||||
|
||||
synchronized boolean canAssignToThisQueue(Resource clusterResource,
|
||||
boolean canAssignToThisQueue(Resource clusterResource,
|
||||
String nodePartition, ResourceLimits currentResourceLimits,
|
||||
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
|
||||
// Get current limited resource:
|
||||
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
|
||||
// queues' max capacity.
|
||||
// - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
|
||||
// queue's max capacity, queue's max capacity on the partition will be
|
||||
// considered to be 100%. Which is a queue can use all resource in the
|
||||
// partition.
|
||||
// Doing this because: for non-exclusive allocation, we make sure there's
|
||||
// idle resource on the partition, to avoid wastage, such resource will be
|
||||
// leveraged as much as we can, and preemption policy will reclaim it back
|
||||
// when partitoned-resource-request comes back.
|
||||
Resource currentLimitResource =
|
||||
getCurrentLimitResource(nodePartition, clusterResource,
|
||||
currentResourceLimits, schedulingMode);
|
||||
try {
|
||||
readLock.lock();
|
||||
// Get current limited resource:
|
||||
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
|
||||
// queues' max capacity.
|
||||
// - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
|
||||
// queue's max capacity, queue's max capacity on the partition will be
|
||||
// considered to be 100%. Which is a queue can use all resource in the
|
||||
// partition.
|
||||
// Doing this because: for non-exclusive allocation, we make sure there's
|
||||
// idle resource on the partition, to avoid wastage, such resource will be
|
||||
// leveraged as much as we can, and preemption policy will reclaim it back
|
||||
// when partitoned-resource-request comes back.
|
||||
Resource currentLimitResource = getCurrentLimitResource(nodePartition,
|
||||
clusterResource, currentResourceLimits, schedulingMode);
|
||||
|
||||
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
|
||||
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
|
||||
|
||||
// Set headroom for currentResourceLimits:
|
||||
// When queue is a parent queue: Headroom = limit - used + killable
|
||||
// When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
|
||||
Resource usedExceptKillable = nowTotalUsed;
|
||||
if (null != getChildQueues() && !getChildQueues().isEmpty()) {
|
||||
usedExceptKillable = Resources.subtract(nowTotalUsed,
|
||||
getTotalKillableResource(nodePartition));
|
||||
}
|
||||
currentResourceLimits.setHeadroom(
|
||||
Resources.subtract(currentLimitResource, usedExceptKillable));
|
||||
// Set headroom for currentResourceLimits:
|
||||
// When queue is a parent queue: Headroom = limit - used + killable
|
||||
// When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
|
||||
Resource usedExceptKillable = nowTotalUsed;
|
||||
if (null != getChildQueues() && !getChildQueues().isEmpty()) {
|
||||
usedExceptKillable = Resources.subtract(nowTotalUsed,
|
||||
getTotalKillableResource(nodePartition));
|
||||
}
|
||||
currentResourceLimits.setHeadroom(
|
||||
Resources.subtract(currentLimitResource, usedExceptKillable));
|
||||
|
||||
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
usedExceptKillable, currentLimitResource)) {
|
||||
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
usedExceptKillable, currentLimitResource)) {
|
||||
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking
|
||||
&& nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
|
||||
&& Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
resourceCouldBeUnreserved, Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource =
|
||||
Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved);
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking && nodePartition.equals(
|
||||
RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
|
||||
resourceCalculator, clusterResource, resourceCouldBeUnreserved,
|
||||
Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource = Resources.subtract(
|
||||
usedExceptKillable, resourceCouldBeUnreserved);
|
||||
|
||||
// when total-used-without-reserved-resource < currentLimit, we still
|
||||
// have chance to allocate on this node by unreserving some containers
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to use reserved: " + getQueueName()
|
||||
+ " usedResources: " + queueUsage.getUsed()
|
||||
+ ", clusterResources: " + clusterResource
|
||||
+ ", reservedResources: " + resourceCouldBeUnreserved
|
||||
+ ", capacity-without-reserved: "
|
||||
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
|
||||
+ currentLimitResource);
|
||||
// when total-used-without-reserved-resource < currentLimit, we still
|
||||
// have chance to allocate on this node by unreserving some containers
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"try to use reserved: " + getQueueName() + " usedResources: "
|
||||
+ queueUsage.getUsed() + ", clusterResources: "
|
||||
+ clusterResource + ", reservedResources: "
|
||||
+ resourceCouldBeUnreserved
|
||||
+ ", capacity-without-reserved: "
|
||||
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
|
||||
+ currentLimitResource);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName() + "Check assign to queue, nodePartition="
|
||||
+ nodePartition + " usedResources: " + queueUsage
|
||||
.getUsed(nodePartition) + " clusterResources: " + clusterResource
|
||||
+ " currentUsedCapacity " + Resources
|
||||
.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(nodePartition), labelManager
|
||||
.getResourceByLabel(nodePartition, clusterResource))
|
||||
+ " max-capacity: " + queueCapacities
|
||||
.getAbsoluteMaximumCapacity(nodePartition) + ")");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName()
|
||||
+ "Check assign to queue, nodePartition="
|
||||
+ nodePartition
|
||||
+ " usedResources: "
|
||||
+ queueUsage.getUsed(nodePartition)
|
||||
+ " clusterResources: "
|
||||
+ clusterResource
|
||||
+ " currentUsedCapacity "
|
||||
+ Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(nodePartition),
|
||||
labelManager.getResourceByLabel(nodePartition, clusterResource))
|
||||
+ " max-capacity: "
|
||||
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
|
||||
}
|
||||
return false;
|
||||
return true;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return true;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -79,76 +79,98 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||
public void reinitialize(CSQueue newlyParsedQueue,
|
||||
Resource clusterResource) throws IOException {
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof PlanQueue)
|
||||
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||
+ " from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException(
|
||||
"Trying to reinitialize " + getQueuePath() + " from "
|
||||
+ newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
|
||||
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
|
||||
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
|
||||
|
||||
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
|
||||
throw new IOException(
|
||||
"Reservable Queue should not have sub-queues in the"
|
||||
+ "configuration");
|
||||
}
|
||||
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
|
||||
throw new IOException(
|
||||
"Reservable Queue should not have sub-queues in the"
|
||||
+ "configuration");
|
||||
}
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
|
||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||
newlyParsedParentQueue.userLimitFactor,
|
||||
newlyParsedParentQueue.maxAppsForReservation,
|
||||
newlyParsedParentQueue.maxAppsPerUserForReservation);
|
||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||
newlyParsedParentQueue.userLimitFactor,
|
||||
newlyParsedParentQueue.maxAppsForReservation,
|
||||
newlyParsedParentQueue.maxAppsPerUserForReservation);
|
||||
|
||||
// run reinitialize on each existing queue, to trigger absolute cap
|
||||
// recomputations
|
||||
for (CSQueue res : this.getChildQueues()) {
|
||||
res.reinitialize(res, clusterResource);
|
||||
}
|
||||
showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
|
||||
}
|
||||
|
||||
synchronized void addChildQueue(CSQueue newQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
if (newQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException("Queue " + newQueue
|
||||
+ " being added has non zero capacity.");
|
||||
}
|
||||
boolean added = this.childQueues.add(newQueue);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updateChildQueues (action: add queue): " + added + " "
|
||||
+ getChildQueuesToPrint());
|
||||
// run reinitialize on each existing queue, to trigger absolute cap
|
||||
// recomputations
|
||||
for (CSQueue res : this.getChildQueues()) {
|
||||
res.reinitialize(res, clusterResource);
|
||||
}
|
||||
showReservationsAsQueues =
|
||||
newlyParsedParentQueue.showReservationsAsQueues;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeChildQueue(CSQueue remQueue)
|
||||
void addChildQueue(CSQueue newQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
if (remQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException("Queue " + remQueue
|
||||
+ " being removed has non zero capacity.");
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (newQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Queue " + newQueue + " being added has non zero capacity.");
|
||||
}
|
||||
boolean added = this.childQueues.add(newQueue);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updateChildQueues (action: add queue): " + added + " "
|
||||
+ getChildQueuesToPrint());
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
Iterator<CSQueue> qiter = childQueues.iterator();
|
||||
while (qiter.hasNext()) {
|
||||
CSQueue cs = qiter.next();
|
||||
if (cs.equals(remQueue)) {
|
||||
qiter.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed child queue: {}", cs.getQueueName());
|
||||
}
|
||||
|
||||
void removeChildQueue(CSQueue remQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (remQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Queue " + remQueue + " being removed has non zero capacity.");
|
||||
}
|
||||
Iterator<CSQueue> qiter = childQueues.iterator();
|
||||
while (qiter.hasNext()) {
|
||||
CSQueue cs = qiter.next();
|
||||
if (cs.equals(remQueue)) {
|
||||
qiter.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed child queue: {}", cs.getQueueName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized float sumOfChildCapacities() {
|
||||
float ret = 0;
|
||||
for (CSQueue l : childQueues) {
|
||||
ret += l.getCapacity();
|
||||
protected float sumOfChildCapacities() {
|
||||
try {
|
||||
writeLock.lock();
|
||||
float ret = 0;
|
||||
for (CSQueue l : childQueues) {
|
||||
ret += l.getCapacity();
|
||||
}
|
||||
return ret;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private void updateQuotas(int userLimit, float userLimitFactor,
|
||||
|
@ -51,22 +51,28 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName,
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||
public void reinitialize(CSQueue newlyParsedQueue,
|
||||
Resource clusterResource) throws IOException {
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof ReservationQueue)
|
||||
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||
+ " from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException(
|
||||
"Trying to reinitialize " + getQueuePath() + " from "
|
||||
+ newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
updateQuotas(parent.getUserLimitForReservation(),
|
||||
parent.getUserLimitFactor(),
|
||||
parent.getMaxApplicationsForReservations(),
|
||||
parent.getMaxApplicationsPerUserForReservation());
|
||||
updateQuotas(parent.getUserLimitForReservation(),
|
||||
parent.getUserLimitFactor(),
|
||||
parent.getMaxApplicationsForReservations(),
|
||||
parent.getMaxApplicationsPerUserForReservation());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -77,21 +83,26 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||
* maxCapacity, etc..)
|
||||
* @throws SchedulerDynamicEditException
|
||||
*/
|
||||
public synchronized void setEntitlement(QueueEntitlement entitlement)
|
||||
public void setEntitlement(QueueEntitlement entitlement)
|
||||
throws SchedulerDynamicEditException {
|
||||
float capacity = entitlement.getCapacity();
|
||||
if (capacity < 0 || capacity > 1.0f) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Capacity demand is not in the [0,1] range: " + capacity);
|
||||
}
|
||||
setCapacity(capacity);
|
||||
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
|
||||
// note: we currently set maxCapacity to capacity
|
||||
// this might be revised later
|
||||
setMaxCapacity(entitlement.getMaxCapacity());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("successfully changed to " + capacity + " for queue "
|
||||
+ this.getQueueName());
|
||||
try {
|
||||
writeLock.lock();
|
||||
float capacity = entitlement.getCapacity();
|
||||
if (capacity < 0 || capacity > 1.0f) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Capacity demand is not in the [0,1] range: " + capacity);
|
||||
}
|
||||
setCapacity(capacity);
|
||||
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
|
||||
// note: we currently set maxCapacity to capacity
|
||||
// this might be revised later
|
||||
setMaxCapacity(entitlement.getMaxCapacity());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("successfully changed to " + capacity + " for queue " + this
|
||||
.getQueueName());
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -828,8 +828,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
app.getAppAttemptResourceUsage().getPending().getMemorySize());
|
||||
// Queue/user/application's usage will be updated
|
||||
checkUsedResource(rm1, "default", 0 * GB, null);
|
||||
Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
|
||||
.getUser("user").getUsed().getMemorySize());
|
||||
// User will be removed
|
||||
Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user"));
|
||||
Assert.assertEquals(0 * GB,
|
||||
app.getAppAttemptResourceUsage().getReserved().getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
|
Loading…
Reference in New Issue
Block a user