YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1598908 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-05-31 19:33:09 +00:00
parent 77805fb69b
commit a4ba451802
18 changed files with 558 additions and 202 deletions

View File

@ -61,10 +61,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -89,11 +92,13 @@
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
public class ResourceSchedulerWrapper implements
SchedulerWrapper,ResourceScheduler,Configurable {
final public class ResourceSchedulerWrapper
extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
implements SchedulerWrapper, ResourceScheduler, Configurable {
private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool;
private RMContext rmContext;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
@ -146,6 +151,7 @@ public class ResourceSchedulerWrapper implements
public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class);
public ResourceSchedulerWrapper() {
super(ResourceSchedulerWrapper.class.getName());
samplerLock = new ReentrantLock();
queueLock = new ReentrantLock();
}
@ -794,10 +800,39 @@ public Configuration getConf() {
return conf;
}
@SuppressWarnings("unchecked")
@Override
public void reinitialize(Configuration entries, RMContext rmContext)
throws IOException {
scheduler.reinitialize(entries, rmContext);
public void serviceInit(Configuration conf) throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).init(conf);
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
@Override
public void serviceStart() throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).start();
super.serviceStart();
}
@SuppressWarnings("unchecked")
@Override
public void serviceStop() throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).stop();
super.serviceStop();
}
@Override
public void setRMContext(RMContext rmContext) {
scheduler.setRMContext(rmContext);
}
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
scheduler.reinitialize(conf, rmContext);
}
@Override

View File

@ -130,6 +130,8 @@ Release 2.5.0 - UNRELEASED
NMContainerStatus which has more information that is needed for
work-preserving RM-restart. (Jian He via vinodkv)
YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -401,6 +401,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
// Initialize the scheduler
scheduler = createScheduler();
scheduler.setRMContext(rmContext);
addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
@ -429,12 +431,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
try {
scheduler.reinitialize(conf, rmContext);
} catch (IOException ioe) {
throw new RuntimeException("Failed to initialize scheduler", ioe);
}
// creating monitors that handle preemption
createPolicyMonitors();

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -41,7 +42,7 @@
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
implements ResourceScheduler {
extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
@ -62,6 +63,15 @@ public abstract class AbstractYarnScheduler
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
/**
* Construct the service.
*
* @param name service name
*/
public AbstractYarnScheduler(String name) {
super(name);
}
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();

View File

@ -34,6 +34,15 @@
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
/**
* Set RMContext for <code>ResourceScheduler</code>.
* This method should be called immediately after instantiating
* a scheduler once.
* @param rmContext created by ResourceManager
*/
void setRMContext(RMContext rmContext);
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -102,6 +103,8 @@ public class CapacityScheduler extends
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private CSQueue root;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
@ -179,8 +182,6 @@ public Configuration getConf() {
private int numNodeManagers = 0;
private boolean initialized = false;
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@ -196,7 +197,9 @@ public Configuration getConf() {
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
public CapacityScheduler() {}
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@Override
public QueueMetrics getRootQueueMetrics() {
@ -238,55 +241,90 @@ public synchronized int getNumClusterNodes() {
}
@Override
public RMContext getRMContext() {
public synchronized RMContext getRMContext() {
return this.rmContext;
}
@Override
public synchronized void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
private synchronized void initScheduler(Configuration configuration) throws
IOException {
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval =
this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);
if (scheduleAsynchronously) {
asyncSchedulerThread = new AsyncScheduleThread(this);
}
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
"maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
"asynchronousScheduling=" + scheduleAsynchronously + ", " +
"asyncScheduleInterval=" + asyncScheduleInterval + "ms");
}
private synchronized void startSchedulerThreads() {
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThread,
"asyncSchedulerThread is null");
asyncSchedulerThread.start();
}
}
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
initScheduler(configuration);
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
synchronized (this) {
if (scheduleAsynchronously && asyncSchedulerThread != null) {
asyncSchedulerThread.interrupt();
asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
super.serviceStop();
}
@Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
Configuration configuration = new Configuration(conf);
if (!initialized) {
this.rmContext = rmContext;
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval =
this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);
if (scheduleAsynchronously) {
asyncSchedulerThread = new AsyncScheduleThread(this);
asyncSchedulerThread.start();
}
initialized = true;
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
"maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
"asynchronousScheduling=" + scheduleAsynchronously + ", " +
"asyncScheduleInterval=" + asyncScheduleInterval + "ms");
} else {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
reinitializeQueues(this.conf);
} catch (Throwable t) {
this.conf = oldConf;
throw new IOException("Failed to re-init queues", t);
}
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
reinitializeQueues(this.conf);
} catch (Throwable t) {
this.conf = oldConf;
throw new IOException("Failed to re-init queues", t);
}
}

View File

@ -68,7 +68,9 @@ public class AllocationFileLoaderService extends AbstractService {
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@ -146,7 +148,14 @@ public void run() {
@Override
public void stop() {
running = false;
reloadThread.interrupt();
if (reloadThread != null) {
reloadThread.interrupt();
try {
reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
} catch (InterruptedException e) {
LOG.warn("reloadThread fails to join.");
}
}
super.stop();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -117,7 +118,6 @@
@SuppressWarnings("unchecked")
public class FairScheduler extends
AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
private boolean initialized;
private FairSchedulerConfiguration conf;
private Resource incrAllocation;
@ -137,6 +137,11 @@ public class FairScheduler extends
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
private Thread updateThread;
private Thread schedulingThread;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
// Aggregate metrics
FSQueueMetrics rootMetrics;
@ -182,6 +187,7 @@ public class FairScheduler extends
AllocationConfiguration allocConf;
public FairScheduler() {
super(FairScheduler.class.getName());
clock = new SystemClock();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
@ -473,7 +479,8 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
return resToPreempt;
}
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
public synchronized RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
@ -1154,87 +1161,130 @@ public void recover(RMState state) throws Exception {
// NOT IMPLEMENTED
}
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
public synchronized void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
private synchronized void initScheduler(Configuration conf)
throws IOException {
if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
preemptionUtilizationThreshold =
this.conf.getPreemptionUtilizationThreshold();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
// This stores per-application scheduling information
this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
preemptionUtilizationThreshold =
this.conf.getPreemptionUtilizationThreshold();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
initialized = true;
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
// This stores per-application scheduling information
this.applications =
new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
allocConf = new AllocationConfiguration(conf);
try {
queueMgr.initialize(conf);
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
allocConf = new AllocationConfiguration(conf);
try {
queueMgr.initialize(conf);
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
updateThread.start();
updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
Thread schedulingThread = new Thread(
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
schedulingThread = new Thread(
new Runnable() {
@Override
public void run() {
continuousScheduling();
}
}
);
schedulingThread.setName("ContinuousScheduling");
schedulingThread.setDaemon(true);
schedulingThread.start();
);
schedulingThread.setName("ContinuousScheduling");
schedulingThread.setDaemon(true);
}
allocsLoader.init(conf);
allocsLoader.setReloadListener(new AllocationReloadListener());
// If we fail to load allocations file on initialize, we want to fail
// immediately. After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
try {
allocsLoader.reloadAllocations();
} catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
private synchronized void startSchedulerThreads() {
Preconditions.checkNotNull(updateThread, "updateThread is null");
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
updateThread.start();
if (continuousSchedulingEnabled) {
Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
schedulingThread.start();
}
allocsLoader.start();
}
@Override
public void serviceInit(Configuration conf) throws Exception {
initScheduler(conf);
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
synchronized (this) {
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
allocsLoader.init(conf);
allocsLoader.setReloadListener(new AllocationReloadListener());
// If we fail to load allocations file on initialize, we want to fail
// immediately. After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
try {
allocsLoader.reloadAllocations();
} catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
if (continuousSchedulingEnabled) {
if (schedulingThread != null) {
schedulingThread.interrupt();
schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
allocsLoader.start();
} else {
try {
allocsLoader.reloadAllocations();
} catch (Exception e) {
LOG.error("Failed to reload allocations file", e);
if (allocsLoader != null) {
allocsLoader.stop();
}
}
super.serviceStop();
}
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
try {
allocsLoader.reloadAllocations();
} catch (Exception e) {
LOG.error("Failed to reload allocations file", e);
}
}
@Override

View File

@ -89,6 +89,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -111,7 +112,6 @@ public class FifoScheduler extends
Configuration conf;
private boolean initialized;
private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager;
@ -180,6 +180,47 @@ public ActiveUsersManager getActiveUsersManager() {
}
};
public FifoScheduler() {
super(FifoScheduler.class.getName());
}
private synchronized void initScheduler(Configuration conf) {
validateConf(conf);
//Use ConcurrentSkipListMap because applications need to be ordered
this.applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
this.maximumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
conf);
this.activeUsersManager = new ActiveUsersManager(metrics);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
initScheduler(conf);
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
super.serviceStop();
}
@Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
@ -215,36 +256,18 @@ public int getNumClusterNodes() {
return nodes.size();
}
@Override
public synchronized void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
@Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
setConf(conf);
if (!this.initialized) {
validateConf(conf);
this.rmContext = rmContext;
//Use ConcurrentSkipListMap because applications need to be ordered
this.applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
this.maximumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
conf);
this.activeUsersManager = new ActiveUsersManager(metrics);
this.initialized = true;
}
}
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,

View File

@ -77,12 +77,12 @@ public static void setup() {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new FifoScheduler();
FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@ -218,6 +218,9 @@ public void test() throws Exception {
public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
FifoScheduler scheduler = new FifoScheduler();
MockRM rm = new MockRM(conf);
scheduler.setRMContext(rm.getRMContext());
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rm.getRMContext());
RMNode node = MockNodes.newNodeInfo(1,
@ -293,6 +296,8 @@ public void testReconnectedNode() throws Exception {
conf.setQueues("default", new String[] {"default"});
conf.setCapacity("default", 100);
FifoScheduler fs = new FifoScheduler();
fs.init(conf);
fs.start();
fs.reinitialize(conf, null);
RMNode n1 =
@ -313,6 +318,7 @@ public void testReconnectedNode() throws Exception {
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
fs.stop();
}
@Test (timeout = 50000)

View File

@ -121,13 +121,16 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
resourceManager.stop();
if (resourceManager != null) {
resourceManager.stop();
}
}
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new CapacityScheduler();
scheduler.setRMContext(resourceManager.getRMContext());
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@ -342,18 +345,23 @@ public void testMaximumCapacitySetup() {
public void testRefreshQueues() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, 80f, 20f);
cs.stop();
}
private void checkQueueCapacities(CapacityScheduler cs,
@ -456,6 +464,9 @@ public void testReconnectedNode() throws Exception {
setupQueueConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(csConf);
cs.start();
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
@ -475,6 +486,7 @@ null, null, new RMContainerTokenSecretManager(csConf),
cs.handle(new NodeAddedSchedulerEvent(n1));
Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
cs.stop();
}
@Test
@ -483,6 +495,9 @@ public void testRefreshQueuesWithNewQueue() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
@ -513,6 +528,7 @@ null, new RMContainerTokenSecretManager(conf),
assertEquals(queueB, queueB4.getParent());
} finally {
B3_CAPACITY += B4_CAPACITY;
cs.stop();
}
}
@Test

View File

@ -114,7 +114,7 @@ public void setUp() throws Exception {
setupQueueConfiguration(csConf, newRoot);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
@ -142,7 +142,9 @@ public void setUp() throws Exception {
queues, queues,
TestUtils.spyHook);
cs.reinitialize(csConf, rmContext);
cs.setRMContext(rmContext);
cs.init(csConf);
cs.start();
}
private static final String A = "a";
@ -2080,5 +2082,8 @@ private CapacitySchedulerContext mockCSContext(
@After
public void tearDown() throws Exception {
if (cs != null) {
cs.stop();
}
}
}

View File

@ -43,11 +43,15 @@ public void testQueueParsing() throws Exception {
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext = new RMContextImpl(null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
capacityScheduler.setConf(conf);
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, rmContext);
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
@ -62,6 +66,7 @@ null, null, null, null, new RMContainerTokenSecretManager(conf),
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.7 * 0.55 * 0.7,
c12.getAbsoluteMaximumCapacity(), DELTA);
capacityScheduler.stop();
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@ -142,7 +147,10 @@ public void testRootQueueParsing() throws Exception {
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
capacityScheduler.stop();
}
public void testMaxCapacity() throws Exception {
@ -164,6 +172,8 @@ public void testMaxCapacity() throws Exception {
try {
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
} catch (IllegalArgumentException iae) {
fail = true;
@ -176,6 +186,8 @@ public void testMaxCapacity() throws Exception {
// Now this should work
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
capacityScheduler.init(conf);
capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
fail = false;
@ -187,6 +199,7 @@ public void testMaxCapacity() throws Exception {
}
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
"setMaxCap", fail);
capacityScheduler.stop();
}
}

View File

@ -48,6 +48,8 @@ public void setup() throws IOException {
ResourceManager resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
String queueName = "root.queue1";

View File

@ -120,6 +120,8 @@ public void setUp() throws IOException {
// to initialize the master key
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
scheduler.setRMContext(resourceManager.getRMContext());
}
@After
@ -133,12 +135,12 @@ public void tearDown() {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new FairScheduler();
FairScheduler scheduler = new FairScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@ -152,7 +154,7 @@ public void testConfValidation() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
scheduler.reinitialize(conf, null);
scheduler.serviceInit(conf);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
@ -184,6 +186,8 @@ public void testLoadConfigurationOnInitialize() throws IOException {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
128);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign);
@ -211,6 +215,7 @@ public void testNonMinZeroResourcesSettings() throws IOException {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
fs.init(conf);
fs.reinitialize(conf, null);
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
@ -228,8 +233,9 @@ public void testMinZeroResourcesSettings() throws IOException {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
fs.reinitialize(conf, null);
Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
fs.init(conf);
fs.reinitialize(conf, null);
Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
@ -237,6 +243,8 @@ public void testMinZeroResourcesSettings() throws IOException {
@Test
public void testAggregateCapacityTracking() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -262,6 +270,8 @@ public void testAggregateCapacityTracking() throws Exception {
@Test
public void testSimpleFairShareCalculation() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@ -289,6 +299,8 @@ public void testSimpleFairShareCalculation() throws IOException {
@Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@ -322,6 +334,8 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
@Test
public void testHierarchicalQueuesSimilarParents() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@ -346,6 +360,8 @@ public void testHierarchicalQueuesSimilarParents() throws IOException {
@Test
public void testSchedulerRootQueueMetrics() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -385,6 +401,8 @@ public void testSchedulerRootQueueMetrics() throws Exception {
@Test (timeout = 5000)
public void testSimpleContainerAllocation() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -433,6 +451,8 @@ public void testSimpleContainerAllocation() throws IOException {
@Test (timeout = 5000)
public void testSimpleContainerReservation() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -487,6 +507,8 @@ public void testSimpleContainerReservation() throws Exception {
@Test
public void testUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@ -513,6 +535,8 @@ public void testUserAsDefaultQueue() throws Exception {
@Test
public void testNotUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@ -539,6 +563,8 @@ public void testNotUserAsDefaultQueue() throws Exception {
@Test
public void testEmptyQueueName() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// only default queue
@ -559,8 +585,10 @@ public void testEmptyQueueName() throws Exception {
@Test
public void testAssignToQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
@ -577,6 +605,8 @@ public void testAssignToQueue() throws Exception {
@Test
public void testAssignToNonLeafQueueReturnsNull() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
@ -594,6 +624,8 @@ public void testAssignToNonLeafQueueReturnsNull() throws Exception {
public void testQueuePlacementWithPolicy() throws Exception {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appId;
@ -654,7 +686,9 @@ public void testFairShareWithMinAlloc() throws Exception {
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@ -703,6 +737,8 @@ public void testNestedUserQueue() throws IOException {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
@ -735,6 +771,8 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
int capacity = 16 * 1024;
@ -769,6 +807,8 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
*/
@Test
public void testQueueDemandCalculation() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
@ -819,6 +859,8 @@ public void testQueueDemandCalculation() throws Exception {
@Test
public void testAppAdditionAndRemoval() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
@ -869,6 +911,8 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@ -901,7 +945,9 @@ public void testConfigureRootQueue() throws Exception {
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@ -928,6 +974,8 @@ public void testIsStarvedForMinShare() throws Exception {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@ -985,8 +1033,10 @@ public void testIsStarvedForFairShare() throws Exception {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@ -1057,7 +1107,9 @@ public void testChoiceOfPreemptedContainers() throws Exception {
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create two nodes
@ -1224,6 +1276,8 @@ public void testPreemptionDecision() throws Exception {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create four nodes
@ -1321,6 +1375,8 @@ public void testPreemptionDecision() throws Exception {
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -1363,8 +1419,10 @@ public void testUserMaxRunningApps() throws Exception {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 =
MockNodes
@ -1404,6 +1462,8 @@ public void testUserMaxRunningApps() throws Exception {
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@ -1484,8 +1544,10 @@ public void testAclSubmitApplication() throws Exception {
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname", 1);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
@ -1499,6 +1561,8 @@ public void testAclSubmitApplication() throws Exception {
@Test (timeout = 5000)
public void testMultipleNodesSingleRackRequest() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@ -1548,6 +1612,8 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
@Test (timeout = 5000)
public void testFifoWithinQueue() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@ -1592,6 +1658,8 @@ public void testFifoWithinQueue() throws Exception {
@Test(timeout = 3000)
public void testMaxAssign() throws Exception {
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node =
@ -1635,6 +1703,8 @@ public void testMaxAssign() throws Exception {
*/
@Test(timeout = 5000)
public void testAssignContainer() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final String user = "user1";
@ -1718,9 +1788,11 @@ public void testNotAllowSubmitApplication() throws Exception {
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
int appId = this.APP_ID++;
String user = "usernotallow";
String queue = "queue1";
@ -1769,6 +1841,8 @@ public void testNotAllowSubmitApplication() throws Exception {
@Test
public void testReservationThatDoesntFit() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@ -1797,6 +1871,8 @@ public void testReservationThatDoesntFit() throws IOException {
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
@ -1825,6 +1901,8 @@ public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
@Test
public void testStrictLocality() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@ -1865,6 +1943,8 @@ public void testStrictLocality() throws IOException {
@Test
public void testCancelStrictLocality() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@ -1915,6 +1995,8 @@ public void testCancelStrictLocality() throws IOException {
*/
@Test
public void testReservationsStrictLocality() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@ -1955,6 +2037,8 @@ public void testReservationsStrictLocality() throws IOException {
@Test
public void testNoMoreCpuOnNode() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
@ -1976,6 +2060,8 @@ public void testNoMoreCpuOnNode() throws IOException {
@Test
public void testBasicDRFAssignment() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
@ -2016,6 +2102,8 @@ public void testBasicDRFAssignment() throws Exception {
*/
@Test
public void testBasicDRFWithQueues() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
@ -2052,6 +2140,8 @@ public void testBasicDRFWithQueues() throws Exception {
@Test
public void testDRFHierarchicalQueues() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
@ -2120,7 +2210,9 @@ public void testDRFHierarchicalQueues() throws Exception {
public void testHostPortNodeName() throws Exception {
conf.setBoolean(YarnConfiguration
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
scheduler.reinitialize(conf,
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf,
resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
1, "127.0.0.1", 1);
@ -2200,9 +2292,11 @@ public void testUserAndQueueMaxRunningApps() throws Exception {
out.println("</user>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
verifyAppRunnable(attId1, true);
@ -2254,9 +2348,11 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true);
@ -2316,6 +2412,9 @@ public void testContinuousScheduling() throws Exception {
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
fs.setRMContext(resourceManager.getRMContext());
fs.init(conf);
fs.start();
fs.reinitialize(conf, resourceManager.getRMContext());
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
@ -2396,6 +2495,8 @@ public void testDontAllowUndeclaredPools() throws Exception{
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@ -2439,6 +2540,8 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy
@ -2455,6 +2558,8 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
@SuppressWarnings("resource")
@Test
public void testBlacklistNodes() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final int GB = 1024;
@ -2507,6 +2612,8 @@ public void testBlacklistNodes() throws Exception {
@Test
public void testGetAppsInQueue() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttId1 =
@ -2552,8 +2659,10 @@ public void testAddAndRemoveAppFromFairScheduler() throws Exception {
@Test
public void testMoveRunnableApp() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@ -2591,8 +2700,10 @@ public void testMoveRunnableApp() throws Exception {
@Test
public void testMoveNonRunnableApp() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@ -2611,8 +2722,10 @@ public void testMoveNonRunnableApp() throws Exception {
@Test
public void testMoveMakesAppRunnable() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@ -2638,8 +2751,10 @@ public void testMoveMakesAppRunnable() throws Exception {
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0);
@ -2652,8 +2767,10 @@ public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
queueMgr.getLeafQueue("queue2", true);
@ -2675,6 +2792,8 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
@Test (expected = YarnException.class)
public void testMoveToNonexistentQueue() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("queue1", true);

View File

@ -51,6 +51,8 @@ public void setUp() throws IOException {
resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
}
@ -69,7 +71,13 @@ public void testCreateEventLog() throws IOException {
public void tearDown() {
logFile.delete();
logFile.getParentFile().delete(); // fairscheduler/
scheduler = null;
resourceManager = null;
if (scheduler != null) {
scheduler.stop();
scheduler = null;
}
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
}
}

View File

@ -145,9 +145,13 @@ public void testAppAttemptMetrics() throws Exception {
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer);
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
QueueMetrics metrics = schedular.getRootQueueMetrics();
FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new Configuration();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
QueueMetrics metrics = scheduler.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
@ -155,18 +159,19 @@ public void testAppAttemptMetrics() throws Exception {
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
schedular.handle(appEvent);
scheduler.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent);
scheduler.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent2);
scheduler.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
scheduler.stop();
}
@Test(timeout=2000)
@ -184,6 +189,9 @@ public void testNodeLocalAssignment() throws Exception {
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
@ -232,6 +240,7 @@ public void testNodeLocalAssignment() throws Exception {
//Also check that the containers were scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(3, info.getLiveContainers().size());
scheduler.stop();
}
@Test(timeout=2000)
@ -254,6 +263,9 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
return nodes;
}
};
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(2048, 4), 1, "127.0.0.1");

View File

@ -203,10 +203,11 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
cs.setRMContext(new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
cs.init(conf);
return cs;
}
@ -269,19 +270,21 @@ public static ResourceManager mockFifoRm(int apps, int racks, int nodes,
ResourceManager rm = mock(ResourceManager.class);
RMContext rmContext = mockRMContext(apps, racks, nodes,
mbsPerNode);
ResourceScheduler rs = mockFifoScheduler();
ResourceScheduler rs = mockFifoScheduler(rmContext);
when(rm.getResourceScheduler()).thenReturn(rs);
when(rm.getRMContext()).thenReturn(rmContext);
return rm;
}
public static FifoScheduler mockFifoScheduler() throws Exception {
public static FifoScheduler mockFifoScheduler(RMContext rmContext)
throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupFifoQueueConfiguration(conf);
FifoScheduler fs = new FifoScheduler();
fs.setConf(new YarnConfiguration());
fs.reinitialize(conf, null);
fs.setRMContext(rmContext);
fs.init(conf);
return fs;
}