YARN-2273. NPE in ContinuousScheduling thread when we lose a node. (Wei Yan via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-22 22:44:38 +00:00
parent 853ed29f2d
commit ff77582991
3 changed files with 75 additions and 31 deletions

View File

@ -88,6 +88,9 @@ Release 2.6.0 - UNRELEASED
YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement
(Leitao Guo via jlowe)
YARN-2273. NPE in ContinuousScheduling thread when we lose a node.
(Wei Yan via kasha)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -970,37 +970,27 @@ private synchronized void nodeUpdate(RMNode nm) {
}
}
private void continuousScheduling() {
while (true) {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
// requires holding the scheduler lock, so that the space available on a
// node doesn't change during the sort.
synchronized (this) {
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
}
void continuousSchedulingAttempt() {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
// requires holding the scheduler lock, so that the space available on a
// node doesn't change during the sort.
synchronized (this) {
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
}
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
if (nodes.containsKey(nodeId)) {
FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
if (Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.warn("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex);
}
}
}
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.warn("Error while doing sleep in continuous scheduling: " +
e.toString(), e);
if (node != null && Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.error("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex);
}
}
}
@ -1010,6 +1000,12 @@ private class NodeAvailableResourceComparator implements Comparator<NodeId> {
@Override
public int compare(NodeId n1, NodeId n2) {
if (!nodes.containsKey(n1)) {
return 1;
}
if (!nodes.containsKey(n2)) {
return -1;
}
return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
@ -1234,7 +1230,16 @@ private synchronized void initScheduler(Configuration conf)
new Runnable() {
@Override
public void run() {
continuousScheduling();
while (!Thread.currentThread().isInterrupted()) {
try {
continuousSchedulingAttempt();
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.error("Continuous scheduling thread interrupted. Exiting. ",
e);
return;
}
}
}
}
);

View File

@ -2763,7 +2763,43 @@ public void testContinuousScheduling() throws Exception {
Assert.assertEquals(2, nodes.size());
}
@Test
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
// Disable continuous scheduling, will invoke continuous scheduling once manually
scheduler.init(conf);
scheduler.start();
Assert.assertTrue("Continuous scheduling should be disabled.",
!scheduler.isContinuousSchedulingEnabled());
// Add two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
Assert.assertEquals("We should have two alive nodes.",
2, scheduler.getNumClusterNodes());
// Remove one node
NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(removeNode1);
Assert.assertEquals("We should only have one alive node.",
1, scheduler.getNumClusterNodes());
// Invoke the continuous scheduling once
try {
scheduler.continuousSchedulingAttempt();
} catch (Exception e) {
fail("Exception happened when doing continuous scheduling. " +
e.toString());
}
}
@Test
public void testDontAllowUndeclaredPools() throws Exception{
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);