YARN-7538. Fix performance regression introduced by Capacity Scheduler absolute min/max resource refactoring. (Sunil G via wangda)
Change-Id: Ic9bd7e599c56970fe01cb0e1bba6df7d1f77eb29
This commit is contained in:
parent
7462c38277
commit
b7b8cd5324
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
@ -38,17 +39,20 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
public class AbstractResourceUsage {
|
||||
protected ReadLock readLock;
|
||||
protected WriteLock writeLock;
|
||||
protected Map<String, UsageByLabel> usages;
|
||||
protected final Map<String, UsageByLabel> usages;
|
||||
private final UsageByLabel noLabelUsages;
|
||||
// short for no-label :)
|
||||
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
|
||||
|
||||
public AbstractResourceUsage() {
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
|
||||
usages = new HashMap<String, UsageByLabel>();
|
||||
usages.put(NL, new UsageByLabel(NL));
|
||||
usages = new HashMap<>();
|
||||
|
||||
// For default label, avoid map for faster access.
|
||||
noLabelUsages = new UsageByLabel();
|
||||
usages.put(CommonNodeLabelsManager.NO_LABEL, noLabelUsages);
|
||||
}
|
||||
|
||||
// Usage enum here to make implement cleaner
|
||||
@ -57,41 +61,40 @@ public class AbstractResourceUsage {
|
||||
// be written by ordering policies
|
||||
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING(
|
||||
5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(
|
||||
9), EFF_MAX_RESOURCE(
|
||||
10), EFF_MIN_RESOURCE_UP(11), EFF_MAX_RESOURCE_UP(12);
|
||||
9), EFF_MAX_RESOURCE(10);
|
||||
|
||||
private int idx;
|
||||
|
||||
private ResourceType(int value) {
|
||||
ResourceType(int value) {
|
||||
this.idx = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static class UsageByLabel {
|
||||
// usage by label, contains all UsageType
|
||||
private Resource[] resArr;
|
||||
private final AtomicReferenceArray<Resource> resArr;
|
||||
|
||||
public UsageByLabel(String label) {
|
||||
resArr = new Resource[ResourceType.values().length];
|
||||
for (int i = 0; i < resArr.length; i++) {
|
||||
resArr[i] = Resource.newInstance(0, 0);
|
||||
};
|
||||
public UsageByLabel() {
|
||||
resArr = new AtomicReferenceArray<>(ResourceType.values().length);
|
||||
for (int i = 0; i < resArr.length(); i++) {
|
||||
resArr.set(i, Resource.newInstance(0, 0));
|
||||
}
|
||||
}
|
||||
|
||||
public Resource getUsed() {
|
||||
return resArr[ResourceType.USED.idx];
|
||||
return resArr.get(ResourceType.USED.idx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{used=" + resArr[0] + "%, ");
|
||||
sb.append("pending=" + resArr[1] + "%, ");
|
||||
sb.append("am_used=" + resArr[2] + "%, ");
|
||||
sb.append("reserved=" + resArr[3] + "%}");
|
||||
sb.append("min_eff=" + resArr[9] + "%, ");
|
||||
sb.append("max_eff=" + resArr[10] + "%}");
|
||||
sb.append("min_effup=" + resArr[11] + "%, ");
|
||||
sb.append("{used=" + resArr.get(ResourceType.USED.idx) + ", ");
|
||||
sb.append("pending=" + resArr.get(ResourceType.PENDING.idx) + ", ");
|
||||
sb.append("am_used=" + resArr.get(ResourceType.AMUSED.idx) + ", ");
|
||||
sb.append("reserved=" + resArr.get(ResourceType.RESERVED.idx) + ", ");
|
||||
sb.append("min_eff=" + resArr.get(ResourceType.EFF_MIN_RESOURCE.idx) + ", ");
|
||||
sb.append(
|
||||
"max_eff=" + resArr.get(ResourceType.EFF_MAX_RESOURCE.idx) + "}");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@ -104,8 +107,8 @@ public class AbstractResourceUsage {
|
||||
}
|
||||
|
||||
protected Resource _get(String label, ResourceType type) {
|
||||
if (label == null) {
|
||||
label = RMNodeLabelsManager.NO_LABEL;
|
||||
if (label == null || label.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
return normalize(noLabelUsages.resArr.get(type.idx));
|
||||
}
|
||||
|
||||
try {
|
||||
@ -114,7 +117,7 @@ public class AbstractResourceUsage {
|
||||
if (null == usage) {
|
||||
return Resources.none();
|
||||
}
|
||||
return normalize(usage.resArr[type.idx]);
|
||||
return normalize(usage.resArr.get(type.idx));
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
@ -126,7 +129,7 @@ public class AbstractResourceUsage {
|
||||
Resource allOfType = Resources.createResource(0);
|
||||
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
|
||||
//all usages types are initialized
|
||||
Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
|
||||
Resources.addTo(allOfType, usageEntry.getValue().resArr.get(type.idx));
|
||||
}
|
||||
return allOfType;
|
||||
} finally {
|
||||
@ -135,11 +138,12 @@ public class AbstractResourceUsage {
|
||||
}
|
||||
|
||||
private UsageByLabel getAndAddIfMissing(String label) {
|
||||
if (label == null) {
|
||||
label = RMNodeLabelsManager.NO_LABEL;
|
||||
if (label == null || label.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
return noLabelUsages;
|
||||
}
|
||||
|
||||
if (!usages.containsKey(label)) {
|
||||
UsageByLabel u = new UsageByLabel(label);
|
||||
UsageByLabel u = new UsageByLabel();
|
||||
usages.put(label, u);
|
||||
return u;
|
||||
}
|
||||
@ -151,7 +155,7 @@ public class AbstractResourceUsage {
|
||||
try {
|
||||
writeLock.lock();
|
||||
UsageByLabel usage = getAndAddIfMissing(label);
|
||||
usage.resArr[type.idx] = res;
|
||||
usage.resArr.set(type.idx, res);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
@ -161,7 +165,8 @@ public class AbstractResourceUsage {
|
||||
try {
|
||||
writeLock.lock();
|
||||
UsageByLabel usage = getAndAddIfMissing(label);
|
||||
Resources.addTo(usage.resArr[type.idx], res);
|
||||
usage.resArr.set(type.idx,
|
||||
Resources.add(usage.resArr.get(type.idx), res));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
@ -171,7 +176,8 @@ public class AbstractResourceUsage {
|
||||
try {
|
||||
writeLock.lock();
|
||||
UsageByLabel usage = getAndAddIfMissing(label);
|
||||
Resources.subtractFrom(usage.resArr[type.idx], res);
|
||||
usage.resArr.set(type.idx,
|
||||
Resources.subtract(usage.resArr.get(type.idx), res));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -932,7 +932,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
queueUsage.incUsed(nodeLabel, resourceToInc);
|
||||
CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
||||
Resources.none(), nodeLabel, this);
|
||||
nodeLabel, this);
|
||||
if (null != parent) {
|
||||
parent.incUsedResource(nodeLabel, resourceToInc, null);
|
||||
}
|
||||
@ -948,7 +948,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
queueUsage.decUsed(nodeLabel, resourceToDec);
|
||||
CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
||||
Resources.none(), nodeLabel, this);
|
||||
nodeLabel, this);
|
||||
if (null != parent) {
|
||||
parent.decUsedResource(nodeLabel, resourceToDec, null);
|
||||
}
|
||||
|
@ -180,8 +180,8 @@ class CSQueueUtils {
|
||||
* used resource for all partitions of this queue.
|
||||
*/
|
||||
public static void updateUsedCapacity(final ResourceCalculator rc,
|
||||
final Resource totalPartitionResource, Resource clusterResource,
|
||||
String nodePartition, AbstractCSQueue childQueue) {
|
||||
final Resource totalPartitionResource, String nodePartition,
|
||||
AbstractCSQueue childQueue) {
|
||||
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
|
||||
CSQueueMetrics queueMetrics = childQueue.getMetrics();
|
||||
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
|
||||
@ -287,11 +287,11 @@ class CSQueueUtils {
|
||||
for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
|
||||
queueResourceUsage.getNodePartitionsSet())) {
|
||||
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
|
||||
cluster, partition, childQueue);
|
||||
partition, childQueue);
|
||||
}
|
||||
} else {
|
||||
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
|
||||
cluster, nodePartition, childQueue);
|
||||
nodePartition, childQueue);
|
||||
}
|
||||
|
||||
// Update queue metrics w.r.t node labels. In a generic way, we can
|
||||
|
@ -739,8 +739,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||
child.getQueueResourceUsage().getUsed(nodePartition));
|
||||
|
||||
// Get child's max resource
|
||||
Resource childConfiguredMaxResource = getEffectiveMaxCapacityDown(
|
||||
nodePartition, minimumAllocation);
|
||||
Resource childConfiguredMaxResource = child
|
||||
.getEffectiveMaxCapacityDown(nodePartition, minimumAllocation);
|
||||
|
||||
// Child's limit should be capped by child configured max resource
|
||||
childLimit =
|
||||
|
@ -4307,7 +4307,7 @@ public class TestCapacityScheduler {
|
||||
null, null, NULL_UPDATE_REQUESTS);
|
||||
CapacityScheduler.schedule(cs);
|
||||
}
|
||||
assertEquals("P2 Used Resource should be 7 GB", 7 * GB,
|
||||
assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
|
||||
cs.getQueue("p2").getUsedResources().getMemorySize());
|
||||
|
||||
//Free a container from X1
|
||||
|
Loading…
x
Reference in New Issue
Block a user