From 38af23796971193fa529c3d08ffde8fcd6e607b6 Mon Sep 17 00:00:00 2001
From: Arun Suresh
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class TestCapacitySchedulerSchedulingRequestUpdate
+ extends CapacitySchedulerTestBase {
+ @Test
+ public void testBasicPendingResourceUpdate() throws Exception {
+ Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+ new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 = // label = x
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 = // label = ""
+ new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // Launch app2 in queue=b1
+ RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+ // am1 asks for 8 * 1GB container for no label
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+ "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, null);
+
+ // am2 asks for 8 * 1GB container for no label
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 8)), null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 8 * GB, null);
+ checkPendingResource(rm, "b", 8 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 16 * GB, null);
+
+ // am2 asks for 8 * 1GB container in another priority for no label
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(2), "*",
+ Resources.createResource(1 * GB), 8)), null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 24 * GB, null);
+
+ // am1 asks 4 GB resource instead of 8 * GB for priority=1
+ // am1 asks for 8 * 1GB container for no label
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+ "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+
+ // am1 asks 8 * GB resource which label=x
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(2), "*",
+ Resources.createResource(8 * GB), 1, true, "x")), null);
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, "x");
+
+ // complete am1/am2, pending resource should be 0 now
+ AppAttemptRemovedSchedulerEvent appRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+ RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+ appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+ am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ checkPendingResource(rm, "b1", 0 * GB, null);
+ checkPendingResource(rm, "b", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+ }
+
+ @Test
+ public void testNodePartitionPendingResourceUpdate() throws Exception {
+ Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+ new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 = // label = x
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 = // label = ""
+ new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // Launch app2 in queue=b1
+ RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+ // am1 asks for 8 * 1GB container for "x"
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "root", 8 * GB, "x");
+
+ // am2 asks for 8 * 1GB container for "x"
+ am2.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 16 * GB, "x");
+
+ // am1 asks for 6 * 1GB container for "x" in another priority
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(6, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(2), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 14 * GB, "x");
+ checkPendingResource(rm, "a", 14 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 22 * GB, "x");
+
+ // am1 asks for 4 * 1GB container for "x" in priority=1, which should
+ // override 8 * 1GB
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 10 * GB, "x");
+ checkPendingResource(rm, "a", 10 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 18 * GB, "x");
+
+ // complete am1/am2, pending resource should be 0 now
+ AppAttemptRemovedSchedulerEvent appRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+ RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+ appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+ am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ checkPendingResource(rm, "b1", 0 * GB, null);
+ checkPendingResource(rm, "b", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index d2e28be25e..a800bef95e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -132,7 +132,7 @@ public void testContainerIsRemovedFromAllocationExpirer()
 Assert.assertEquals(RMContainerState.RUNNING,
 rm1.getResourceScheduler().getRMContainer(containerId2).getState());
 // Verify container size is 3G
- Assert.assertEquals(
+ Assert.assertEquals(
 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
 .getAllocatedResource().getMemorySize());
 // Verify total resource usage
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
new file mode 100644
index 0000000000..0a44a1e693
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocation {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ @Test
+ public void testIntraAppAntiAffinity() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 10 anti-affinity containers for the same app. It should
+ // only get 4 containers allocated because we only have 4 nodes.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 5 containers allocated (1 AM + 1 container on each node).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+ // Similarly, app1 asks for 10 anti-affinity containers at a different
+ // priority; again only 4 can be allocated because we only have 4 nodes.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+ Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 9 containers allocated (1 AM + 8 containers).
+ Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+ // Ask for containers anti-affine to "mapper". Every node already runs a
+ // mapper, so no new container should be allocated.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+ Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should still have 9 containers allocated (1 AM + 8 task containers);
+ // the request above cannot be placed.
+ Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 2 anti-affinity containers for the same app.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+ "tag_1_1", "tag_1_2");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 3 containers allocated (1 AM + 2 task).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
+
+ // app1 asks for 1 anti-affinity container for the same app, anti-affine
+ // to tag_1_1/tag_1_2, with allocation_tag = tag_2_1/tag_2_2.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+ Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
+ "tag_1_1", "tag_1_2");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 4 containers allocated (1 AM + 2 tasks (first request) +
+ // 1 task (2nd request)).
+ Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+
+ // app1 asks for 1 anti-affinity container for the same app, anti-affine
+ // to tag_1_1/tag_1_2/tag_2_1/tag_2_2, with allocation_tag = tag_3.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+ Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
+ "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 1 more container allocated:
+ // 1 AM + 2 tasks (first request) + 1 task (2nd request) +
+ // 1 task (3rd request).
+ Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testSchedulingRequestDisabledByDefault() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 2 anti-affinity containers for the same app.
+ boolean caughtException = false;
+ try {
+ // Since the feature is disabled by default, we expect an exception.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+ "tag_1_1", "tag_1_2");
+ } catch (Exception e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(caughtException);
+ rm1.close();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
new file mode 100644
index 0000000000..c7f13cd3d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocationAsync {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+ csConf.setInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+ numThreads);
+ csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms", 0);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 200 NMs.
+ int nNMs = 200;
+ MockNM[] nms = new MockNM[nNMs];
+ RMNode[] rmNodes = new RMNode[nNMs];
+ for (int i = 0; i < nNMs; i++) {
+ nms[i] = rm1.registerNode("127.0.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 1000 anti-affinity containers for the same app. It should
+ // only get 200 containers allocated (one per node) because we have 200 NMs.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < nNMs; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get #NM + 1 containers allocated (1 per node + 1 AM).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testSingleThreadAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(1);
+ }
+
+ @Test(timeout = 300000)
+ public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(3);
+ }
+
+ @Test(timeout = 300000)
+ public void testFourThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(4);
+ }
+
+ @Test(timeout = 300000)
+ public void testFiveThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(5);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e8734cc512..542ba3ea36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -275,6 +275,8 @@ public static
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.function.LongBinaryOperator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test behaviors of single constraint app placement allocator.
+ */
+public class TestSingleConstraintAppPlacementAllocator {
+ private AppSchedulingInfo appSchedulingInfo;
+ private AllocationTagsManager spyAllocationTagsManager;
+ private RMContext rmContext;
+ private SchedulerRequestKey schedulerRequestKey;
+ private SingleConstraintAppPlacementAllocator allocator;
+
+ @Before
+ public void setup() throws Exception {
+ // stub app scheduling info.
+ appSchedulingInfo = mock(AppSchedulingInfo.class);
+ when(appSchedulingInfo.getApplicationId()).thenReturn(
+ TestUtils.getMockApplicationId(1));
+ when(appSchedulingInfo.getApplicationAttemptId()).thenReturn(
+ TestUtils.getMockApplicationAttemptId(1, 1));
+
+ // stub RMContext
+ rmContext = TestUtils.getMockRMContext();
+
+ // Create allocation tags manager
+ AllocationTagsManager allocationTagsManager = new AllocationTagsManager(
+ rmContext);
+ spyAllocationTagsManager = spy(allocationTagsManager);
+ schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L,
+ TestUtils.getMockContainerId(1, 1));
+ rmContext.setAllocationTagsManager(spyAllocationTagsManager);
+
+ // Create allocator
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ }
+
+ private void assertValidSchedulingRequest(
+ SchedulingRequest schedulingRequest) {
+ // Re-create the allocator so its fields are not polluted by previous runs.
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ }
+
+ private void assertInvalidSchedulingRequest(
+ SchedulingRequest schedulingRequest, boolean recreateAllocator) {
+ try {
+ // Create allocator
+ if (recreateAllocator) {
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ }
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ } catch (SchedulerInvalidResoureRequestException e) {
+ // Expected
+ return;
+ }
+ Assert.fail(
+ "Expect failure for schedulingRequest=" + schedulingRequest.toString());
+ }
+
+ @Test
+ public void testSchedulingRequestValidation() {
+ // Valid
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Valid (with partition)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("x", allocator.getTargetNodePartition());
+
+ // Valid (without specifying node partition)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Valid (with application Id target)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ // Allocation tags should not include application Id
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Invalid (without sizing)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .build(), true);
+
+ // Invalid (without target tags)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1).build())
+ .build(), true);
+
+ // Invalid (with multiple allocation tags expression specified)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (with multiple node partition target expression specified)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp(""),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not anti-affinity cardinality)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 1, 2,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not anti-affinity cardinality)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 2,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not NODE scope)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.RACK, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not GUARANTEED)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+ }
+
+ @Test
+ public void testSchedulingRequestUpdate() {
+ SchedulingRequest schedulingRequest =
+ SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with exactly the same scheduling request; should succeed.
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with a scheduling request that differs only in
+ // #allocations; this should succeed.
+ schedulingRequest.getResourceSizing().setNumAllocations(10);
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with a scheduling request that differs in resource
+ // size; this should fail.
+ schedulingRequest.getResourceSizing().setResources(
+ Resource.newInstance(2048, 1));
+ assertInvalidSchedulingRequest(schedulingRequest, false);
+
+ // Update allocator with a different placement target (allocation tag);
+ // this should fail.
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ assertInvalidSchedulingRequest(schedulingRequest, false);
+
+ // Update allocator with recover == true
+ int existingNumAllocations =
+ allocator.getSchedulingRequest().getResourceSizing()
+ .getNumAllocations();
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, true);
+ Assert.assertEquals(existingNumAllocations + 1,
+ allocator.getSchedulingRequest().getResourceSizing()
+ .getNumAllocations());
+ }
+
+ @Test
+ public void testFunctionality() throws InvalidAllocationTagsQueryException {
+ SchedulingRequest schedulingRequest =
+ SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ allocator.canAllocate(NodeType.NODE_LOCAL,
+ TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+ verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
+ eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)),
+ eq(ImmutableSet.of("mapper", "reducer")),
+ any(LongBinaryOperator.class));
+
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ // Valid (with partition)
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ allocator.canAllocate(NodeType.NODE_LOCAL,
+ TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+ verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
+ eq(NodeId.fromString("host1:123")),
+ eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet
+ .of("mapper", "reducer")), any(LongBinaryOperator.class));
+
+ SchedulerNode node1 = mock(SchedulerNode.class);
+ when(node1.getPartition()).thenReturn("x");
+ when(node1.getNodeID()).thenReturn(NodeId.fromString("host1:123"));
+
+ Assert.assertTrue(allocator
+ .precheckNode(node1, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+ SchedulerNode node2 = mock(SchedulerNode.class);
+ when(node2.getPartition()).thenReturn("");
+ when(node2.getNodeID()).thenReturn(NodeId.fromString("host2:123"));
+ Assert.assertFalse(allocator
+ .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+ }
+}