diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 28160be830..2e69a756c6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -88,6 +88,9 @@ Release 2.6.0 - UNRELEASED YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement (Leitao Guo via jlowe) + YARN-2273. NPE in ContinuousScheduling thread when we lose a node. + (Wei Yan via kasha) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 18ccf9d8a9..c0687bcbc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -970,37 +970,27 @@ private synchronized void nodeUpdate(RMNode nm) { } } - private void continuousScheduling() { - while (true) { - List nodeIdList = new ArrayList(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. - synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } + void continuousSchedulingAttempt() { + List nodeIdList = new ArrayList(nodes.keySet()); + // Sort the nodes by space available on them, so that we offer + // containers on emptier nodes first, facilitating an even spread. This + // requires holding the scheduler lock, so that the space available on a + // node doesn't change during the sort. + synchronized (this) { + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + } - // iterate all nodes - for (NodeId nodeId : nodeIdList) { - if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = getFSSchedulerNode(nodeId); - try { - if (Resources.fitsIn(minimumAllocation, - node.getAvailableResource())) { - attemptScheduling(node); - } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + - ": " + ex.toString(), ex); - } - } - } + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.warn("Error while doing sleep in continuous scheduling: " + - e.toString(), e); + if (node != null && Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.error("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); } } } @@ -1010,6 +1000,12 @@ private class NodeAvailableResourceComparator implements Comparator { @Override public int compare(NodeId n1, NodeId n2) { + if (!nodes.containsKey(n1)) { + return 1; + } + if (!nodes.containsKey(n2)) { + return -1; + } return RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); @@ -1234,7 +1230,16 @@ private synchronized void initScheduler(Configuration conf) new Runnable() { @Override public void run() { - continuousScheduling(); + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.error("Continuous scheduling thread interrupted. Exiting. ", + e); + return; + } + } } } ); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ed492cec40..df157e7500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2763,7 +2763,43 @@ public void testContinuousScheduling() throws Exception { Assert.assertEquals(2, nodes.size()); } - + @Test + public void testContinuousSchedulingWithNodeRemoved() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling once manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + Assert.assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + Assert.assertEquals("We should only have one alive node.", + 1, scheduler.getNumClusterNodes()); + + // Invoke the continuous scheduling once + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail("Exception happened when doing continuous scheduling. " + + e.toString()); + } + } + @Test public void testDontAllowUndeclaredPools() throws Exception{ conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);