YARN-655. Fair scheduler metrics should subtract allocated memory from available memory. (sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480809 13f79535-47bb-0310-9956-ffa450edef68
parent e0562e3d07
commit 51ccb87031
hadoop-yarn-project/CHANGES.txt
@@ -367,6 +367,9 @@ Release 2.0.5-beta - UNRELEASED
 
     YARN-637. FS: maxAssign is not honored. (kkambatl via tucu)
 
+    YARN-655. Fair scheduler metrics should subtract allocated memory from
+    available memory. (sandyr via tucu)
+
 Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
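What the fix amounts to: before this patch, FairScheduler's periodic update() recorded the full cluster capacity as the root queue's available memory, ignoring memory already allocated to containers. The snippet below is a minimal standalone sketch of the corrected arithmetic, not code from this commit; it reuses the Resources and BuilderUtils helpers the patch itself calls, with made-up capacities.

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
    import org.apache.hadoop.yarn.util.BuilderUtils;

    public class AvailableMemorySketch {
      public static void main(String[] args) {
        Resource clusterCapacity = BuilderUtils.newResource(4096, 0); // made-up total
        Resource allocated = BuilderUtils.newResource(1024, 0);       // made-up usage

        // Old behavior: available was simply set to clusterCapacity (4096 MB).
        // Fixed behavior: subtract what is already allocated.
        Resource available = Resources.subtract(clusterCapacity, allocated);
        System.out.println(available.getMemory()); // prints 3072
      }
    }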
QueueMetrics.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -425,6 +426,10 @@ public int getAppsKilled() {
   public int getAppsFailed() {
     return appsFailed.value();
   }
 
+  public Resource getAllocatedResources() {
+    return BuilderUtils.newResource(allocatedMB.value(), 0);
+  }
+
   public int getAllocatedMB() {
     return allocatedMB.value();
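The new getAllocatedResources() accessor packages the allocatedMB counter into a Resource object so that FairScheduler can feed it to Resources.subtract. A brief usage sketch, illustrative only, where `metrics` stands for any QueueMetrics instance; the second field is 0 because QueueMetrics tracks memory, not virtual cores, at this point:

    Resource allocated = metrics.getAllocatedResources();
    int mb = allocated.getMemory();           // same value as metrics.getAllocatedMB()
    int vcores = allocated.getVirtualCores(); // always 0 with this implementation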
FairScheduler.java
@@ -234,10 +234,6 @@ protected synchronized void update() {
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
-
-    // Update recorded capacity of root queue (child queues are updated
-    // when fair share is calculated).
-    rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
   }
 
   /**
@@ -670,6 +666,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);
+      updateRootQueueMetrics();
     }
 
     LOG.info("Application " + applicationAttemptId +
@@ -681,6 +678,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
   private synchronized void addNode(RMNode node) {
     nodes.put(node.getNodeID(), new FSSchedulerNode(node));
     Resources.addTo(clusterCapacity, node.getTotalCapability());
+    updateRootQueueMetrics();
 
     LOG.info("Added node " + node.getNodeAddress() +
         " cluster capacity: " + clusterCapacity);
@@ -689,6 +687,7 @@ private synchronized void addNode(RMNode node) {
   private synchronized void removeNode(RMNode rmNode) {
     FSSchedulerNode node = nodes.get(rmNode.getNodeID());
     Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    updateRootQueueMetrics();
 
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
@@ -901,6 +900,7 @@ private synchronized void nodeUpdate(RMNode nm) {
         if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
       }
     }
+    updateRootQueueMetrics();
   }
 
   @Override
@@ -922,6 +922,18 @@ public SchedulerAppReport getSchedulerAppInfo(
     }
     return new SchedulerAppReport(applications.get(appAttemptId));
   }
 
+  /**
+   * Subqueue metrics might be a little out of date because fair shares are
+   * recalculated at the update interval, but the root queue metrics needs to
+   * be updated synchronously with allocations and completions so that cluster
+   * metrics will be consistent.
+   */
+  private void updateRootQueueMetrics() {
+    rootMetrics.setAvailableResourcesToQueue(
+        Resources.subtract(
+            clusterCapacity, rootMetrics.getAllocatedResources()));
+  }
+
   @Override
   public QueueMetrics getRootQueueMetrics() {
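The javadoc above carries the heart of the design: per-queue fair shares may lag until the next update() pass, but the root queue's available memory is now recomputed inline at every allocation, completion, and node add/remove. The helper below is hypothetical, not part of the patch; it just states the invariant those call sites preserve, using the accessors the tests below rely on:

    // Hypothetical consistency check, not in the commit.
    static void assertRootMetricsConsistent(FairScheduler scheduler, int clusterMB) {
      QueueMetrics root = scheduler.getRootQueueMetrics();
      // Allocated plus available should always cover the registered capacity.
      assert root.getAllocatedMB() + root.getAvailableMB() == clusterMB;
    }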
TestFairScheduler.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -128,6 +129,7 @@ public void setUp() throws IOException {
   public void tearDown() {
     scheduler = null;
     resourceManager = null;
+    QueueMetrics.clearQueueMetrics();
   }
 
   private Configuration createConfiguration() {
@@ -337,6 +339,13 @@ public void testSimpleContainerAllocation() {
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
       getResourceUsage().getMemory());
+
+    // verify metrics
+    QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
+        .getMetrics();
+    assertEquals(1024, queue1Metrics.getAllocatedMB());
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAllocatedMB());
+    assertEquals(512, scheduler.getRootQueueMetrics().getAvailableMB());
   }
 
   @Test (timeout = 5000)
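The new assertions exercise the subtraction end to end: with 1024 MB allocated and 512 MB still reported available, the cluster registered by this test must total 1536 MB. Spelled out (derived from the asserted values above, not taken from the patch):

    int allocatedMB = 1024;                    // asserted via getAllocatedMB()
    int availableMB = 512;                     // asserted via getAvailableMB()
    int clusterMB = allocatedMB + availableMB; // 1536 MB across the test's nodes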
@@ -1276,6 +1285,7 @@ public void testReservationWhileMultiplePriorities() {
     scheduler.handle(updateEvent);
 
     assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
 
     // Create request at higher priority
     createSchedulingRequestExistingApplication(1024, 1, attId);
@@ -1291,6 +1301,7 @@ public void testReservationWhileMultiplePriorities() {
     // Complete container
     scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
         Arrays.asList(containerId));
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
 
     // Schedule at opening
     scheduler.update();
@@ -1302,6 +1313,7 @@ public void testReservationWhileMultiplePriorities() {
     for (RMContainer liveContainer : liveContainers) {
       Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
     }
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
   }
 
   @Test
@@ -1606,4 +1618,24 @@ public void testReservationThatDoesntFit() {
     assertEquals(1, app.getLiveContainers().size());
     assertEquals(0, app.getReservedContainers().size());
   }
+
+  @Test
+  public void testRemoveNodeUpdatesRootQueueMetrics() {
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(addEvent);
+
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+
+    NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeEvent);
+
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+  }
 }