ps) {
+ N node = null;
+ if (1 == ps.getAllNodes().size()) {
+ node = ps.getAllNodes().values().iterator().next();
+ }
+
+ return node;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
new file mode 100644
index 0000000000..da356f5f54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Result of a ResourceRequest update.
+ */
+public class ResourceRequestUpdateResult {
+ private final ResourceRequest lastAnyResourceRequest;
+ private final ResourceRequest newResourceRequest;
+
+ public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest,
+ ResourceRequest newResourceRequest) {
+ this.lastAnyResourceRequest = lastAnyResourceRequest;
+ this.newResourceRequest = newResourceRequest;
+ }
+
+ public ResourceRequest getLastAnyResourceRequest() {
+ return lastAnyResourceRequest;
+ }
+
+ public ResourceRequest getNewResourceRequest() {
+ return newResourceRequest;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
new file mode 100644
index 0000000000..f87f764778
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ * In addition to {@link PlacementSet}, this also maintains
+ * pending ResourceRequests:
+ * - When new ResourceRequest(s) are added to the scheduler, or
+ * - When a new container is allocated, the scheduler can notify the
+ *   corresponding PlacementSet.
+ * </p>
+ *
+ * <p>
+ * Different sets of resource requests (e.g., resource requests with the
+ * same schedulerKey) can share one instance of PlacementSet; each PlacementSet
+ * can order nodes differently depending on the requests.
+ * </p>
+ */
+public interface SchedulingPlacementSet<N extends SchedulerNode>
+ extends PlacementSet<N> {
+ /**
+ * Get an iterator of preferred nodes, depending on requirements and/or
+ * availability
+ * @param clusterPlacementSet input cluster PlacementSet
+ * @return iterator of preferred nodes
+ */
+ Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet);
+
+ /**
+ * Replace existing ResourceRequests with the new requests
+ *
+ * @param requests new ResourceRequests
+ * @param recoverPreemptedRequestForAContainer if we're recovering resource
+ * requests for a preempted container
+ * @return update result if total pending resources changed
+ */
+ ResourceRequestUpdateResult updateResourceRequests(
+ List<ResourceRequest> requests,
+ boolean recoverPreemptedRequestForAContainer);
+
+ /**
+ * Get pending ResourceRequests for this placement set's schedulerRequestKey
+ * @return Map of resourceName to ResourceRequest
+ */
+ Map<String, ResourceRequest> getResourceRequests();
+
+ /**
+ * Get ResourceRequest by given schedulerKey and resourceName
+ * @param resourceName resourceName
+ * @param schedulerRequestKey schedulerRequestKey
+ * @return ResourceRequest
+ */
+ ResourceRequest getResourceRequest(String resourceName,
+ SchedulerRequestKey schedulerRequestKey);
+
+ /**
+ * Notify container allocated.
+ * @param type Type of the allocation
+ * @param node Which node this container allocated on
+ * @param request resource request
+ * @return list of ResourceRequests deducted
+ */
+ List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+ ResourceRequest request);
+}
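For orientation, here is a minimal sketch of how a scheduler might drive this interface during an allocation pass; the driver method, its parameters, and the NODE_LOCAL choice are illustrative assumptions, not code from this patch:

    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;

    class PlacementSetDriverSketch {
      // Hypothetical helper: walk preferred nodes and allocate at most once
      static <N extends SchedulerNode> void tryAllocate(
          SchedulingPlacementSet<N> sps, PlacementSet<N> cluster,
          SchedulerRequestKey schedulerKey) {
        Iterator<N> it = sps.getPreferredNodeIterator(cluster);
        while (it.hasNext()) {
          N node = it.next();
          // Pending request for this node's resource name, if any
          ResourceRequest rr =
              sps.getResourceRequest(node.getNodeName(), schedulerKey);
          if (rr != null && rr.getNumContainers() > 0) {
            // A successful allocation deducts and returns pending requests
            List<ResourceRequest> deducted =
                sps.allocate(NodeType.NODE_LOCAL, node, rr);
            break;
          }
        }
      }
    }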
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
new file mode 100644
index 0000000000..48efaa14b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple PlacementSet which keeps an unordered map
+ */
+public class SimplePlacementSet<N extends SchedulerNode>
+ implements PlacementSet<N> {
+
+ private Map<NodeId, N> map;
+ private String partition;
+
+ public SimplePlacementSet(N node) {
+ if (null != node) {
+ // Only one node in the initial PlacementSet
+ this.map = ImmutableMap.of(node.getNodeID(), node);
+ this.partition = node.getPartition();
+ } else {
+ this.map = Collections.emptyMap();
+ this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
+ }
+ }
+
+ public SimplePlacementSet(Map<NodeId, N> map, String partition) {
+ this.map = map;
+ this.partition = partition;
+ }
+
+ @Override
+ public Map<NodeId, N> getAllNodes() {
+ return map;
+ }
+
+ @Override
+ public long getVersion() {
+ return 0L;
+ }
+
+ @Override
+ public String getPartition() {
+ return partition;
+ }
+}
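To illustrate the two constructors above: a SimplePlacementSet wraps either a single node (the heartbeat-driven case) or an explicit node map. A short sketch, assuming a FiCaSchedulerNode obtained elsewhere:

    // Single-node set, as built for one node's heartbeat
    SimplePlacementSet<FiCaSchedulerNode> single =
        new SimplePlacementSet<>(node);

    // Multi-node set over an explicit map and partition
    SimplePlacementSet<FiCaSchedulerNode> multi = new SimplePlacementSet<>(
        ImmutableMap.of(node.getNodeID(), node), node.getPartition());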
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 7bec03a17c..b7cb1bff69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -35,7 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEntity>
- protected TreeSet<S> schedulableEntities;
+ protected ConcurrentSkipListSet<S> schedulableEntities;
protected Comparator<SchedulableEntity> comparator;
protected Map<String, S> entitiesToReorder = new HashMap<String, S>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
index 3cfcd7a6f1..3371df803f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
import com.google.common.annotations.VisibleForTesting;
@@ -61,7 +62,7 @@ public FairOrderingPolicy() {
comparators
);
this.comparator = fairComparator;
- this.schedulableEntities = new TreeSet<S>(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
}
private double getMagnitude(SchedulableEntity r) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
index 10f8eebbf6..2d066bb432 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
@@ -32,7 +34,7 @@ public FifoOrderingPolicy() {
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
- this.schedulableEntities = new TreeSet<S>(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
index 0891289394..6ced9e2dcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
@@ -21,6 +21,7 @@
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import java.util.concurrent.ConcurrentSkipListSet;
/**
* This ordering policy is used for pending applications only.
@@ -46,7 +47,7 @@ public FifoOrderingPolicyForPendingApps() {
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
- this.schedulableEntities = new TreeSet<S>(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
}
@Override
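The TreeSet-to-ConcurrentSkipListSet switch in these three ordering policies is what lets schedulableEntities be read while asynchronous scheduling threads mutate it: a TreeSet iterator fails fast with ConcurrentModificationException, whereas ConcurrentSkipListSet keeps the same comparator-sorted semantics with weakly consistent, non-throwing iteration. A self-contained sketch of the difference, using plain strings rather than YARN types:

    import java.util.Comparator;
    import java.util.concurrent.ConcurrentSkipListSet;

    class SkipListIterationSketch {
      public static void main(String[] args) {
        ConcurrentSkipListSet<String> entities =
            new ConcurrentSkipListSet<>(Comparator.naturalOrder());
        entities.add("app-1");
        entities.add("app-2");
        for (String e : entities) {
          // Safe here; a TreeSet iterator would throw
          // ConcurrentModificationException on this add
          entities.add("app-3");
        }
      }
    }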
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 58bb7215c7..38616242c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -23,6 +23,7 @@
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -167,6 +168,28 @@ public void drainEvents() {
}
}
+ private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
+ throws InterruptedException {
+ RMApp app = getRMContext().getRMApps().get(appId);
+ Assert.assertNotNull("app shouldn't be null", app);
+ final int timeoutMsecs = 80 * SECOND;
+ int timeWaiting = 0;
+ while (!finalStates.contains(app.getState())) {
+ if (timeWaiting >= timeoutMsecs) {
+ break;
+ }
+
+ LOG.info("App : " + appId + " State is : " + app.getState() +
+ " Waiting for state : " + finalStates);
+ Thread.sleep(WAIT_MS_PER_LOOP);
+ timeWaiting += WAIT_MS_PER_LOOP;
+ }
+
+ LOG.info("App State is : " + app.getState());
+ Assert.assertTrue("App State is not correct (timeout).",
+ finalStates.contains(app.getState()));
+ }
+
/**
* Wait until an application has reached a specified state.
* The timeout is 80 seconds.
@@ -254,7 +277,7 @@ public static void waitForState(RMAppAttempt attempt,
RMAppAttemptState finalState, int timeoutMsecs)
throws InterruptedException {
int timeWaiting = 0;
- while (!finalState.equals(attempt.getAppAttemptState())) {
+ while (finalState != attempt.getAppAttemptState()) {
if (timeWaiting >= timeoutMsecs) {
break;
}
@@ -267,7 +290,7 @@ public static void waitForState(RMAppAttempt attempt,
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
Assert.assertEquals("Attempt state is not correct (timeout).", finalState,
- attempt.getState());
+ attempt.getState());
}
public void waitForContainerToComplete(RMAppAttempt attempt,
@@ -966,6 +989,26 @@ private static void waitForSchedulerAppAttemptAdded(
rm.getResourceScheduler()).getApplicationAttempt(attemptId));
}
+ public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm)
+ throws Exception {
+ // Wait up to 100 * 50 ms for the current app attempt to appear
+ int i = 0;
+ while (app.getCurrentAppAttempt() == null && i < 100) {
+ i++;
+ Thread.sleep(50);
+ }
+
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+ rm.waitForState(attempt.getAppAttemptId(),
+ RMAppAttemptState.ALLOCATED);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+ return am;
+ }
+
/**
* NOTE: nm.nodeHeartbeat is explicitly invoked,
* don't invoke it before calling launchAM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 884e2368f4..e48d9d2c9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1091,7 +1091,7 @@ public ApplicationReport createAndGetApplicationReport(
rmContext.getScheduler().getSchedulerAppInfo(attemptId)
.getLiveContainers()).thenReturn(rmContainers);
ContainerStatus cs = mock(ContainerStatus.class);
- when(containerimpl.getFinishedStatus()).thenReturn(cs);
+ when(containerimpl.completed()).thenReturn(false);
when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A");
when(containerimpl.getContainerExitStatus()).thenReturn(0);
when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
index 56d38fb945..83a354de5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
@@ -238,8 +238,10 @@ public void testCapacitySchedulerAllocation() throws Exception {
SchedulerHealth sh =
((CapacityScheduler) resourceManager.getResourceScheduler())
.getSchedulerHealth();
- Assert.assertEquals(2, sh.getAllocationCount().longValue());
- Assert.assertEquals(Resource.newInstance(3 * 1024, 2),
+ // Now SchedulerHealth records only the last container allocated; the
+ // aggregate allocation count is unaffected
+ Assert.assertEquals(1, sh.getAllocationCount().longValue());
+ Assert.assertEquals(Resource.newInstance(1 * 1024, 1),
sh.getResourcesAllocated());
Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 865449f41c..0aeedce98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -134,6 +134,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -3453,7 +3454,7 @@ public void testSchedulingOnRemovedNode() throws Exception {
scheduler.handle(new NodeRemovedSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
// schedulerNode is removed, try allocate a container
- scheduler.allocateContainersToNode(node);
+ scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
@@ -3699,4 +3700,57 @@ private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
cs.handle(addAttemptEvent1);
return appAttemptId1;
}
+
+ @Test
+ public void testAppAttemptLocalityStatistics() throws Exception {
+ Configuration conf =
+ TestUtils.getConfigurationWithMultipleQueues(new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 =
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Launch app1 in queue=a
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+
+ // Got one offswitch request and offswitch allocation
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // am1 asks for 1 GB resource on h1/default-rack/offswitch
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 2), ResourceRequest
+ .newInstance(Priority.newInstance(1), "h1",
+ Resources.createResource(1 * GB), 1)), null);
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+
+ // Got one nodelocal request and nodelocal allocation
+ cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+ // Got one nodelocal request and racklocal allocation
+ cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+ RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getCurrentAppAttempt()
+ .getRMAppAttemptMetrics();
+
+ // We should get one node-local allocation, one rack-local allocation,
+ // and one off-switch allocation
+ Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } },
+ attemptMetrics.getLocalityStatistics());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
new file mode 100644
index 0000000000..9854a15e7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestCapacitySchedulerAsyncScheduling {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ @Test(timeout = 300000)
+ public void testSingleThreadAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(1);
+ }
+
+ @Test(timeout = 300000)
+ public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(3);
+ }
+
+ public void testAsyncContainerAllocation(int numThreads) throws Exception {
+ conf.setInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+ numThreads);
+ conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms", 100);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+
+ // inject node label manager
+ MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+
+ List<MockNM> nms = new ArrayList<>();
+ // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+ for (int i = 0; i < 10; i++) {
+ nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+ }
+
+ List<MockAM> ams = new ArrayList<>();
+ // Add 3 applications to the cluster, one app per queue;
+ // the i-th app asks for 20 * (i + 1) containers. So in total we will have
+ // 123 GB allocated
+ int totalAsked = 3 * GB; // 3 AMs
+
+ for (int i = 0; i < 3; i++) {
+ RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+ Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+ MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+ am.registerAppAttempt();
+ ams.add(am);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+ totalAsked += 20 * (i + 1) * GB;
+ }
+
+ // Wait for at most 15000 ms
+ int waitTime = 15000; // ms
+ while (waitTime > 0) {
+ if (rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB()
+ == totalAsked) {
+ break;
+ }
+ Thread.sleep(50);
+ waitTime -= 50;
+ }
+
+ Assert.assertEquals(
+ rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(),
+ totalAsked);
+
+ // Wait for another 2 sec to make sure we will not allocate more than
+ // required
+ waitTime = 2000; // ms
+ while (waitTime > 0) {
+ Assert.assertEquals(
+ rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(),
+ totalAsked);
+ waitTime -= 50;
+ Thread.sleep(50);
+ }
+
+ rm.close();
+ }
+}
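The 123 GB figure in the comments above checks out as follows (a worked tally, not code from the patch):

    // totalAsked = 3 AM containers * 1 GB            =   3 GB
    //            + (20 + 40 + 60) containers * 1 GB  = 120 GB
    //                                          total = 123 GB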
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7f4fc2cda5..40e5d2a4a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -110,15 +111,16 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) {
return application;
}
- private void stubQueueAllocation(final CSQueue queue,
- final Resource clusterResource, final FiCaSchedulerNode node,
+ private void stubQueueAllocation(final CSQueue queue,
+ final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation) {
- stubQueueAllocation(queue, clusterResource, node, allocation,
+ stubQueueAllocation(queue, clusterResource, node, allocation,
NodeType.NODE_LOCAL);
}
- private void stubQueueAllocation(final CSQueue queue,
- final Resource clusterResource, final FiCaSchedulerNode node,
+ @SuppressWarnings("unchecked")
+ private void stubQueueAllocation(final CSQueue queue,
+ final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation, final NodeType type) {
// Simulate the queue allocation
@@ -145,7 +147,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
- .assignContainers(eq(clusterResource), eq(node),
+ .assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
// Mock the node's resource availability
@@ -157,7 +159,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
return new CSAssignment(allocatedResource, type);
}
}).
- when(queue).assignContainers(eq(clusterResource), eq(node),
+ when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@@ -214,6 +216,7 @@ private void setupSortedQueues(CapacitySchedulerConfiguration conf) {
}
@Test
+ @SuppressWarnings("unchecked")
public void testSortedQueues() throws Exception {
// Setup queue configs
setupSortedQueues(csConf);
@@ -418,10 +421,10 @@ public void testSortedQueues() throws Exception {
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(PlacementSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(PlacementSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index e2b4952675..555e0fd7c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -81,7 +81,7 @@ public void setUp() throws Exception {
mgr.init(conf);
}
- @Test(timeout = 3000000)
+ @Test(timeout = 60000)
public void testExcessReservationThanNodeManagerCapacity() throws Exception {
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
@@ -598,4 +598,47 @@ public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
rm1.close();
}
+
+ @Test(timeout = 60000)
+ public void testAssignMultipleOffswitchContainers() throws Exception {
+ MockRM rm1 = new MockRM();
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 80 * GB);
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 5, new ArrayList());
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // Do node heartbeats once
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ FiCaSchedulerApp schedulerApp1 =
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+ // App1 will get one container allocated (plus the AM container)
+ Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+
+ // Set the limit of off-switch assignments per heartbeat to 3
+ CapacitySchedulerConfiguration newCSConf = new CapacitySchedulerConfiguration();
+ newCSConf.setInt(
+ CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3);
+
+ cs.reinitialize(newCSConf, rm1.getRMContext());
+
+ // Do node heartbeats once
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ // App1 will get 3 new containers allocated (plus the 2 previously
+ // allocated containers)
+ Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+ rm1.close();
+ }
}
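The test flips the limit programmatically through CapacitySchedulerConfiguration; on a real cluster the same knob would live in capacity-scheduler.xml. A sketch, assuming the constant OFFSWITCH_PER_HEARTBEAT_LIMIT resolves to the property name shown (verify against the constant's definition):

    // Programmatic form, as in the test above
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setInt(
        CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3);

    // Assumed capacity-scheduler.xml equivalent:
    // <property>
    //   <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
    //   <value>3</value>
    // </property>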
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 26146301d4..0696f572ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -59,9 +59,12 @@
.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class TestContainerResizing {
@@ -97,13 +100,14 @@ protected void decreaseContainers(
}
@Override
- public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+ public CSAssignment allocateContainersToNode(
+ PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.debug("Thread interrupted.");
}
- super.allocateContainersToNode(node);
+ return super.allocateContainersToNode(ps, withNodeHeartbeat);
}
}
@@ -452,7 +456,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
sentRMContainerLaunched(rm1, containerId1);
- // am1 asks to change its AM container from 1GB to 3GB
+ // am1 asks to change its AM container from 1GB to 7GB
am1.sendContainerResizingRequest(Arrays.asList(
UpdateContainerRequest
.newInstance(0, containerId1,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 51b567bcce..8694efb0a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -78,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -196,6 +199,7 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
cs.setRMContext(spyRMContext);
cs.init(csConf);
+ cs.setResourceCalculator(rC);
cs.start();
when(spyRMContext.getScheduler()).thenReturn(cs);
@@ -268,6 +272,12 @@ static LeafQueue stubLeafQueue(LeafQueue queue) {
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
+
+ // Stub out parent queue's accept and apply.
+ doReturn(true).when(parent).accept(any(Resource.class),
+ any(ResourceCommitRequest.class));
+ doNothing().when(parent).apply(any(Resource.class),
+ any(ResourceCommitRequest.class));
return queue;
}
@@ -339,6 +349,12 @@ public void testSingleQueueOneUserMetrics() throws Exception {
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
8*GB);
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0);
+
final int numNodes = 1;
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -353,8 +369,10 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(
(int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@@ -526,6 +544,12 @@ public void testSingleQueueWithOneUser() throws Exception {
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
8*GB);
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0);
+
final int numNodes = 1;
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -544,8 +568,10 @@ public void testSingleQueueWithOneUser() throws Exception {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(1*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -555,8 +581,10 @@ public void testSingleQueueWithOneUser() throws Exception {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -564,8 +592,10 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -574,8 +604,10 @@ public void testSingleQueueWithOneUser() throws Exception {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -583,8 +615,10 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(4*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -594,8 +628,10 @@ public void testSingleQueueWithOneUser() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
- a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(4*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -687,6 +723,13 @@ public void testDRFUsageRatioRounding() throws Exception {
assign.getResource().getMemorySize() > 0);
}
+ private void applyCSAssignment(Resource clusterResource, CSAssignment assign,
+ LeafQueue q, final Map nodes,
+ final Map apps)
+ throws IOException {
+ TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps);
+ }
+
@Test
public void testDRFUserLimits() throws Exception {
setUpWithDominantResourceCalculator();
@@ -723,6 +766,12 @@ public void testDRFUserLimits() throws Exception {
FiCaSchedulerNode node1 =
TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
+ node0, node1.getNodeID(), node1);
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app0.getApplicationAttemptId(), app0, app2.getApplicationAttemptId(),
+ app2);
+
int numNodes = 2;
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
@@ -758,12 +807,14 @@ public void testDRFUserLimits() throws Exception {
b.assignContainers(clusterResource, node0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
LOG.info(assign.toString());
+ applyCSAssignment(clusterResource, assign, b, nodes, apps);
} while (assign.getResource().getMemorySize() > 0 &&
assign.getAssignmentInformation().getNumReservations() == 0);
do {
assign =
b.assignContainers(clusterResource, node1, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assign, b, nodes, apps);
} while (assign.getResource().getMemorySize() > 0 &&
assign.getAssignmentInformation().getNumReservations() == 0);
//LOG.info("user_0: " + queueUser0.getUsed());
@@ -847,6 +898,12 @@ public void testUserLimits() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0, node_1.getNodeID(), node_1);
+
/**
* Start testing...
*/
@@ -859,8 +916,10 @@ public void testUserLimits() throws Exception {
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -868,8 +927,10 @@ public void testUserLimits() throws Exception {
// Allocate one container to app_1. Even if app_0
// submit earlier, it cannot get this container assigned since user_0
// exceeded user-limit already.
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(4*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -877,8 +938,10 @@ public void testUserLimits() throws Exception {
// Allocate one container to app_0, before allocating this container,
// user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <=
// user-limit.
- a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(7*GB, a.getUsedResources().getMemorySize());
assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -890,7 +953,7 @@ public void testUserLimits() throws Exception {
}
@Test
- public void testComputeUserLimitAndSetHeadroom(){
+ public void testComputeUserLimitAndSetHeadroom() throws IOException {
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
qb.setMaxCapacity(1.0f);
// Users
@@ -903,6 +966,9 @@ public void testComputeUserLimitAndSetHeadroom(){
String host_1 = "127.0.0.2";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0, node_1.getNodeID(), node_1);
+
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
@@ -925,6 +991,8 @@ public void testComputeUserLimitAndSetHeadroom(){
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
+ apps.put(app_0.getApplicationAttemptId(), app_0);
qb.submitApplicationAttempt(app_0, user_0);
Priority u0Priority = TestUtils.createMockPriority(1);
SchedulerRequestKey u0SchedKey = toSchedulerKey(u0Priority);
@@ -935,8 +1003,10 @@ public void testComputeUserLimitAndSetHeadroom(){
assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
- qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ qb.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
"", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -949,14 +1019,17 @@ public void testComputeUserLimitAndSetHeadroom(){
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
qb.getActiveUsersManager(), spyRMContext);
+ apps.put(app_2.getApplicationAttemptId(), app_2);
Priority u1Priority = TestUtils.createMockPriority(2);
SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
- qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ qb.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
"", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -984,11 +1057,13 @@ public void testComputeUserLimitAndSetHeadroom(){
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
+ apps.put(app_1.getApplicationAttemptId(), app_1);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
qb.getActiveUsersManager(), spyRMContext);
+ apps.put(app_3.getApplicationAttemptId(), app_3);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
u0Priority, recordFactory)));
@@ -997,10 +1072,14 @@ public void testComputeUserLimitAndSetHeadroom(){
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
- qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ qb.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
+ applyCSAssignment(clusterResource,
+ qb.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
"", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, qb.getUsedResources().getMemorySize());
@@ -1013,12 +1092,15 @@ public void testComputeUserLimitAndSetHeadroom(){
FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
+ apps.put(app_4.getApplicationAttemptId(), app_4);
qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
- qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ qb.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
"", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
@@ -1078,6 +1160,12 @@ public void testUserHeadroomMultiApp() throws Exception {
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
0, 16*GB);
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1, app_2.getApplicationAttemptId(), app_2);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0, node_1.getNodeID(), node_1);
+
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
@@ -1088,8 +1176,10 @@ public void testUserHeadroomMultiApp() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(1*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1105,8 +1195,10 @@ public void testUserHeadroomMultiApp() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1165,6 +1257,12 @@ public void testHeadroomWithMaxCap() throws Exception {
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "127.0.0.2";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1, app_2.getApplicationAttemptId(), app_2);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0, node_1.getNodeID(), node_1);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
@@ -1194,8 +1292,10 @@ public void testHeadroomWithMaxCap() throws Exception {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1206,8 +1306,10 @@ public void testHeadroomWithMaxCap() throws Exception {
// the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1223,8 +1325,10 @@ public void testHeadroomWithMaxCap() throws Exception {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
- a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1237,8 +1341,10 @@ public void testHeadroomWithMaxCap() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
- a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(0*GB, app_2.getHeadroom().getMemorySize()); // hit queue max-cap
}
@@ -1283,10 +1389,17 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_3, user_2);
+
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1, app_2.getApplicationAttemptId(), app_2,
+ app_3.getApplicationAttemptId(), app_3);
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0);
final int numNodes = 1;
Resource clusterResource =
@@ -1308,24 +1421,30 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
*/
// Only 1 container
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(1*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1343,8 +1462,10 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(5*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1353,8 +1474,10 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(6*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1364,8 +1487,10 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(6*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1376,8 +1501,10 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(7*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1385,8 +1512,10 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemorySize());
// Now we should assign to app_3 again since user_2 is under user-limit
- a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(8*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1466,6 +1595,12 @@ public void testReservation() throws Exception {
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+
+ Map