YARN-11147. ResourceUsage and QueueCapacities classes provide node label iterators that are not thread-safe

This commit is contained in:
9uapaw 2022-05-11 18:07:04 +02:00 committed by Benjamin Teke
parent 0b32c6c113
commit 54cd0174c0
8 changed files with 13 additions and 25 deletions

View File

@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue; import java.util.PriorityQueue;
@ -402,8 +401,7 @@ public class FifoIntraQueuePreemptionPlugin
pending = (pending == null) ? Resources.createResource(0, 0) : pending; pending = (pending == null) ? Resources.createResource(0, 0) : pending;
reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved; reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;
HashSet<String> partitions = new HashSet<String>( Set<String> partitions = app.getAppAttemptResourceUsage().getExistingNodeLabels();
app.getAppAttemptResourceUsage().getNodePartitionsSet());
partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet()); partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet());
// Create TempAppPerQueue for further calculation. // Create TempAppPerQueue for further calculation.

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -200,10 +201,10 @@ public class AbstractResourceUsage {
} }
} }
public Set<String> getNodePartitionsSet() { public Set<String> getExistingNodeLabels() {
readLock.lock(); readLock.lock();
try { try {
return usages.keySet(); return new HashSet<>(usages.keySet());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -1000,8 +1000,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> nodeLabels = new HashSet<String>(); Set<String> nodeLabels = new HashSet<String>();
if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels() if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels()
.contains(RMNodeLabelsManager.ANY)) { .contains(RMNodeLabelsManager.ANY)) {
nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(), nodeLabels.addAll(Sets.union(this.getQueueCapacities().getExistingNodeLabels(),
this.getQueueResourceUsage().getNodePartitionsSet())); this.getQueueResourceUsage().getExistingNodeLabels()));
} else { } else {
nodeLabels.addAll(this.getAccessibleNodeLabels()); nodeLabels.addAll(this.getAccessibleNodeLabels());
} }

View File

@ -1748,8 +1748,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
if (nodePartition == null) { if (nodePartition == null) {
for (String partition : Sets.union( for (String partition : Sets.union(
getQueueCapacities().getNodePartitionsSet(), getQueueCapacities().getExistingNodeLabels(),
queueResourceUsage.getNodePartitionsSet())) { queueResourceUsage.getExistingNodeLabels())) {
usersManager.updateUsageRatio(partition, clusterResource); usersManager.updateUsageRatio(partition, clusterResource);
} }
} else { } else {

View File

@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -224,8 +222,8 @@ public class CSQueueUtils {
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
if (nodePartition == null) { if (nodePartition == null) {
for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), for (String partition : Sets.union(queueCapacities.getExistingNodeLabels(),
queueResourceUsage.getNodePartitionsSet())) { queueResourceUsage.getExistingNodeLabels())) {
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
partition, childQueue); partition, childQueue);

View File

@ -337,13 +337,4 @@ public class QueueCapacities {
readLock.unlock(); readLock.unlock();
} }
} }
public Set<String> getNodePartitionsSet() {
readLock.lock();
try {
return capacitiesMap.keySet();
} finally {
readLock.unlock();
}
}
} }

View File

@ -972,7 +972,7 @@ public class UsersManager implements AbstractUsersManager {
// Update total resource usage of active and non-active after user // Update total resource usage of active and non-active after user
// is moved from non-active to active. // is moved from non-active to active.
for (String partition : resourceUsage.getNodePartitionsSet()) { for (String partition : resourceUsage.getExistingNodeLabels()) {
totalResUsageForNonActiveUsers.decUsed(partition, totalResUsageForNonActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition)); resourceUsage.getUsed(partition));
totalResUsageForActiveUsers.incUsed(partition, totalResUsageForActiveUsers.incUsed(partition,
@ -1013,7 +1013,7 @@ public class UsersManager implements AbstractUsersManager {
// Update total resource usage of active and non-active after user is // Update total resource usage of active and non-active after user is
// moved from active to non-active. // moved from active to non-active.
for (String partition : resourceUsage.getNodePartitionsSet()) { for (String partition : resourceUsage.getExistingNodeLabels()) {
totalResUsageForActiveUsers.decUsed(partition, totalResUsageForActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition)); resourceUsage.getUsed(partition));
totalResUsageForNonActiveUsers.incUsed(partition, totalResUsageForNonActiveUsers.incUsed(partition,

View File

@ -43,7 +43,7 @@ public class ResourcesInfo {
if (resourceUsage == null) { if (resourceUsage == null) {
return; return;
} }
for (String partitionName : resourceUsage.getNodePartitionsSet()) { for (String partitionName : resourceUsage.getExistingNodeLabels()) {
resourceUsagesByPartition.add(new PartitionResourcesInfo(partitionName, resourceUsagesByPartition.add(new PartitionResourcesInfo(partitionName,
new ResourceInfo(resourceUsage.getUsed(partitionName)), new ResourceInfo(resourceUsage.getUsed(partitionName)),
new ResourceInfo(resourceUsage.getReserved(partitionName)), new ResourceInfo(resourceUsage.getReserved(partitionName)),