YARN-9169. Add metrics for queued opportunistic and guaranteed containers. Contributed by Abhishek Modi.
This commit is contained in:
parent
0f26b5efaa
commit
489411579c
@ -199,6 +199,8 @@ public void handle(ContainerSchedulerEvent event) {
|
|||||||
break;
|
break;
|
||||||
case RECOVERY_COMPLETED:
|
case RECOVERY_COMPLETED:
|
||||||
startPendingContainers(maxOppQueueLength <= 0);
|
startPendingContainers(maxOppQueueLength <= 0);
|
||||||
|
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
default:
|
default:
|
||||||
LOG.error("Unknown event arrived at ContainerScheduler: "
|
LOG.error("Unknown event arrived at ContainerScheduler: "
|
||||||
+ event.toString());
|
+ event.toString());
|
||||||
@ -252,6 +254,8 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
|
|||||||
"continer update of %s", containerId), ex);
|
"continer update of %s", containerId), ex);
|
||||||
}
|
}
|
||||||
startPendingContainers(maxOppQueueLength <= 0);
|
startPendingContainers(maxOppQueueLength <= 0);
|
||||||
|
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,6 +281,8 @@ public void recoverActiveContainer(Container container,
|
|||||||
"UnKnown execution type received " + container.getContainerId()
|
"UnKnown execution type received " + container.getContainerId()
|
||||||
+ ", execType " + execType);
|
+ ", execType " + execType);
|
||||||
}
|
}
|
||||||
|
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
|
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
|
||||||
runningContainers.put(container.getContainerId(), container);
|
runningContainers.put(container.getContainerId(), container);
|
||||||
utilizationTracker.addContainerResources(container);
|
utilizationTracker.addContainerResources(container);
|
||||||
@ -378,6 +384,8 @@ private void onResourcesReclaimed(Container container) {
|
|||||||
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
|
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
|
||||||
startPendingContainers(forceStartGuaranteedContainers);
|
startPendingContainers(forceStartGuaranteedContainers);
|
||||||
}
|
}
|
||||||
|
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -508,6 +516,8 @@ protected void scheduleContainer(Container container) {
|
|||||||
startPendingContainers(false);
|
startPendingContainers(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -662,6 +672,8 @@ private void shedQueuedOpportunisticContainers() {
|
|||||||
numAllowed--;
|
numAllowed--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
|
||||||
|
queuedGuaranteedContainers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainersMonitor getContainersMonitor() {
|
public ContainersMonitor getContainersMonitor() {
|
||||||
|
@ -54,6 +54,12 @@ public class NodeManagerMetrics {
|
|||||||
@Metric MutableGaugeInt availableVCores;
|
@Metric MutableGaugeInt availableVCores;
|
||||||
@Metric("Container launch duration")
|
@Metric("Container launch duration")
|
||||||
MutableRate containerLaunchDuration;
|
MutableRate containerLaunchDuration;
|
||||||
|
|
||||||
|
@Metric("Containers queued (Guaranteed)")
|
||||||
|
MutableGaugeInt containersGuaranteedQueued;
|
||||||
|
@Metric("Containers queued (Opportunistic)")
|
||||||
|
MutableGaugeInt containersOpportunisticQueued;
|
||||||
|
|
||||||
@Metric("# of bad local dirs")
|
@Metric("# of bad local dirs")
|
||||||
MutableGaugeInt badLocalDirs;
|
MutableGaugeInt badLocalDirs;
|
||||||
@Metric("# of bad log dirs")
|
@Metric("# of bad log dirs")
|
||||||
@ -209,6 +215,11 @@ public void completeOpportunisticContainer(Resource res) {
|
|||||||
allocatedOpportunisticVCores.decr(res.getVirtualCores());
|
allocatedOpportunisticVCores.decr(res.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQueuedContainers(int opportunisticCount, int guaranteedCount) {
|
||||||
|
containersOpportunisticQueued.set(opportunisticCount);
|
||||||
|
containersGuaranteedQueued.set(guaranteedCount);
|
||||||
|
}
|
||||||
|
|
||||||
public void addResource(Resource res) {
|
public void addResource(Resource res) {
|
||||||
availableMB = availableMB + res.getMemorySize();
|
availableMB = availableMB + res.getMemorySize();
|
||||||
availableGB.set((int)Math.floor(availableMB/1024d));
|
availableGB.set((int)Math.floor(availableMB/1024d));
|
||||||
@ -314,6 +325,16 @@ public int getRunningOpportunisticContainers() {
|
|||||||
return runningOpportunisticContainers.value();
|
return runningOpportunisticContainers.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getQueuedOpportunisticContainers() {
|
||||||
|
return containersOpportunisticQueued.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getQueuedGuaranteedContainers() {
|
||||||
|
return containersGuaranteedQueued.value();
|
||||||
|
}
|
||||||
|
|
||||||
public long getCacheSizeBeforeClean() {
|
public long getCacheSizeBeforeClean() {
|
||||||
return this.cacheSizeBeforeClean.value();
|
return this.cacheSizeBeforeClean.value();
|
||||||
}
|
}
|
||||||
|
@ -270,6 +270,8 @@ public void testStartMultipleContainers() throws Exception {
|
|||||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
status.getState());
|
status.getState());
|
||||||
}
|
}
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -326,6 +328,8 @@ public void testQueueMultipleContainers() throws Exception {
|
|||||||
containerScheduler.getNumQueuedGuaranteedContainers());
|
containerScheduler.getNumQueuedGuaranteedContainers());
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
containerScheduler.getNumQueuedOpportunisticContainers());
|
containerScheduler.getNumQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(1, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(1, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -393,6 +397,8 @@ public void testStartAndQueueMultipleContainers() throws Exception {
|
|||||||
containerScheduler.getNumQueuedGuaranteedContainers());
|
containerScheduler.getNumQueuedGuaranteedContainers());
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
containerScheduler.getNumQueuedOpportunisticContainers());
|
containerScheduler.getNumQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(2, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -473,6 +479,9 @@ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
|
|||||||
containerScheduler.getNumQueuedGuaranteedContainers());
|
containerScheduler.getNumQueuedGuaranteedContainers());
|
||||||
Assert.assertEquals(maxOppQueueLength,
|
Assert.assertEquals(maxOppQueueLength,
|
||||||
containerScheduler.getNumQueuedOpportunisticContainers());
|
containerScheduler.getNumQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(maxOppQueueLength,
|
||||||
|
metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -543,6 +552,9 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception {
|
|||||||
System.out.println("\nStatus : [" + status + "]\n");
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
|
|
||||||
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
createContainerId(2), ContainerState.DONE, 40);
|
createContainerId(2), ContainerState.DONE, 40);
|
||||||
@ -554,6 +566,9 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception {
|
|||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
contStatus1.getState());
|
contStatus1.getState());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -628,6 +643,8 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
|
|||||||
}
|
}
|
||||||
System.out.println("\nStatus : [" + status + "]\n");
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
}
|
}
|
||||||
|
Assert.assertEquals(1, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
|
|
||||||
// Make sure that the GUARANTEED container completes
|
// Make sure that the GUARANTEED container completes
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
@ -755,6 +772,8 @@ public void testQueueShedding() throws Exception {
|
|||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
Assert.assertEquals(6, containerScheduler.getNumQueuedContainers());
|
Assert.assertEquals(6, containerScheduler.getNumQueuedContainers());
|
||||||
|
Assert.assertEquals(6, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
|
|
||||||
ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
|
ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
|
||||||
.newInstance();
|
.newInstance();
|
||||||
@ -791,6 +810,8 @@ public void testQueueShedding() throws Exception {
|
|||||||
}
|
}
|
||||||
Assert.assertEquals(4, deQueuedContainers);
|
Assert.assertEquals(4, deQueuedContainers);
|
||||||
Assert.assertEquals(2, numQueuedOppContainers);
|
Assert.assertEquals(2, numQueuedOppContainers);
|
||||||
|
Assert.assertEquals(2, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -935,6 +956,8 @@ containerManager, createContainerId(0),
|
|||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(2, killedContainers);
|
Assert.assertEquals(2, killedContainers);
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1002,6 +1025,8 @@ public void testKillOnlyRequiredOpportunisticContainers() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(2, killedContainers);
|
Assert.assertEquals(2, killedContainers);
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1064,6 +1089,8 @@ public void testStopQueuedContainer() throws Exception {
|
|||||||
|
|
||||||
Assert.assertEquals(1, runningContainersNo);
|
Assert.assertEquals(1, runningContainersNo);
|
||||||
Assert.assertEquals(2, queuedContainersNo);
|
Assert.assertEquals(2, queuedContainersNo);
|
||||||
|
Assert.assertEquals(2, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
|
|
||||||
// Stop one of the two queued containers.
|
// Stop one of the two queued containers.
|
||||||
StopContainersRequest stopRequest = StopContainersRequest.
|
StopContainersRequest stopRequest = StopContainersRequest.
|
||||||
@ -1094,6 +1121,7 @@ public void testStopQueuedContainer() throws Exception {
|
|||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Assert.assertEquals(1, metrics.getQueuedOpportunisticContainers());
|
||||||
Assert.assertEquals(createContainerId(0),
|
Assert.assertEquals(createContainerId(0),
|
||||||
map.get(ContainerSubState.RUNNING).getContainerId());
|
map.get(ContainerSubState.RUNNING).getContainerId());
|
||||||
Assert.assertEquals(createContainerId(1),
|
Assert.assertEquals(createContainerId(1),
|
||||||
@ -1205,6 +1233,8 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
|
|||||||
|
|
||||||
// Ensure no containers are queued.
|
// Ensure no containers are queued.
|
||||||
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
|
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedOpportunisticContainers());
|
||||||
|
Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
|
||||||
|
|
||||||
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
ContainerState> containerStates =
|
ContainerState> containerStates =
|
||||||
|
Loading…
Reference in New Issue
Block a user