YARN-7784. Fix Cluster metrics when placement processor is enabled. (asuresh)
This commit is contained in:
parent
c23980c4f2
commit
f8c5f5b237
@ -694,6 +694,12 @@ public class AppSchedulingInfo {
|
|||||||
metrics.runAppAttempt(applicationId, user);
|
metrics.runAppAttempt(applicationId, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateMetrics(applicationId, type, node, containerAllocated, user, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void updateMetrics(ApplicationId applicationId, NodeType type,
|
||||||
|
SchedulerNode node, Container containerAllocated, String user,
|
||||||
|
Queue queue) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("allocate: applicationId=" + applicationId + " container="
|
LOG.debug("allocate: applicationId=" + applicationId + " container="
|
||||||
+ containerAllocated.getId() + " host=" + containerAllocated
|
+ containerAllocated.getId() + " host=" + containerAllocated
|
||||||
@ -702,10 +708,10 @@ public class AppSchedulingInfo {
|
|||||||
+ type);
|
+ type);
|
||||||
}
|
}
|
||||||
if(node != null) {
|
if(node != null) {
|
||||||
metrics.allocateResources(node.getPartition(), user, 1,
|
queue.getMetrics().allocateResources(node.getPartition(), user, 1,
|
||||||
containerAllocated.getResource(), true);
|
containerAllocated.getResource(), true);
|
||||||
}
|
}
|
||||||
metrics.incrNodeTypeAggregations(user, type);
|
queue.getMetrics().incrNodeTypeAggregations(user, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get AppPlacementAllocator by specified schedulerKey
|
// Get AppPlacementAllocator by specified schedulerKey
|
||||||
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
@ -548,6 +549,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||||||
((RMContainerImpl) rmContainer).setAllocationTags(
|
((RMContainerImpl) rmContainer).setAllocationTags(
|
||||||
containerRequest.getSchedulingRequest().getAllocationTags());
|
containerRequest.getSchedulingRequest().getAllocationTags());
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
AppSchedulingInfo.updateMetrics(getApplicationId(),
|
||||||
|
allocation.getAllocationLocalityType(),
|
||||||
|
schedulerContainer.getSchedulerNode(),
|
||||||
|
schedulerContainer.getRmContainer().getContainer(), getUser(),
|
||||||
|
getQueue());
|
||||||
}
|
}
|
||||||
|
|
||||||
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
|
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
|
||||||
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
@ -148,6 +149,10 @@ public class TestPlacementProcessor {
|
|||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
// Ensure unique nodes (antiaffinity)
|
// Ensure unique nodes (antiaffinity)
|
||||||
Assert.assertEquals(4, nodeIds.size());
|
Assert.assertEquals(4, nodeIds.size());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 11264, 11, 5120, 5, 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -197,6 +202,10 @@ public class TestPlacementProcessor {
|
|||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
// Ensure unique nodes (antiaffinity)
|
// Ensure unique nodes (antiaffinity)
|
||||||
Assert.assertEquals(5, nodeIds.size());
|
Assert.assertEquals(5, nodeIds.size());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 14336, 14, 6144, 6, 6);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -245,6 +254,10 @@ public class TestPlacementProcessor {
|
|||||||
for (NodeId n : nodeIdContainerIdMap.keySet()) {
|
for (NodeId n : nodeIdContainerIdMap.keySet()) {
|
||||||
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
|
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 23552, 23, 9216, 9, 9);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -288,6 +301,10 @@ public class TestPlacementProcessor {
|
|||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
// Ensure all containers end up on the same node (affinity)
|
// Ensure all containers end up on the same node (affinity)
|
||||||
Assert.assertEquals(1, nodeIds.size());
|
Assert.assertEquals(1, nodeIds.size());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 26624, 26, 6144, 6, 6);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -340,6 +357,10 @@ public class TestPlacementProcessor {
|
|||||||
for (NodeId n : nodeIdContainerIdMap.keySet()) {
|
for (NodeId n : nodeIdContainerIdMap.keySet()) {
|
||||||
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
|
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 9216, 9, 7168, 7, 7);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -407,6 +428,10 @@ public class TestPlacementProcessor {
|
|||||||
Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
|
Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
|
||||||
Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
|
Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
|
||||||
rej.getReason());
|
rej.getReason());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 12288, 12, 4096, 4, 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -490,6 +515,10 @@ public class TestPlacementProcessor {
|
|||||||
.map(x -> x.getNodeId()).collect(Collectors.toSet());
|
.map(x -> x.getNodeId()).collect(Collectors.toSet());
|
||||||
// Ensure unique nodes
|
// Ensure unique nodes
|
||||||
Assert.assertEquals(4, nodeIds.size());
|
Assert.assertEquals(4, nodeIds.size());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 15360, 19, 9216, 5, 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
@ -557,6 +586,10 @@ public class TestPlacementProcessor {
|
|||||||
RejectedSchedulingRequest rej = rejectedReqs.get(0);
|
RejectedSchedulingRequest rej = rejectedReqs.get(0);
|
||||||
Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
|
Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
|
||||||
rej.getReason());
|
rej.getReason());
|
||||||
|
|
||||||
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
// Verify Metrics
|
||||||
|
verifyMetrics(metrics, 11264, 11, 5120, 5, 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void waitForContainerAllocation(Collection<MockNM> nodes,
|
private static void waitForContainerAllocation(Collection<MockNM> nodes,
|
||||||
@ -594,4 +627,16 @@ public class TestPlacementProcessor {
|
|||||||
ResourceSizing.newInstance(1, Resource.newInstance(mem, cores)))
|
ResourceSizing.newInstance(1, Resource.newInstance(mem, cores)))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Asserts that the given queue metrics report exactly the expected
 * available and allocated resource figures. Each expected value is compared
 * with the matching getter on {@code metrics} via
 * {@code Assert.assertEquals}.
 *
 * @param metrics the queue metrics to check
 * @param availableMB expected result of {@code getAvailableMB()}
 * @param availableVirtualCores expected result of
 *          {@code getAvailableVirtualCores()}
 * @param allocatedMB expected result of {@code getAllocatedMB()}
 * @param allocatedVirtualCores expected result of
 *          {@code getAllocatedVirtualCores()}
 * @param allocatedContainers expected result of
 *          {@code getAllocatedContainers()}
 */
private static void verifyMetrics(QueueMetrics metrics, long availableMB,
|
||||||
|
int availableVirtualCores, long allocatedMB,
|
||||||
|
int allocatedVirtualCores, int allocatedContainers) {
|
||||||
|
Assert.assertEquals(availableMB, metrics.getAvailableMB());
|
||||||
|
Assert.assertEquals(availableVirtualCores,
|
||||||
|
metrics.getAvailableVirtualCores());
|
||||||
|
Assert.assertEquals(allocatedMB, metrics.getAllocatedMB());
|
||||||
|
Assert.assertEquals(allocatedVirtualCores,
|
||||||
|
metrics.getAllocatedVirtualCores());
|
||||||
|
Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user