YARN-1010. FairScheduler: decouple container scheduling from nodemanager heartbeats. (Wei Yan via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-10-01 19:54:50 +00:00
parent 4fe912df9c
commit ae05623a75
7 changed files with 305 additions and 9 deletions

View File

@ -26,6 +26,9 @@ Release 2.3.0 - UNRELEASED
YARN-1021. Yarn Scheduler Load Simulator. (ywskycn via tucu)
YARN-1010. FairScheduler: decouple container scheduling from nodemanager
heartbeats. (Wei Yan via Sandy Ryza)
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)

View File

@ -310,9 +310,18 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ localRequest);
}
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = app.getAllowedLocalityLevelByTime(priority,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {

View File

@ -464,7 +464,12 @@ public Queue getQueue() {
* @param priority The priority of the container scheduled.
*/
synchronized public void resetSchedulingOpportunities(Priority priority) {
lastScheduledContainer.put(priority, System.currentTimeMillis());
resetSchedulingOpportunities(priority, System.currentTimeMillis());
}
// used for continuous scheduling
synchronized public void resetSchedulingOpportunities(Priority priority,
long currentTimeMs) {
lastScheduledContainer.put(priority, currentTimeMs);
schedulingOpportunities.setCount(priority, 0);
}
@ -513,6 +518,55 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) {
return allowedLocalityLevel.get(priority);
}
/**
* Return the level at which we are allowed to schedule containers.
* Given the thresholds indicating how much time passed before relaxing
* scheduling constraints.
*/
public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
long nodeLocalityDelayMs, long rackLocalityDelayMs,
long currentTimeMs) {
// if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH;
}
// default level is NODE_LOCAL
if (! allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// if level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(priority)) {
waitTime -= lastScheduledContainer.get(priority);
} else {
waitTime -= appSchedulable.getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority, currentTimeMs);
}
}
return allowedLocalityLevel.get(priority);
}
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,

View File

@ -179,8 +179,12 @@ public class FairScheduler implements ResourceScheduler {
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
protected long rackLocalityDelayMs; // Delay for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per
// heartbeat
@ -582,6 +586,22 @@ public double getRackLocalityThreshold() {
return rackLocalityThreshold;
}
public long getNodeLocalityDelayMs() {
return nodeLocalityDelayMs;
}
public long getRackLocalityDelayMs() {
return rackLocalityDelayMs;
}
public boolean isContinuousSchedulingEnabled() {
return continuousSchedulingEnabled;
}
public synchronized int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
public Resource getClusterCapacity() {
return clusterCapacity;
}
@ -907,6 +927,37 @@ private synchronized void nodeUpdate(RMNode nm) {
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
}
private void continuousScheduling() {
while (true) {
for (FSSchedulerNode node : nodes.values()) {
try {
if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.warn("Error while attempting scheduling for node " + node + ": " +
ex.toString(), ex);
}
}
try {
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.warn("Error while doing sleep in continuous scheduling: " +
e.toString(), e);
}
}
}
private synchronized void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
@ -914,19 +965,18 @@ private synchronized void nodeUpdate(RMNode nm) {
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
if (reservedAppSchedulable != null &&
!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ " on node " + nm);
+ " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ " on node: " + nm);
+ " on node: " + node);
node.getReservedAppSchedulable().assignReservedContainer(node);
}
@ -1060,8 +1110,13 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
@ -1088,6 +1143,21 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
updateThread.start();
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
Thread schedulingThread = new Thread(
new Runnable() {
@Override
public void run() {
continuousScheduling();
}
}
);
schedulingThread.setName("ContinuousScheduling");
schedulingThread.setDaemon(true);
schedulingThread.start();
}
} else {
try {
queueMgr.reloadAllocs();

View File

@ -66,6 +66,22 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
DEFAULT_LOCALITY_THRESHOLD;
/** Delay for node locality. */
protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + "locality-delay-node-ms";
protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
/** Delay for rack locality. */
protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + "locality-delay-rack-ms";
protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
/** Enable continuous scheduling or not. */
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
/** Sleep time of each pass in continuous scheduling (5ms in default) */
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
@ -134,6 +150,22 @@ public float getLocalityThresholdRack() {
return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
}
public boolean isContinuousSchedulingEnabled() {
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
}
public int getContinuousSchedulingSleepMs() {
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
}
public long getLocalityDelayNodeMs() {
return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
}
public long getLocalityDelayRackMs() {
return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
}
public boolean getPreemptionEnabled() {
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
}

View File

@ -25,11 +25,25 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
private class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(int seconds) {
time = time + seconds * 1000;
}
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
@ -93,6 +107,63 @@ public void testDelayScheduling() {
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
@Test
public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException {
Queue queue = Mockito.mock(Queue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
MockClock clock = new MockClock();
long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds
long rackLocalityDelayMs = 6 * 1000L; // 6 seconds
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue,
null, null);
AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class);
long startTime = clock.getTime();
Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime);
schedulerApp.setAppSchedulable(appSchedulable);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 4 seconds should remain node local
clock.tick(4);
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 6 seconds should switch to rack local
clock.tick(2);
assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// manually set back to node local
schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
schedulerApp.resetSchedulingOpportunities(prio, clock.getTime());
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// Now escalate again to rack-local, then to off-switch
clock.tick(6);
assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
clock.tick(7);
assertEquals(NodeType.OFF_SWITCH,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
}
@Test
/**
* Ensure that when negative paramaters are given (signaling delay scheduling

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -298,6 +299,14 @@ public void testLoadConfigurationOnInitialize() throws IOException {
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5);
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7);
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS,
10);
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
5000);
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
5000);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
@ -308,6 +317,11 @@ public void testLoadConfigurationOnInitialize() throws IOException {
Assert.assertEquals(true, scheduler.sizeBasedWeight);
Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01);
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
Assert.assertTrue("The continuous scheduling should be enabled",
scheduler.continuousSchedulingEnabled);
Assert.assertEquals(10, scheduler.continuousSchedulingSleepMs);
Assert.assertEquals(5000, scheduler.nodeLocalityDelayMs);
Assert.assertEquals(5000, scheduler.rackLocalityDelayMs);
Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
Assert.assertEquals(128,
@ -2255,4 +2269,47 @@ public void testConcurrentAccessOnApplications() throws Exception {
fs.applications, FSSchedulerApp.class);
}
@Test
public void testContinuousScheduling() throws Exception {
// set continuous scheduling enabled
FairScheduler fs = new FairScheduler();
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
fs.reinitialize(conf, resourceManager.getRMContext());
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
// Add one node
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
fs.handle(nodeEvent1);
// available resource
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
fs.addApplication(appAttemptId, "queue11", "user11");
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// waiting for continuous_scheduler_sleep_time
// at least one pass
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
// check consumption
Resource consumption =
fs.applications.get(appAttemptId).getCurrentConsumption();
Assert.assertEquals(1024, consumption.getMemory());
Assert.assertEquals(1, consumption.getVirtualCores());
}
}