YARN-6486. FairScheduler: Deprecate continuous scheduling. (Contributed by Wilfred Spiegelenburg)
This commit is contained in:
parent
d716084f45
commit
370f1c6283
@ -943,6 +943,7 @@ private boolean isOverAMShareLimit() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
|
LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
@Metrics(context="fairscheduler-op-durations")
|
@Metrics(context="fairscheduler-op-durations")
|
||||||
public class FSOpDurations implements MetricsSource {
|
public class FSOpDurations implements MetricsSource {
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Metric("Duration for a continuous scheduling run")
|
@Metric("Duration for a continuous scheduling run")
|
||||||
MutableRate continuousSchedulingRun;
|
MutableRate continuousSchedulingRun;
|
||||||
|
|
||||||
@ -90,6 +91,7 @@ public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
|||||||
registry.snapshot(collector.addRecord(registry.info()), all);
|
registry.snapshot(collector.addRecord(registry.info()), all);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public void addContinuousSchedulingRunDuration(long value) {
|
public void addContinuousSchedulingRunDuration(long value) {
|
||||||
continuousSchedulingRun.add(value);
|
continuousSchedulingRun.add(value);
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,7 @@ class FSPreemptionThread extends Thread {
|
|||||||
private final Timer preemptionTimer;
|
private final Timer preemptionTimer;
|
||||||
private final Lock schedulerReadLock;
|
private final Lock schedulerReadLock;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
FSPreemptionThread(FairScheduler scheduler) {
|
FSPreemptionThread(FairScheduler scheduler) {
|
||||||
setDaemon(true);
|
setDaemon(true);
|
||||||
setName("FSPreemptionThread");
|
setName("FSPreemptionThread");
|
||||||
|
@ -153,6 +153,7 @@ public class FairScheduler extends
|
|||||||
private final int UPDATE_DEBUG_FREQUENCY = 25;
|
private final int UPDATE_DEBUG_FREQUENCY = 25;
|
||||||
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Thread schedulingThread;
|
Thread schedulingThread;
|
||||||
|
|
||||||
@ -167,15 +168,19 @@ public class FairScheduler extends
|
|||||||
|
|
||||||
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
|
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
|
||||||
// Continuous Scheduling enabled or not
|
// Continuous Scheduling enabled or not
|
||||||
|
@Deprecated
|
||||||
protected boolean continuousSchedulingEnabled;
|
protected boolean continuousSchedulingEnabled;
|
||||||
// Sleep time for each pass in continuous scheduling
|
// Sleep time for each pass in continuous scheduling
|
||||||
|
@Deprecated
|
||||||
protected volatile int continuousSchedulingSleepMs;
|
protected volatile int continuousSchedulingSleepMs;
|
||||||
// Node available resource comparator
|
// Node available resource comparator
|
||||||
private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
|
private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
|
||||||
new NodeAvailableResourceComparator();
|
new NodeAvailableResourceComparator();
|
||||||
protected double nodeLocalityThreshold; // Cluster threshold for node locality
|
protected double nodeLocalityThreshold; // Cluster threshold for node locality
|
||||||
protected double rackLocalityThreshold; // Cluster threshold for rack locality
|
protected double rackLocalityThreshold; // Cluster threshold for rack locality
|
||||||
|
@Deprecated
|
||||||
protected long nodeLocalityDelayMs; // Delay for node locality
|
protected long nodeLocalityDelayMs; // Delay for node locality
|
||||||
|
@Deprecated
|
||||||
protected long rackLocalityDelayMs; // Delay for rack locality
|
protected long rackLocalityDelayMs; // Delay for rack locality
|
||||||
protected boolean assignMultiple; // Allocate multiple containers per
|
protected boolean assignMultiple; // Allocate multiple containers per
|
||||||
// heartbeat
|
// heartbeat
|
||||||
@ -284,6 +289,7 @@ public QueueManager getQueueManager() {
|
|||||||
* Thread which attempts scheduling resources continuously,
|
* Thread which attempts scheduling resources continuously,
|
||||||
* asynchronous to the node heartbeats.
|
* asynchronous to the node heartbeats.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
private class ContinuousSchedulingThread extends Thread {
|
private class ContinuousSchedulingThread extends Thread {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -389,18 +395,44 @@ public double getRackLocalityThreshold() {
|
|||||||
return rackLocalityThreshold;
|
return rackLocalityThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delay in milliseconds for locality fallback node to rack.
|
||||||
|
* @deprecated linked to {@link #isContinuousSchedulingEnabled} deprecation
|
||||||
|
* @return delay in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public long getNodeLocalityDelayMs() {
|
public long getNodeLocalityDelayMs() {
|
||||||
return nodeLocalityDelayMs;
|
return nodeLocalityDelayMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delay in milliseconds for locality fallback rack to other.
|
||||||
|
* @deprecated linked to {@link #isContinuousSchedulingEnabled} deprecation
|
||||||
|
* @return delay in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public long getRackLocalityDelayMs() {
|
public long getRackLocalityDelayMs() {
|
||||||
return rackLocalityDelayMs;
|
return rackLocalityDelayMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether continuous scheduling is turned on.
|
||||||
|
* @deprecated Continuous scheduling should not be turned ON. It is
|
||||||
|
* deprecated because it can cause scheduler slowness due to locking issues.
|
||||||
|
* Schedulers should use assignmultiple as a replacement.
|
||||||
|
* @return whether continuous scheduling is enabled
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public boolean isContinuousSchedulingEnabled() {
|
public boolean isContinuousSchedulingEnabled() {
|
||||||
return continuousSchedulingEnabled;
|
return continuousSchedulingEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The sleep time of the continuous scheduler thread.
|
||||||
|
* @deprecated linked to {@link #isContinuousSchedulingEnabled} deprecation
|
||||||
|
* @return sleep time in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public int getContinuousSchedulingSleepMs() {
|
public int getContinuousSchedulingSleepMs() {
|
||||||
return continuousSchedulingSleepMs;
|
return continuousSchedulingSleepMs;
|
||||||
}
|
}
|
||||||
@ -894,6 +926,7 @@ protected void nodeUpdate(RMNode nm) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void continuousSchedulingAttempt() throws InterruptedException {
|
void continuousSchedulingAttempt() throws InterruptedException {
|
||||||
long start = getClock().getTime();
|
long start = getClock().getTime();
|
||||||
List<FSSchedulerNode> nodeIdList;
|
List<FSSchedulerNode> nodeIdList;
|
||||||
@ -1253,6 +1286,7 @@ public void setRMContext(RMContext rmContext) {
|
|||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
private void initScheduler(Configuration conf) throws IOException {
|
private void initScheduler(Configuration conf) throws IOException {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
@ -1299,6 +1333,10 @@ private void initScheduler(Configuration conf) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (continuousSchedulingEnabled) {
|
if (continuousSchedulingEnabled) {
|
||||||
|
// Contiuous scheduling is deprecated log it on startup
|
||||||
|
LOG.warn("Continuous scheduling is turned ON. It is deprecated " +
|
||||||
|
"because it can cause scheduler slowness due to locking issues. " +
|
||||||
|
"Schedulers should use assignmultiple as a replacement.");
|
||||||
// start continuous scheduling thread
|
// start continuous scheduling thread
|
||||||
schedulingThread = new ContinuousSchedulingThread();
|
schedulingThread = new ContinuousSchedulingThread();
|
||||||
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
||||||
@ -1374,6 +1412,7 @@ public void serviceStart() throws Exception {
|
|||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
public void serviceStop() throws Exception {
|
public void serviceStop() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
@ -97,20 +97,49 @@ public class FairSchedulerConfiguration extends Configuration {
|
|||||||
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
|
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
|
||||||
DEFAULT_LOCALITY_THRESHOLD;
|
DEFAULT_LOCALITY_THRESHOLD;
|
||||||
|
|
||||||
/** Delay for node locality. */
|
/**
|
||||||
protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + "locality-delay-node-ms";
|
* Delay for node locality.
|
||||||
|
* @deprecated Continuous scheduling is known to cause locking issue inside
|
||||||
|
* Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX +
|
||||||
|
"locality-delay-node-ms";
|
||||||
|
@Deprecated
|
||||||
protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
|
protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
|
||||||
|
|
||||||
/** Delay for rack locality. */
|
/**
|
||||||
protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + "locality-delay-rack-ms";
|
* Delay for rack locality.
|
||||||
|
* @deprecated Continuous scheduling is known to cause locking issue inside
|
||||||
|
* Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX +
|
||||||
|
"locality-delay-rack-ms";
|
||||||
|
@Deprecated
|
||||||
protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
|
protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
|
||||||
|
|
||||||
/** Enable continuous scheduling or not. */
|
/**
|
||||||
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
|
* Enable continuous scheduling or not.
|
||||||
|
* @deprecated Continuous scheduling is known to cause locking issue inside
|
||||||
|
* the scheduler in larger cluster, more than 100 nodes, use
|
||||||
|
* {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
|
||||||
|
"continuous-scheduling-enabled";
|
||||||
|
@Deprecated
|
||||||
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
|
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
|
||||||
|
|
||||||
/** Sleep time of each pass in continuous scheduling (5ms in default) */
|
/**
|
||||||
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
|
* Sleep time of each pass in continuous scheduling (5ms in default).
|
||||||
|
* @deprecated Continuous scheduling is known to cause locking issue inside
|
||||||
|
* Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
|
||||||
|
"continuous-scheduling-sleep-ms";
|
||||||
|
@Deprecated
|
||||||
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
|
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
|
||||||
|
|
||||||
/** Whether preemption is enabled. */
|
/** Whether preemption is enabled. */
|
||||||
@ -282,18 +311,45 @@ public float getLocalityThresholdRack() {
|
|||||||
return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
|
return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether continuous scheduling is turned on.
|
||||||
|
* @deprecated use {@link #ASSIGN_MULTIPLE} to improve container allocation
|
||||||
|
* ramp up.
|
||||||
|
* @return whether continuous scheduling is enabled
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public boolean isContinuousSchedulingEnabled() {
|
public boolean isContinuousSchedulingEnabled() {
|
||||||
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
|
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED,
|
||||||
|
DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The sleep time of the continuous scheduler thread.
|
||||||
|
* @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
|
||||||
|
* @return sleep time in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public int getContinuousSchedulingSleepMs() {
|
public int getContinuousSchedulingSleepMs() {
|
||||||
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
|
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS,
|
||||||
|
DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delay in milliseconds for locality fallback node to rack.
|
||||||
|
* @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
|
||||||
|
* @return delay in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public long getLocalityDelayNodeMs() {
|
public long getLocalityDelayNodeMs() {
|
||||||
return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
|
return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delay in milliseconds for locality fallback rack to other.
|
||||||
|
* @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
|
||||||
|
* @return delay in ms
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public long getLocalityDelayRackMs() {
|
public long getLocalityDelayRackMs() {
|
||||||
return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
|
return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
|
||||||
}
|
}
|
||||||
|
@ -61,10 +61,12 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
||||||
private ControlledClock mockClock;
|
private ControlledClock mockClock;
|
||||||
private static int delayThresholdTimeMs = 1000;
|
private static int delayThresholdTimeMs = 1000;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
@ -77,6 +79,7 @@ public Configuration createConfiguration() {
|
|||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
mockClock = new ControlledClock();
|
mockClock = new ControlledClock();
|
||||||
@ -187,6 +190,7 @@ public void testSortedNodes() throws Exception {
|
|||||||
Assert.assertEquals(2, nodes.size());
|
Assert.assertEquals(2, nodes.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testWithNodeRemoved() throws Exception {
|
public void testWithNodeRemoved() throws Exception {
|
||||||
// Disable continuous scheduling, will invoke continuous
|
// Disable continuous scheduling, will invoke continuous
|
||||||
@ -240,6 +244,7 @@ public void testWithNodeRemoved() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testInterruptedException()
|
public void testInterruptedException()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
@ -284,6 +289,7 @@ public void testInterruptedException()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testSchedulerThreadLifeCycle() throws InterruptedException {
|
public void testSchedulerThreadLifeCycle() throws InterruptedException {
|
||||||
scheduler.start();
|
scheduler.start();
|
||||||
@ -300,6 +306,7 @@ public void testSchedulerThreadLifeCycle() throws InterruptedException {
|
|||||||
assertNotEquals("The Scheduling thread is still alive", 0, numRetries);
|
assertNotEquals("The Scheduling thread is still alive", 0, numRetries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void TestNodeAvailableResourceComparatorTransitivity() {
|
public void TestNodeAvailableResourceComparatorTransitivity() {
|
||||||
ClusterNodeTracker<FSSchedulerNode> clusterNodeTracker =
|
ClusterNodeTracker<FSSchedulerNode> clusterNodeTracker =
|
||||||
@ -384,6 +391,7 @@ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
|
|||||||
assertEquals(delayThresholdTimeMs, initSchedulerTime);
|
assertEquals(delayThresholdTimeMs, initSchedulerTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
private void triggerSchedulingAttempt() throws InterruptedException {
|
private void triggerSchedulingAttempt() throws InterruptedException {
|
||||||
Thread.sleep(
|
Thread.sleep(
|
||||||
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
||||||
|
@ -194,6 +194,7 @@ public void testConfValidation() throws Exception {
|
|||||||
|
|
||||||
// TESTS
|
// TESTS
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test(timeout=2000)
|
@Test(timeout=2000)
|
||||||
public void testLoadConfigurationOnInitialize() throws IOException {
|
public void testLoadConfigurationOnInitialize() throws IOException {
|
||||||
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
||||||
@ -4410,6 +4411,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
|||||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testSchedulingOnRemovedNode() throws Exception {
|
public void testSchedulingOnRemovedNode() throws Exception {
|
||||||
// Disable continuous scheduling, will invoke continuous scheduling manually
|
// Disable continuous scheduling, will invoke continuous scheduling manually
|
||||||
|
Loading…
Reference in New Issue
Block a user