YARN-3259. FairScheduler: Trigger fairShare updates on node events. (Anubhav Dhoot via kasha)

This commit is contained in:
Karthik Kambatla 2015-06-05 09:39:41 -07:00
parent 6786daab33
commit 75885852cc
4 changed files with 163 additions and 4 deletions

View File

@ -306,6 +306,9 @@ Release 2.8.0 - UNRELEASED
YARN-3547. FairScheduler: Apps that have no resource demand should not participate
scheduling. (Xianyin Xin via kasha)
YARN-3259. FairScheduler: Trigger fairShare updates on node events.
(Anubhav Dhoot via kasha)
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsCollector;
@ -116,4 +117,9 @@ public void addUpdateCallDuration(long value) {
public void addPreemptCallDuration(long value) {
preemptCall.add(value);
}
@VisibleForTesting
public boolean hasUpdateThreadRunChanged() {
return updateThreadRun.changed();
}
}

View File

@ -103,9 +103,9 @@
* of the root queue in the typical fair scheduling fashion. Then, the children
* distribute the resources assigned to them to their children in the same
* fashion. Applications may only be scheduled on leaf queues. Queues can be
* specified as children of other queues by placing them as sub-elements of their
* parents in the fair scheduler configuration file.
*
* specified as children of other queues by placing them as sub-elements of
* their parents in the fair scheduler configuration file.
*
* A queue's name starts with the names of its parents, with periods as
* separators. So a queue named "queue1" under the root named, would be
* referred to as "root.queue1", and a queue named "queue2" under a queue
@ -142,6 +142,8 @@ public class FairScheduler extends
@VisibleForTesting
Thread updateThread;
private final Object updateThreadMonitor = new Object();
@VisibleForTesting
Thread schedulingThread;
// timeout to join when we stop this service
@ -246,6 +248,13 @@ public QueueManager getQueueManager() {
return queueMgr;
}
// Allows UpdateThread to start processing without waiting till updateInterval
void triggerUpdate() {
synchronized (updateThreadMonitor) {
updateThreadMonitor.notify();
}
}
/**
* Thread which calls {@link FairScheduler#update()} every
* <code>updateInterval</code> milliseconds.
@ -256,7 +265,9 @@ private class UpdateThread extends Thread {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(updateInterval);
synchronized (updateThreadMonitor) {
updateThreadMonitor.wait(updateInterval);
}
long start = getClock().getTime();
update();
preemptTasksIfNecessary();
@ -838,6 +849,8 @@ private synchronized void addNode(RMNode node) {
updateRootQueueMetrics();
updateMaximumAllocation(schedulerNode, true);
triggerUpdate();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
@ -853,6 +866,8 @@ private synchronized void removeNode(RMNode rmNode) {
Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
updateRootQueueMetrics();
triggerUpdate();
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestSchedulingUpdate extends FairSchedulerTestBase {
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// Make the update loop to never finish to ensure zero update calls
conf.setInt(
FairSchedulerConfiguration.UPDATE_INTERVAL_MS,
Integer.MAX_VALUE);
return conf;
}
@Before
public void setup() {
conf = createConfiguration();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
}
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
}
@Test (timeout = 3000)
public void testSchedulingUpdateOnNodeJoinLeave() throws InterruptedException {
verifyNoCalls();
// Add one node
String host = "127.0.0.1";
final int memory = 4096;
final int cores = 4;
RMNode node1 = MockNodes.newNodeInfo(
1, Resources.createResource(memory, cores), 1, host);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
long expectedCalls = 1;
verifyExpectedCalls(expectedCalls, memory, cores);
// Remove the node
NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
expectedCalls = 2;
verifyExpectedCalls(expectedCalls, 0, 0);
}
private void verifyExpectedCalls(long expectedCalls, int memory, int vcores)
throws InterruptedException {
boolean verified = false;
int count = 0;
while (count < 100) {
if (scheduler.fsOpDurations.hasUpdateThreadRunChanged()) {
break;
}
count++;
Thread.sleep(10);
}
assertTrue("Update Thread has not run based on its metrics",
scheduler.fsOpDurations.hasUpdateThreadRunChanged());
assertEquals("Root queue metrics memory does not have expected value",
memory, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals("Root queue metrics cpu does not have expected value",
vcores, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
MetricsCollectorImpl collector = new MetricsCollectorImpl();
scheduler.fsOpDurations.getMetrics(collector, true);
MetricsRecord record = collector.getRecords().get(0);
for (AbstractMetric abstractMetric : record.metrics()) {
if (abstractMetric.name().contains("UpdateThreadRunNumOps")) {
assertEquals("Update Thread did not run expected number of times " +
"based on metric record count",
expectedCalls,
abstractMetric.value());
verified = true;
}
}
assertTrue("Did not find metric for UpdateThreadRunNumOps", verified);
}
private void verifyNoCalls() {
assertFalse("Update thread should not have executed",
scheduler.fsOpDurations.hasUpdateThreadRunChanged());
assertEquals("Scheduler queue memory should not have been updated",
0, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals("Scheduler queue cpu should not have been updated",
0,scheduler.getRootQueueMetrics().getAvailableVirtualCores());
}
}