YARN-10951. CapacityScheduler: Move all fields and initializer code that belongs to async scheduling to a new class (#3800). Contributed by Szilard Nemeth

This commit is contained in:
Szilard Nemeth 2021-12-17 00:18:14 +01:00 committed by GitHub
parent aec9cdb467
commit 8d214cb785
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 182 additions and 117 deletions

View File

@ -187,9 +187,6 @@ public class CapacityScheduler extends
private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
private PreemptionManager preemptionManager = new PreemptionManager();
private volatile boolean isLazyPreemptionEnabled = false;
@ -227,10 +224,7 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
private boolean scheduleAsynchronously;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private AsyncSchedulingConfiguration asyncSchedulingConf;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;
@ -238,16 +232,6 @@ public Configuration getConf() {
private boolean printedVerboseLoggingForAsyncScheduling;
private boolean appShouldFailFast;
/**
* EXPERT
*/
private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;
private CSMaxRunningAppsEnforcer maxRunningEnforcer;
public CapacityScheduler() {
@ -376,27 +360,7 @@ private ResourceCalculator initResourceCalculator() {
}
private void initAsyncSchedulingProperties() {
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);
// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf, this);
}
private void initMultiNodePlacement() {
@ -419,8 +383,8 @@ private void printSchedulerInitialized() {
getResourceCalculator().getClass(),
getMinimumResourceCapability(),
getMaximumResourceCapability(),
scheduleAsynchronously,
asyncScheduleInterval,
asyncSchedulingConf.isScheduleAsynchronously(),
asyncSchedulingConf.getAsyncScheduleInterval(),
multiNodePlacementEnabled,
assignMultipleEnabled,
maxAssignPerHeartbeat,
@ -431,15 +395,7 @@ private void startSchedulerThreads() {
writeLock.lock();
try {
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}
resourceCommitterService.start();
}
asyncSchedulingConf.startThreads();
} finally {
writeLock.unlock();
}
@ -465,14 +421,7 @@ public void serviceStop() throws Exception {
writeLock.lock();
try {
this.activitiesManager.stop();
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
asyncSchedulingConf.serviceStopInvoked();
} finally {
writeLock.unlock();
}
@ -539,7 +488,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
}
long getAsyncScheduleInterval() {
return asyncScheduleInterval;
return asyncSchedulingConf.getAsyncScheduleInterval();
}
private final static Random random = new Random(System.currentTimeMillis());
@ -671,6 +620,11 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
Thread.sleep(cs.getAsyncScheduleInterval());
}
@VisibleForTesting
public void setAsyncSchedulingConf(AsyncSchedulingConfiguration conf) {
this.asyncSchedulingConf = conf;
}
static class AsyncScheduleThread extends Thread {
private final CapacityScheduler cs;
@ -692,7 +646,7 @@ public void run() {
} else {
// Don't run schedule if we have some pending backlogs already
if (cs.getAsyncSchedulingPendingBacklogs()
> cs.asyncMaxPendingBacklogs) {
> cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) {
Thread.sleep(1);
} else{
schedule(cs);
@ -1479,7 +1433,7 @@ protected void nodeUpdate(RMNode rmNode) {
}
// Try to do scheduling
if (!scheduleAsynchronously) {
if (!asyncSchedulingConf.isScheduleAsynchronously()) {
writeLock.lock();
try {
// reset allocation and reservation stats before we start doing any
@ -2291,8 +2245,8 @@ private void addNode(RMNode nodeManager) {
"Added node " + nodeManager.getNodeAddress() + " clusterResource: "
+ clusterResource);
if (scheduleAsynchronously && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
if (asyncSchedulingConf.isScheduleAsynchronously() && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulingConf.asyncSchedulerThreads) {
t.beginSchedule();
}
}
@ -2340,11 +2294,7 @@ private void removeNode(RMNode nodeInfo) {
new ResourceLimits(clusterResource));
int numNodes = nodeTracker.nodeCount();
if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
asyncSchedulingConf.nodeRemoved(numNodes);
LOG.info(
"Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
@ -3092,9 +3042,9 @@ public void submitResourceCommitRequest(Resource cluster,
return;
}
if (scheduleAsynchronously) {
if (asyncSchedulingConf.isScheduleAsynchronously()) {
// Submit to a commit thread and commit it async-ly
resourceCommitterService.addNewCommitRequest(request);
asyncSchedulingConf.resourceCommitterService.addNewCommitRequest(request);
} else{
// Otherwise do it sync-ly.
tryCommit(cluster, request, true);
@ -3339,10 +3289,7 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
}
public int getAsyncSchedulingPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
return asyncSchedulingConf.getPendingBacklogs();
}
@Override
@ -3483,7 +3430,7 @@ public boolean isMultiNodePlacementEnabled() {
}
public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
return asyncSchedulingConf.getNumAsyncSchedulerThreads();
}
@VisibleForTesting
@ -3503,4 +3450,109 @@ public boolean placementConstraintEnabled() {
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}
@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulingConf.getAsyncSchedulerThreads();
}
static class AsyncSchedulingConfiguration {
// timeout to join when we stop this service
private static final long THREAD_JOIN_TIMEOUT_MS = 1000;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;
private final boolean scheduleAsynchronously;
AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf,
CapacityScheduler cs) {
this.scheduleAsynchronously = conf.getScheduleAynschronously();
if (this.scheduleAsynchronously) {
this.asyncScheduleInterval = conf.getLong(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL);
// number of threads for async scheduling
int maxAsyncSchedulingThreads = conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
this.asyncMaxPendingBacklogs = conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
this.asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
}
this.resourceCommitterService = new ResourceCommitterService(cs);
}
}
public boolean isScheduleAsynchronously() {
return scheduleAsynchronously;
}
public long getAsyncScheduleInterval() {
return asyncScheduleInterval;
}
public long getAsyncMaxPendingBacklogs() {
return asyncMaxPendingBacklogs;
}
public void startThreads() {
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}
resourceCommitterService.start();
}
}
public void serviceStopInvoked() throws InterruptedException {
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
}
public void nodeRemoved(int numNodes) {
if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
}
public int getPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
}
public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
}
@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulerThreads;
}
}
}

View File

@ -273,6 +273,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms";
@Private
public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5;
@Private
public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";

View File

@ -153,7 +153,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
for (CapacityScheduler.AsyncScheduleThread thread :
cs.asyncSchedulerThreads) {
cs.getAsyncSchedulerThreads()) {
Assert.assertTrue(thread.getName()
.startsWith("AsyncCapacitySchedulerThread"));
}

View File

@ -237,7 +237,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
if (numThreads > 0) {
// disable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.suspendSchedule();
}
}
@ -268,7 +268,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
if (numThreads > 0) {
// enable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.beginSchedule();
}

View File

@ -936,7 +936,7 @@ private void applyCSAssignment(Resource clusterResource, CSAssignment assign,
LeafQueue q, final Map<NodeId, FiCaSchedulerNode> nodes,
final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
throws IOException {
TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps);
TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps, csConf);
}
@Test

View File

@ -291,7 +291,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -305,7 +305,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -319,7 +319,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -337,7 +337,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -356,7 +356,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -376,7 +376,7 @@ public void testReservation() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -477,7 +477,7 @@ public void testReservationLimitOtherUsers() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize());
@ -491,7 +491,7 @@ public void testReservationLimitOtherUsers() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(4 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
@ -514,7 +514,7 @@ public void testReservationLimitOtherUsers() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(12 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
@ -530,7 +530,7 @@ public void testReservationLimitOtherUsers() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(14 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(4 * GB, app_1.getCurrentConsumption().getMemorySize());
@ -628,7 +628,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -642,7 +642,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -656,7 +656,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -674,7 +674,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -693,7 +693,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -713,7 +713,7 @@ public void testReservationNoContinueLook() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -811,7 +811,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -824,7 +824,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -837,7 +837,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -854,7 +854,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -872,7 +872,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1102,7 +1102,7 @@ public void testAssignToQueue() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1115,7 +1115,7 @@ public void testAssignToQueue() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1128,7 +1128,7 @@ public void testAssignToQueue() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1144,7 +1144,7 @@ public void testAssignToQueue() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@ -1293,7 +1293,7 @@ public void testAssignToUser() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1306,7 +1306,7 @@ public void testAssignToUser() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1319,7 +1319,7 @@ public void testAssignToUser() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1335,7 +1335,7 @@ public void testAssignToUser() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentReservation().getMemorySize());
@ -1462,7 +1462,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1476,7 +1476,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1490,7 +1490,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1508,7 +1508,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(Resources.createResource(10 * GB)),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1526,7 +1526,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(Resources.createResource(10 * GB)),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1542,7 +1542,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@ -1557,7 +1557,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(21 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
@ -1574,7 +1574,7 @@ public void testReservationsNoneAvailable() throws Exception {
TestUtils.applyResourceCommitRequest(clusterResource,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf);
assertEquals(21 * GB, a.getUsedResources().getMemorySize());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(8 * GB, a.getMetrics().getReservedMB());

View File

@ -447,7 +447,14 @@ public static SchedulerRequestKey toSchedulerKey(Priority pri,
public static void applyResourceCommitRequest(Resource clusterResource,
CSAssignment csAssignment,
final Map<NodeId, FiCaSchedulerNode> nodes,
final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
final Map<ApplicationAttemptId, FiCaSchedulerApp> apps) throws IOException {
applyResourceCommitRequest(clusterResource, csAssignment, nodes, apps, null);
}
public static void applyResourceCommitRequest(Resource clusterResource,
CSAssignment csAssignment,
final Map<NodeId, FiCaSchedulerNode> nodes,
final Map<ApplicationAttemptId, FiCaSchedulerApp> apps, CapacitySchedulerConfiguration csConf)
throws IOException {
CapacityScheduler cs = new CapacityScheduler() {
@Override
@ -461,7 +468,7 @@ public FiCaSchedulerApp getApplicationAttempt(
return apps.get(applicationAttemptId);
}
};
cs.setAsyncSchedulingConf(new CapacityScheduler.AsyncSchedulingConfiguration(csConf, cs));
cs.setResourceCalculator(new DefaultResourceCalculator());
cs.submitResourceCommitRequest(clusterResource,