YARN-3790. usedResource from rootQueue metrics may get stale data for FS scheduler after recovering the container (Zhihai Xu via rohithsharmaks)

This commit is contained in:
rohithsharmaks 2015-06-24 23:00:14 +05:30
parent 8d58512d6e
commit dd4b387d96
2 changed files with 10 additions and 4 deletions

View File

@ -705,6 +705,9 @@ Release 2.7.1 - UNRELEASED
YARN-3832. Resource Localization fails on a cluster due to existing cache
directories (Brahma Reddy Battula via jlowe)
YARN-3790. usedResource from rootQueue metrics may get stale data for FS
scheduler after recovering the container (Zhihai Xu via rohithsharmaks)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@ -841,11 +842,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
+ " with event: " + event); + " with event: " + event);
} }
private synchronized void addNode(RMNode node) { private synchronized void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
nodes.put(node.getNodeID(), schedulerNode); nodes.put(node.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, node.getTotalCapability()); Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
updateMaximumAllocation(schedulerNode, true); updateMaximumAllocation(schedulerNode, true);
triggerUpdate(); triggerUpdate();
@ -854,6 +855,9 @@ private synchronized void addNode(RMNode node) {
queueMgr.getRootQueue().recomputeSteadyShares(); queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() + LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterResource); " cluster capacity: " + clusterResource);
recoverContainersOnNode(containerReports, node);
updateRootQueueMetrics();
} }
private synchronized void removeNode(RMNode rmNode) { private synchronized void removeNode(RMNode rmNode) {
@ -1147,8 +1151,7 @@ public void handle(SchedulerEvent event) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode()); addNode(nodeAddedEvent.getContainerReports(),
recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode()); nodeAddedEvent.getAddedRMNode());
break; break;
case NODE_REMOVED: case NODE_REMOVED: